Skip to content

Commit

Permalink
fixed errors
Browse files Browse the repository at this point in the history
  • Loading branch information
christianlouis committed Feb 18, 2025
1 parent f234a55 commit 2241c1d
Showing 1 changed file with 53 additions and 24 deletions.
77 changes: 53 additions & 24 deletions app/tasks/imap_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import email
import imaplib
import logging
import redis # Import Redis for task locking
import redis
from datetime import datetime, timedelta, timezone
from celery import shared_task
from app.config import settings
Expand All @@ -24,10 +24,7 @@


def acquire_lock():
"""
Attempt to acquire a Redis-based lock. If acquired, set an expiration
time to prevent the lock from remaining stuck if the task crashes.
"""
"""Attempt to acquire a Redis-based lock. If acquired, set an expiration."""
lock_acquired = redis_client.setnx(LOCK_KEY, "locked")
if lock_acquired:
redis_client.expire(LOCK_KEY, LOCK_EXPIRE)
Expand Down Expand Up @@ -66,11 +63,16 @@ def save_processed_emails(processed_emails):
def cleanup_old_entries(processed_emails):
"""Remove entries older than 7 days from the cache to avoid infinite growth."""
seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
return {
msg_id: date
for msg_id, date in processed_emails.items()
if datetime.strptime(date, "%Y-%m-%dT%H:%M:%S") > seven_days_ago
}
valid_emails = {}

for msg_id, date_str in processed_emails.items():
naive_dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
aware_dt = naive_dt.replace(tzinfo=timezone.utc) # Make it UTC-aware

if aware_dt > seven_days_ago:
valid_emails[msg_id] = date_str

return valid_emails


@shared_task
Expand Down Expand Up @@ -125,9 +127,7 @@ def check_and_pull_mailbox(
use_ssl: bool,
delete_after_process: bool,
):
"""
Verifies mailbox configuration and pulls emails from the mailbox if valid.
"""
"""Validates config and invokes pulling from the mailbox if valid."""
if not (host and port and username and password):
logger.warning(f"Mailbox {mailbox_key} is missing config, skipping.")
return
Expand All @@ -147,7 +147,9 @@ def check_and_pull_mailbox(
def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
"""
Connects to the IMAP inbox, fetches new unread emails from the last 3 days,
processes attachments, and handles flags/labels based on configuration.
and processes attachments while preserving the original unread status.
Gmail hosts also check for the 'Ingested' label to skip already-labeled emails,
then apply star + label to processed emails.
"""
logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
processed_emails = load_processed_emails()
Expand All @@ -157,7 +159,6 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte
mail.login(username, password)
mail.select("INBOX")

# Fetch unread emails from the last 3 days
since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')

Expand All @@ -184,28 +185,38 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte
logger.warning(f"Skipping email without Message-ID in {mailbox_key}")
continue

# Skip if already processed
# Skip if already processed (via local JSON cache)
if msg_id in processed_emails:
logger.info(f"Skipping already processed email {msg_id} in {mailbox_key}")
continue

# For Gmail only, check if already labeled "Ingested"
# to avoid reprocessing messages that have the label.
is_gmail_host = "gmail" in host.lower()
if is_gmail_host:
if email_already_has_label(mail, num, "Ingested"):
logger.info(f"Skipping email {msg_id} in {mailbox_key}, already has 'Ingested' label.")
continue

# Process attachments
has_attachment = fetch_attachments_and_enqueue(email_message)

# If it's Gmail, mark processed with star/label
if "gmail" in host.lower():
if is_gmail_host:
mark_as_processed_with_star(mail, num)
mark_as_processed_with_label(mail, num, label="Ingested")

# Store processed email
# Record that we processed this email
processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
save_processed_emails(cleanup_old_entries(processed_emails))
save_processed_emails(processed_emails)

# Delete or restore unread status
if delete_after_process:
logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
mail.store(num, "+FLAGS", "\\Deleted")
else:
# Mark as read if you want to.
# Currently, it just resets the \Seen flag if it was unread.
mail.store(num, "-FLAGS", "\\Seen")

if delete_after_process:
Expand All @@ -221,9 +232,8 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte

def fetch_attachments_and_enqueue(email_message):
"""
Extracts PDF attachments from the email and enqueues them for S3 upload.
Returns:
bool: True if an attachment was found, False otherwise.
Extracts PDF attachments and enqueues them for upload to S3.
Returns True if at least one PDF attachment was found.
"""
has_attachment = False

Expand All @@ -246,8 +256,27 @@ def fetch_attachments_and_enqueue(email_message):
return has_attachment


def email_already_has_label(mail, msg_id, label="Ingested"):
"""
Check if the given message (msg_id) already has the specified label in Gmail.
Returns True if the label is found, False otherwise.
"""
try:
label_status, label_data = mail.fetch(msg_id, "(X-GM-LABELS)")
if label_status == "OK" and label_data and len(label_data) > 0:
# label_data[0] = (b'num (X-GM-LABELS (\\Inbox \\Unread "Ingested"))', ...)
raw_labels = label_data[0][1].decode("utf-8", errors="ignore")
# Check if the label string is present in the raw label data
if label in raw_labels:
return True
except Exception as e:
logger.error(f"Failed to fetch labels for msg_id={msg_id}: {e}")

return False


def mark_as_processed_with_star(mail, msg_id):
"""Mark email as processed using a star in Gmail."""
"""Star the email in Gmail."""
try:
mail.store(msg_id, "+FLAGS", "\\Flagged")
logger.info(f"Email {msg_id} starred in Gmail.")
Expand All @@ -256,7 +285,7 @@ def mark_as_processed_with_star(mail, msg_id):


def mark_as_processed_with_label(mail, msg_id, label="Ingested"):
"""Mark email as processed by adding a custom label in Gmail."""
"""Add a custom label to the email in Gmail."""
try:
mail.store(msg_id, "+X-GM-LABELS", label)
logger.info(f"Email {msg_id} labeled '{label}' in Gmail.")
Expand Down

0 comments on commit 2241c1d

Please sign in to comment.