From 03ac92d931081956e9de97dc8c6830810cdc466c Mon Sep 17 00:00:00 2001 From: Christian Krakau-Louis Date: Tue, 18 Feb 2025 03:55:29 +0100 Subject: [PATCH] Only process unread mail --- app/tasks/imap_tasks.py | 42 +++++++++++------------------------------ 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/app/tasks/imap_tasks.py b/app/tasks/imap_tasks.py index f4712ff..f8dfb93 100644 --- a/app/tasks/imap_tasks.py +++ b/app/tasks/imap_tasks.py @@ -5,8 +5,7 @@ import email import imaplib import logging -from datetime import datetime, timedelta - +from datetime import datetime, timedelta, timezone from celery import shared_task from app.config import settings from app.tasks.upload_to_s3 import upload_to_s3 @@ -107,17 +106,9 @@ def check_and_pull_mailbox( ) -def pull_inbox( - mailbox_key: str, - host: str, - port: int, - username: str, - password: str, - use_ssl: bool, - delete_after_process: bool -): +def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process): """ - Connects to the IMAP inbox, fetches new emails (last 3 days), + Connects to the IMAP inbox, fetches new unread emails from the last 3 days, and processes attachments while preserving the original unread status. """ logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})") @@ -132,9 +123,10 @@ def pull_inbox( mail.login(username, password) mail.select("INBOX") - # Fetch emails from the last 3 days - since_date = (datetime.utcnow() - timedelta(days=3)).strftime("%d-%b-%Y") - status, search_data = mail.search(None, f'(SINCE {since_date})') + # Fetch unread emails from the last 3 days (timezone-aware) + since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y") + status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)') + if status != "OK": logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}") mail.close() @@ -142,17 +134,10 @@ def pull_inbox( return msg_numbers = search_data[0].split() - logger.info(f"Found {len(msg_numbers)} emails from the last 3 days in {mailbox_key}.") + logger.info(f"Found {len(msg_numbers)} unread emails from the last 3 days in {mailbox_key}.") for num in msg_numbers: - # Check if email is unread - status, flag_data = mail.fetch(num, "(FLAGS)") - if status != "OK": - logger.warning(f"Failed to fetch flags for message {num} in {mailbox_key}. Status={status}") - continue - - is_unread = b"\\Seen" not in flag_data[0] # Email was originally unread - + # Process the email status, msg_data = mail.fetch(num, "(RFC822)") if status != "OK": logger.warning(f"Failed to fetch message {num} in {mailbox_key}. Status={status}") @@ -175,21 +160,16 @@ def pull_inbox( has_attachment = fetch_attachments_and_enqueue(email_message) # Store processed email in cache - processed_emails[msg_id] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") save_processed_emails(cleanup_old_entries(processed_emails)) - # Mark the email as processed (either by star or label) - if has_attachment: - mark_as_processed_with_label(mail, num, label="Ingested") # Or use mark_as_processed_with_star() - # Handle deletion or unread restoration if delete_after_process: logger.info(f"Deleting message {num.decode()} from {mailbox_key}") mail.store(num, "+FLAGS", "\\Deleted") else: # Restore unread status if it was unread before processing - if is_unread: - mail.store(num, "-FLAGS", "\\Seen") + mail.store(num, "-FLAGS", "\\Seen") if delete_after_process: mail.expunge()