diff --git a/app/tasks/imap_tasks.py b/app/tasks/imap_tasks.py
index 8c1a1d1..3c210c6 100644
--- a/app/tasks/imap_tasks.py
+++ b/app/tasks/imap_tasks.py
@@ -1,39 +1,59 @@
 #!/usr/bin/env python3
 import os
-import time
+import json
 import email
 import imaplib
-import tempfile
 import logging
 from datetime import datetime, timedelta
 
 from celery import shared_task
 
 from app.config import settings
-
-# Import your pipeline's first-step task:
 from app.tasks.upload_to_s3 import upload_to_s3
 
 logger = logging.getLogger(__name__)
 
-# We'll keep track of the last poll time in a dictionary (mailbox -> datetime).
-# This is in-memory only, so if you have multiple workers or restarts, it won't persist.
-LAST_POLL_TIMES = {
-    "imap1": None,
-    "imap2": None,
-}
+# Local cache file for tracking processed emails
+CACHE_FILE = os.path.join(settings.workdir, "processed_mails.json")
+
+
+def load_processed_emails():
+    """Load the list of already processed emails from a local JSON file."""
+    if os.path.exists(CACHE_FILE):
+        try:
+            with open(CACHE_FILE, "r") as f:
+                return json.load(f)
+        except json.JSONDecodeError:
+            logger.warning("Failed to decode JSON, resetting processed emails cache.")
+            return {}
+    return {}
+
+
+def save_processed_emails(processed_emails):
+    """Save the processed email IDs to a local JSON file."""
+    with open(CACHE_FILE, "w") as f:
+        json.dump(processed_emails, f, indent=4)
+
+
+def cleanup_old_entries(processed_emails):
+    """Remove entries older than 7 days from the cache to avoid infinite growth."""
+    seven_days_ago = datetime.utcnow() - timedelta(days=7)
+    return {
+        msg_id: date
+        for msg_id, date in processed_emails.items()
+        if datetime.strptime(date, "%Y-%m-%dT%H:%M:%S") > seven_days_ago
+    }
 
 
 @shared_task
 def pull_all_inboxes():
     """
-    This task is run periodically (e.g., every minute).
-    It checks each IMAP config, sees if it's time to poll (based on poll interval),
-    and if so, fetches attachments.
+    Periodic task that checks all configured IMAP mailboxes
+    and fetches attachments from new emails.
     """
-    logger.info("pull_all_inboxes - starting")
+    logger.info("Starting pull_all_inboxes")
 
-    # Check mailbox #1
+    # Process Mailbox #1
     check_and_pull_mailbox(
         mailbox_key="imap1",
         host=settings.imap1_host,
@@ -41,11 +61,10 @@ def pull_all_inboxes():
         username=settings.imap1_username,
         password=settings.imap1_password,
         use_ssl=settings.imap1_ssl,
-        poll_interval=settings.imap1_poll_interval_minutes,
         delete_after_process=settings.imap1_delete_after_process,
     )
 
-    # Check mailbox #2
+    # Process Mailbox #2
     check_and_pull_mailbox(
         mailbox_key="imap2",
         host=settings.imap2_host,
@@ -53,11 +72,10 @@ def pull_all_inboxes():
         username=settings.imap2_username,
         password=settings.imap2_password,
         use_ssl=settings.imap2_ssl,
-        poll_interval=settings.imap2_poll_interval_minutes,
         delete_after_process=settings.imap2_delete_after_process,
     )
 
