diff --git a/app/tasks/imap_tasks.py b/app/tasks/imap_tasks.py index 6d375eb..578febe 100644 --- a/app/tasks/imap_tasks.py +++ b/app/tasks/imap_tasks.py @@ -5,7 +5,7 @@ import email import imaplib import logging -import redis # Import Redis for task locking +import redis from datetime import datetime, timedelta, timezone from celery import shared_task from app.config import settings @@ -24,10 +24,7 @@ def acquire_lock(): - """ - Attempt to acquire a Redis-based lock. If acquired, set an expiration - time to prevent the lock from remaining stuck if the task crashes. - """ + """Attempt to acquire a Redis-based lock. If acquired, set an expiration.""" lock_acquired = redis_client.setnx(LOCK_KEY, "locked") if lock_acquired: redis_client.expire(LOCK_KEY, LOCK_EXPIRE) @@ -66,11 +63,16 @@ def save_processed_emails(processed_emails): def cleanup_old_entries(processed_emails): """Remove entries older than 7 days from the cache to avoid infinite growth.""" seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7) - return { - msg_id: date - for msg_id, date in processed_emails.items() - if datetime.strptime(date, "%Y-%m-%dT%H:%M:%S") > seven_days_ago - } + valid_emails = {} + + for msg_id, date_str in processed_emails.items(): + naive_dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S") + aware_dt = naive_dt.replace(tzinfo=timezone.utc) # Make it UTC-aware + + if aware_dt > seven_days_ago: + valid_emails[msg_id] = date_str + + return valid_emails @shared_task @@ -125,9 +127,7 @@ def check_and_pull_mailbox( use_ssl: bool, delete_after_process: bool, ): - """ - Verifies mailbox configuration and pulls emails from the mailbox if valid. - """ + """Validates config and invokes pulling from the mailbox if valid.""" if not (host and port and username and password): logger.warning(f"Mailbox {mailbox_key} is missing config, skipping.") return @@ -147,7 +147,9 @@ def check_and_pull_mailbox( def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process): """ Connects to the IMAP inbox, fetches new unread emails from the last 3 days, - processes attachments, and handles flags/labels based on configuration. + and processes attachments while preserving the original unread status. + Gmail hosts also check for the 'Ingested' label to skip already-labeled emails, + then apply star + label to processed emails. """ logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})") processed_emails = load_processed_emails() @@ -157,7 +159,6 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte mail.login(username, password) mail.select("INBOX") - # Fetch unread emails from the last 3 days since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y") status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)') @@ -184,28 +185,38 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte logger.warning(f"Skipping email without Message-ID in {mailbox_key}") continue - # Skip if already processed + # Skip if already processed (via local JSON cache) if msg_id in processed_emails: logger.info(f"Skipping already processed email {msg_id} in {mailbox_key}") continue + # For Gmail only, check if already labeled "Ingested" + # to avoid reprocessing messages that have the label. + is_gmail_host = "gmail" in host.lower() + if is_gmail_host: + if email_already_has_label(mail, num, "Ingested"): + logger.info(f"Skipping email {msg_id} in {mailbox_key}, already has 'Ingested' label.") + continue + # Process attachments has_attachment = fetch_attachments_and_enqueue(email_message) # If it's Gmail, mark processed with star/label - if "gmail" in host.lower(): + if is_gmail_host: mark_as_processed_with_star(mail, num) mark_as_processed_with_label(mail, num, label="Ingested") - # Store processed email + # Record that we processed this email processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") - save_processed_emails(cleanup_old_entries(processed_emails)) + save_processed_emails(processed_emails) # Delete or restore unread status if delete_after_process: logger.info(f"Deleting message {num.decode()} from {mailbox_key}") mail.store(num, "+FLAGS", "\\Deleted") else: + # Mark as read if you want to. + # Currently, it just resets the \Seen flag if it was unread. mail.store(num, "-FLAGS", "\\Seen") if delete_after_process: @@ -221,9 +232,8 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte def fetch_attachments_and_enqueue(email_message): """ - Extracts PDF attachments from the email and enqueues them for S3 upload. - Returns: - bool: True if an attachment was found, False otherwise. + Extracts PDF attachments and enqueues them for upload to S3. + Returns True if at least one PDF attachment was found. """ has_attachment = False @@ -246,8 +256,27 @@ def fetch_attachments_and_enqueue(email_message): return has_attachment +def email_already_has_label(mail, msg_id, label="Ingested"): + """ + Check if the given message (msg_id) already has the specified label in Gmail. + Returns True if the label is found, False otherwise. + """ + try: + label_status, label_data = mail.fetch(msg_id, "(X-GM-LABELS)") + if label_status == "OK" and label_data and len(label_data) > 0: + # label_data[0] = (b'num (X-GM-LABELS (\\Inbox \\Unread "Ingested"))', ...) + raw_labels = label_data[0][1].decode("utf-8", errors="ignore") + # Check if the label string is present in the raw label data + if label in raw_labels: + return True + except Exception as e: + logger.error(f"Failed to fetch labels for msg_id={msg_id}: {e}") + + return False + + def mark_as_processed_with_star(mail, msg_id): - """Mark email as processed using a star in Gmail.""" + """Star the email in Gmail.""" try: mail.store(msg_id, "+FLAGS", "\\Flagged") logger.info(f"Email {msg_id} starred in Gmail.") @@ -256,7 +285,7 @@ def mark_as_processed_with_star(mail, msg_id): def mark_as_processed_with_label(mail, msg_id, label="Ingested"): - """Mark email as processed by adding a custom label in Gmail.""" + """Add a custom label to the email in Gmail.""" try: mail.store(msg_id, "+X-GM-LABELS", label) logger.info(f"Email {msg_id} labeled '{label}' in Gmail.")