
added skipping for mail without attachments and already processed mail
christianlouis committed Feb 14, 2025
1 parent 230c59f commit 99684f9
Showing 1 changed file with 89 additions and 72 deletions.
161 changes: 89 additions & 72 deletions app/tasks/imap_tasks.py
@@ -1,63 +1,81 @@
#!/usr/bin/env python3

import os
import time
import json
import email
import imaplib
import tempfile
import logging
from datetime import datetime, timedelta

from celery import shared_task
from app.config import settings

# Import your pipeline's first-step task:
from app.tasks.upload_to_s3 import upload_to_s3

logger = logging.getLogger(__name__)

# Local cache file for tracking processed emails
CACHE_FILE = os.path.join(settings.workdir, "processed_mails.json")


def load_processed_emails():
    """Load the cache of already processed emails from a local JSON file."""
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, "r") as f:
                return json.load(f)
        except json.JSONDecodeError:
            logger.warning("Failed to decode JSON, resetting processed emails cache.")
            return {}
    return {}


def save_processed_emails(processed_emails):
    """Save the processed email IDs to a local JSON file."""
    with open(CACHE_FILE, "w") as f:
        json.dump(processed_emails, f, indent=4)


def cleanup_old_entries(processed_emails):
    """Remove entries older than 7 days from the cache to avoid unbounded growth."""
    seven_days_ago = datetime.utcnow() - timedelta(days=7)
    return {
        msg_id: date
        for msg_id, date in processed_emails.items()
        if datetime.strptime(date, "%Y-%m-%dT%H:%M:%S") > seven_days_ago
    }

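# A minimal sketch of what processed_mails.json might contain after a couple
# of runs (Message-ID values are illustrative assumptions, not real data):
#
#     {
#         "<20250214093012.12345@mail.example.com>": "2025-02-14T09:31:02",
#         "<20250213170455.67890@mail.example.com>": "2025-02-13T17:05:10"
#     }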

@shared_task
def pull_all_inboxes():
    """
    Periodic task that checks all configured IMAP mailboxes
    and fetches attachments from new emails.
    """
    logger.info("Starting pull_all_inboxes")

    # Process Mailbox #1
    check_and_pull_mailbox(
        mailbox_key="imap1",
        host=settings.imap1_host,
        port=settings.imap1_port,
        username=settings.imap1_username,
        password=settings.imap1_password,
        use_ssl=settings.imap1_ssl,
        delete_after_process=settings.imap1_delete_after_process,
    )

    # Process Mailbox #2
    check_and_pull_mailbox(
        mailbox_key="imap2",
        host=settings.imap2_host,
        port=settings.imap2_port,
        username=settings.imap2_username,
        password=settings.imap2_password,
        use_ssl=settings.imap2_ssl,
        delete_after_process=settings.imap2_delete_after_process,
    )

    logger.info("Finished pull_all_inboxes")

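# A sketch of how pull_all_inboxes might be wired into Celery beat; the
# one-minute interval and the "app" instance name are assumptions, not part
# of this commit:
#
#     app.conf.beat_schedule = {
#         "pull-all-inboxes": {
#             "task": "app.tasks.imap_tasks.pull_all_inboxes",
#             "schedule": 60.0,  # seconds
#         },
#     }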

@@ -67,33 +85,23 @@ def check_and_pull_mailbox(
    username: str | None,
    password: str | None,
    use_ssl: bool,
    delete_after_process: bool,
):
    """Check and pull new emails from a given mailbox."""
    if not (host and port and username and password):
        logger.warning(f"Mailbox {mailbox_key} is missing config, skipping.")
        return

    logger.info(f"Checking mailbox: {mailbox_key}")
    pull_inbox(
        mailbox_key=mailbox_key,
        host=host,
        port=port,
        username=username,
        password=password,
        use_ssl=use_ssl,
        delete_after_process=delete_after_process,
    )

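# The settings fields used above live in app.config; a hypothetical .env for
# mailbox #1 might look like this (the names mirror the settings attributes,
# the values are placeholders):
#
#     IMAP1_HOST=imap.example.com
#     IMAP1_PORT=993
#     IMAP1_USERNAME=inbox@example.com
#     IMAP1_PASSWORD=secret
#     IMAP1_SSL=true
#     IMAP1_DELETE_AFTER_PROCESS=false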

def pull_inbox(
@@ -106,10 +114,11 @@
    delete_after_process: bool
):
    """
    Connects to the IMAP inbox, fetches new emails (last 7 days),
    and processes attachments without marking emails as read.
    """
    logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
    processed_emails = load_processed_emails()

    try:
        if use_ssl:
@@ -120,37 +129,49 @@
        mail.login(username, password)
        mail.select("INBOX")

        # Fetch emails from the last 7 days (IMAP SINCE expects e.g. "07-Feb-2025")
        since_date = (datetime.utcnow() - timedelta(days=7)).strftime("%d-%b-%Y")
        status, search_data = mail.search(None, f'(SINCE {since_date})')
        if status != "OK":
            logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}")
            mail.close()
            mail.logout()
            return

        msg_numbers = search_data[0].split()
        logger.info(f"Found {len(msg_numbers)} emails from the last 7 days in {mailbox_key}.")

        for num in msg_numbers:
            # Fetch the full RFC822 message
            status, msg_data = mail.fetch(num, "(RFC822)")
            if status != "OK":
                logger.warning(f"Failed to fetch message {num} in {mailbox_key}. Status={status}")
                continue

            raw_email = msg_data[0][1]
            email_message = email.message_from_bytes(raw_email)
            msg_id = email_message.get("Message-ID")

            if not msg_id:
                logger.warning(f"Skipping email without Message-ID in {mailbox_key}")
                continue

            # Skip if already processed
            if msg_id in processed_emails:
                logger.info(f"Skipping already processed email {msg_id} in {mailbox_key}")
                continue

            # Process attachments
            has_attachment = fetch_attachments_and_enqueue(email_message)

            # If an attachment was found, record the message in the cache
            if has_attachment:
                processed_emails[msg_id] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
                save_processed_emails(cleanup_old_entries(processed_emails))

                # Delete email if configured
                if delete_after_process:
                    logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
                    mail.store(num, "+FLAGS", "\\Deleted")

        if delete_after_process:
            mail.expunge()
@@ -165,39 +186,35 @@

def fetch_attachments_and_enqueue(email_message):
    """
    Extracts PDF attachments from an email and enqueues them for processing.

    Returns:
        bool: True if an attachment was found, False otherwise.
    """
    has_attachment = False

    for part in email_message.walk():
        content_type = part.get_content_type()
        filename = part.get_filename()

        if part.get_content_maintype() == 'multipart':
            continue  # Skip container parts
        if not filename:
            continue  # Skip if there's no filename

        # Process PDF attachments only
        if content_type == "application/pdf":
            logger.info(f"Found PDF attachment: {filename}")
            has_attachment = True

            # Save to the working directory
            file_path = os.path.join(settings.workdir, filename)

            with open(file_path, "wb") as f:
                f.write(part.get_payload(decode=True))

            logger.info(f"Saved attachment to {file_path}")

            # Enqueue the pipeline's first step (upload to S3)
            upload_to_s3.delay(file_path)

    return has_attachment

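For illustration, one way to exercise the new attachment check locally: a hypothetical snippet, not part of this commit (upload_to_s3.delay needs a running Celery broker, so the enqueue step will fail without one):

    import email
    from email.message import EmailMessage

    from app.tasks.imap_tasks import fetch_attachments_and_enqueue

    msg = EmailMessage()
    msg["Message-ID"] = "<test-123@example.com>"
    msg.set_content("Body text")
    msg.add_attachment(
        b"%PDF-1.4 minimal test bytes",
        maintype="application",
        subtype="pdf",
        filename="invoice.pdf",
    )

    # Round-trip through bytes, as pull_inbox does with RFC822 data
    email_message = email.message_from_bytes(msg.as_bytes())
    assert fetch_attachments_and_enqueue(email_message) is True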