-    logger.info("pull_all_inboxes - done")
+    logger.info("Finished pull_all_inboxes")
 
 
 def check_and_pull_mailbox(
@@ -67,33 +85,23 @@ def check_and_pull_mailbox(
     username: str | None,
     password: str | None,
     use_ssl: bool,
-    poll_interval: int,
     delete_after_process: bool,
 ):
-    """Check if it's time to poll this mailbox. If so, do it."""
-    # If not configured, skip
+    """Check and pull new emails from a given mailbox."""
     if not (host and port and username and password):
         logger.warning(f"Mailbox {mailbox_key} is missing config, skipping.")
         return
 
-    # If we never polled before, we can do it immediately.
-    last_poll = LAST_POLL_TIMES.get(mailbox_key)
-    now = datetime.utcnow()
-
-    if last_poll is None or now - last_poll >= timedelta(minutes=poll_interval):
-        logger.info(f"Time to poll mailbox {mailbox_key}!")
-        pull_inbox(
-            mailbox_key=mailbox_key,
-            host=host,
-            port=port,
-            username=username,
-            password=password,
-            use_ssl=use_ssl,
-            delete_after_process=delete_after_process
-        )
-        LAST_POLL_TIMES[mailbox_key] = now
-    else:
-        logger.debug(f"Skipping {mailbox_key}, not due yet.")
+    logger.info(f"Checking mailbox: {mailbox_key}")
+    pull_inbox(
+        mailbox_key=mailbox_key,
+        host=host,
+        port=port,
+        username=username,
+        password=password,
+        use_ssl=use_ssl,
+        delete_after_process=delete_after_process,
+    )
 
 
 def pull_inbox(
@@ -106,10 +114,11 @@ def pull_inbox(
     delete_after_process: bool
 ):
     """
-    Connect to the IMAP inbox, fetch UNSEEN messages, look for attachments,
-    and enqueue them to the pipeline. Then either delete or mark read.
+    Connects to the IMAP inbox, fetches new emails (last 7 days),
+    and processes attachments without marking emails as read.
     """
     logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
+    processed_emails = load_processed_emails()
 
     try:
         if use_ssl:
@@ -120,8 +129,9 @@ def pull_inbox(
         mail.login(username, password)
         mail.select("INBOX")
 
-        # fetch only unseen messages
-        status, search_data = mail.search(None, "(UNSEEN)")
+        # Fetch emails from the last 7 days
+        since_date = (datetime.utcnow() - timedelta(days=7)).strftime("%d-%b-%Y")
+        status, search_data = mail.search(None, f'(SINCE {since_date})')
         if status != "OK":
             logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}")
             mail.close()
@@ -129,10 +139,9 @@
             return
 
         msg_numbers = search_data[0].split()
-        logger.info(f"Found {len(msg_numbers)} new messages in {mailbox_key}.")
+        logger.info(f"Found {len(msg_numbers)} emails from the last 7 days in {mailbox_key}.")
 
         for num in msg_numbers:
-            # fetch the full RFC822 message
             status, msg_data = mail.fetch(num, "(RFC822)")
             if status != "OK":
                 logger.warning(f"Failed to fetch message {num} in {mailbox_key}. Status={status}")
@@ -140,17 +149,29 @@
 
             raw_email = msg_data[0][1]
             email_message = email.message_from_bytes(raw_email)
+            msg_id = email_message.get("Message-ID")
 
-            # Extract attachments and enqueue
-            fetch_attachments_and_enqueue(email_message)
+            if not msg_id:
+                logger.warning(f"Skipping email without Message-ID in {mailbox_key}")
+                continue
 
-            # Mark read or delete
-            if delete_after_process:
-                logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
-                mail.store(num, "+FLAGS", "\\Deleted")
-            else:
-                logger.info(f"Marking message {num.decode()} as seen in {mailbox_key}")
-                mail.store(num, "+FLAGS", "\\Seen")
+            # Skip if already processed
+            if msg_id in processed_emails:
+                logger.info(f"Skipping already processed email {msg_id} in {mailbox_key}")
+                continue
+
+            # Process attachments
+            has_attachment = fetch_attachments_and_enqueue(email_message)
+
+            # If an attachment was found, store in cache
+            if has_attachment:
+                processed_emails[msg_id] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
+                save_processed_emails(cleanup_old_entries(processed_emails))
+
+            # Delete email if configured
+            if delete_after_process:
+                logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
+                mail.store(num, "+FLAGS", "\\Deleted")
 
         if delete_after_process:
             mail.expunge()
@@ -165,39 +186,35 @@ def pull_inbox(
 
 def fetch_attachments_and_enqueue(email_message):
     """
-    Iterate over parts of the email, extract PDF attachments,
-    save them to settings.workdir/tmp, and then enqueue
-    the pipeline with upload_to_s3.delay().
+    Extracts PDF attachments from an email and enqueues them for processing.
+
+    Returns:
+        bool: True if an attachment was found, False otherwise.
     """
+    has_attachment = False
+
     for part in email_message.walk():
         content_type = part.get_content_type()
         filename = part.get_filename()
 
         if part.get_content_maintype() == 'multipart':
-            continue  # skip container parts
+            continue  # Skip container parts
        if not filename:
-            continue  # skip if there's no filename
+            continue  # Skip if there's no filename
 
-        # Example: only PDF attachments
+        # Process PDF attachments only
        if content_type == "application/pdf":
             logger.info(f"Found PDF attachment: {filename}")
+            has_attachment = True
 
-            # Create a path in tmp with the original filename
+            # Save to temporary directory
             file_path = os.path.join(settings.workdir, filename)
-
-            # If a file with that name exists, we could rename it, but let's just overwrite
-            # or create a unique name. For simplicity:
             with open(file_path, "wb") as f:
                 f.write(part.get_payload(decode=True))
 
             logger.info(f"Saved attachment to {file_path}")
 
-            # Now enqueue the pipeline's first step:
-            # E.g.: upload_to_s3 -> process_with_textract -> refine_text_with_gpt, etc.
+            # Enqueue for upload
             upload_to_s3.delay(file_path)
 
-            # If you prefer to *only* do the PDF embedding steps, you'd queue that first, etc.
-            # Or if you want to do the entire pipeline that you do in /process/ endpoint,
-            # you can replicate that chain here.
-
-            # End fetch_attachments_and_enqueue
+    return has_attachment