From f234a55b52b465e92a24d25e023d96e5e4aa5aec Mon Sep 17 00:00:00 2001 From: Christian Krakau-Louis Date: Tue, 18 Feb 2025 04:29:33 +0100 Subject: [PATCH] Added locking for IMAP tasks --- app/tasks/imap_tasks.py | 152 ++++++++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 62 deletions(-) diff --git a/app/tasks/imap_tasks.py b/app/tasks/imap_tasks.py index f8dfb93..6d375eb 100644 --- a/app/tasks/imap_tasks.py +++ b/app/tasks/imap_tasks.py @@ -5,6 +5,7 @@ import email import imaplib import logging +import redis # Import Redis for task locking from datetime import datetime, timedelta, timezone from celery import shared_task from app.config import settings @@ -12,18 +13,43 @@ logger = logging.getLogger(__name__) +# Initialize Redis connection using Celery's Redis settings +redis_client = redis.StrictRedis.from_url(settings.redis_url, decode_responses=True) + +LOCK_KEY = "imap_lock" # Unique key for locking +LOCK_EXPIRE = 300 # Lock expires in 5 minutes (300 seconds) + # Local cache file for tracking processed emails CACHE_FILE = os.path.join(settings.workdir, "processed_mails.json") +def acquire_lock(): + """ + Attempt to acquire a Redis-based lock. If acquired, set an expiration + time to prevent the lock from remaining stuck if the task crashes. + """ + lock_acquired = redis_client.setnx(LOCK_KEY, "locked") + if lock_acquired: + redis_client.expire(LOCK_KEY, LOCK_EXPIRE) + logger.info("Lock acquired for IMAP processing.") + return True + logger.warning("Lock already held. Skipping this cycle.") + return False + + +def release_lock(): + """Release the lock by deleting the Redis key.""" + redis_client.delete(LOCK_KEY) + logger.info("Lock released.") + + def load_processed_emails(): """Load the list of already processed emails from a local JSON file.""" if os.path.exists(CACHE_FILE): try: with open(CACHE_FILE, "r") as f: processed_emails = json.load(f) - # Clean up emails older than 7 days - processed_emails = cleanup_old_entries(processed_emails) + processed_emails = cleanup_old_entries(processed_emails) # Remove old entries return processed_emails except json.JSONDecodeError: logger.warning("Failed to decode JSON, resetting processed emails cache.") @@ -39,7 +65,7 @@ def save_processed_emails(processed_emails): def cleanup_old_entries(processed_emails): """Remove entries older than 7 days from the cache to avoid infinite growth.""" - seven_days_ago = datetime.utcnow() - timedelta(days=7) + seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7) return { msg_id: date for msg_id, date in processed_emails.items() @@ -50,34 +76,44 @@ def cleanup_old_entries(processed_emails): @shared_task def pull_all_inboxes(): """ - Periodic task that checks all configured IMAP mailboxes + Periodic Celery task that checks all configured IMAP mailboxes and fetches attachments from new emails. + Ensures only one instance runs at a time using Redis-based locking. """ - logger.info("Starting pull_all_inboxes") - - # Process Mailbox #1 - check_and_pull_mailbox( - mailbox_key="imap1", - host=settings.imap1_host, - port=settings.imap1_port, - username=settings.imap1_username, - password=settings.imap1_password, - use_ssl=settings.imap1_ssl, - delete_after_process=settings.imap1_delete_after_process, - ) - - # Process Mailbox #2 (Gmail) - check_and_pull_mailbox( - mailbox_key="imap2", - host=settings.imap2_host, - port=settings.imap2_port, - username=settings.imap2_username, - password=settings.imap2_password, - use_ssl=settings.imap2_ssl, - delete_after_process=settings.imap2_delete_after_process, - ) + if not acquire_lock(): + logger.info("Skipping execution: Another instance is running.") + return - logger.info("Finished pull_all_inboxes") + try: + logger.info("Starting pull_all_inboxes") + + # Mailbox #1 + check_and_pull_mailbox( + mailbox_key="imap1", + host=settings.imap1_host, + port=settings.imap1_port, + username=settings.imap1_username, + password=settings.imap1_password, + use_ssl=settings.imap1_ssl, + delete_after_process=settings.imap1_delete_after_process, + ) + + # Mailbox #2 (Gmail) + check_and_pull_mailbox( + mailbox_key="imap2", + host=settings.imap2_host, + port=settings.imap2_port, + username=settings.imap2_username, + password=settings.imap2_password, + use_ssl=settings.imap2_ssl, + delete_after_process=settings.imap2_delete_after_process, + ) + + logger.info("Finished pull_all_inboxes") + + finally: + # Ensure the lock is always released + release_lock() def check_and_pull_mailbox( @@ -89,7 +125,9 @@ def check_and_pull_mailbox( use_ssl: bool, delete_after_process: bool, ): - """Check and pull new emails from a given mailbox.""" + """ + Verifies mailbox configuration and pulls emails from the mailbox if valid. + """ if not (host and port and username and password): logger.warning(f"Mailbox {mailbox_key} is missing config, skipping.") return @@ -109,21 +147,17 @@ def check_and_pull_mailbox( def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process): """ Connects to the IMAP inbox, fetches new unread emails from the last 3 days, - and processes attachments while preserving the original unread status. + processes attachments, and handles flags/labels based on configuration. """ logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})") processed_emails = load_processed_emails() try: - if use_ssl: - mail = imaplib.IMAP4_SSL(host, port) - else: - mail = imaplib.IMAP4(host, port) - + mail = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port) mail.login(username, password) mail.select("INBOX") - # Fetch unread emails from the last 3 days (timezone-aware) + # Fetch unread emails from the last 3 days since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y") status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)') @@ -134,10 +168,9 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte return msg_numbers = search_data[0].split() - logger.info(f"Found {len(msg_numbers)} unread emails from the last 3 days in {mailbox_key}.") + logger.info(f"Found {len(msg_numbers)} unread emails in {mailbox_key}.") for num in msg_numbers: - # Process the email status, msg_data = mail.fetch(num, "(RFC822)") if status != "OK": logger.warning(f"Failed to fetch message {num} in {mailbox_key}. Status={status}") @@ -159,16 +192,20 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte # Process attachments has_attachment = fetch_attachments_and_enqueue(email_message) - # Store processed email in cache + # If it's Gmail, mark processed with star/label + if "gmail" in host.lower(): + mark_as_processed_with_star(mail, num) + mark_as_processed_with_label(mail, num, label="Ingested") + + # Store processed email processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") save_processed_emails(cleanup_old_entries(processed_emails)) - # Handle deletion or unread restoration + # Delete or restore unread status if delete_after_process: logger.info(f"Deleting message {num.decode()} from {mailbox_key}") mail.store(num, "+FLAGS", "\\Deleted") else: - # Restore unread status if it was unread before processing mail.store(num, "-FLAGS", "\\Seen") if delete_after_process: @@ -184,36 +221,27 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte def fetch_attachments_and_enqueue(email_message): """ - Extracts PDF attachments from an email and enqueues them for processing. - + Extracts PDF attachments from the email and enqueues them for S3 upload. Returns: bool: True if an attachment was found, False otherwise. """ has_attachment = False for part in email_message.walk(): - content_type = part.get_content_type() - filename = part.get_filename() + if part.get_content_maintype() == "multipart": + continue - if part.get_content_maintype() == 'multipart': - continue # Skip container parts - if not filename: - continue # Skip if there's no filename - - # Process PDF attachments only - if content_type == "application/pdf": - logger.info(f"Found PDF attachment: {filename}") - has_attachment = True + filename = part.get_filename() + content_type = part.get_content_type() - # Save to temporary directory + if filename and content_type == "application/pdf": file_path = os.path.join(settings.workdir, filename) with open(file_path, "wb") as f: f.write(part.get_payload(decode=True)) - logger.info(f"Saved attachment to {file_path}") - - # Enqueue for upload upload_to_s3.delay(file_path) + logger.info(f"Enqueued PDF attachment for upload: {filename}") + has_attachment = True return has_attachment @@ -221,8 +249,8 @@ def fetch_attachments_and_enqueue(email_message): def mark_as_processed_with_star(mail, msg_id): """Mark email as processed using a star in Gmail.""" try: - mail.store(msg_id, '+FLAGS', '\\Flagged') # This sets the "Starred" status for Gmail - logger.info(f"Email {msg_id} has been starred and marked as ingested.") + mail.store(msg_id, "+FLAGS", "\\Flagged") + logger.info(f"Email {msg_id} starred in Gmail.") except Exception as e: logger.error(f"Failed to mark email {msg_id} with star: {e}") @@ -230,7 +258,7 @@ def mark_as_processed_with_star(mail, msg_id): def mark_as_processed_with_label(mail, msg_id, label="Ingested"): """Mark email as processed by adding a custom label in Gmail.""" try: - mail.store(msg_id, '+X-GM-LABELS', label) # Add custom label 'Ingested' - logger.info(f"Email {msg_id} has been labeled with '{label}' and marked as ingested.") + mail.store(msg_id, "+X-GM-LABELS", label) + logger.info(f"Email {msg_id} labeled '{label}' in Gmail.") except Exception as e: logger.error(f"Failed to mark email {msg_id} with label {label}: {e}")