Skip to content

Commit

Permalink
Only process unread mail
Browse files (browse the repository at this point in the history)
  • Loading branch information
christianlouis committed Feb 18, 2025
1 parent 3ebedbb commit 03ac92d
Showing 1 changed file with 11 additions and 31 deletions.
42 changes: 11 additions & 31 deletions app/tasks/imap_tasks.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -5,8 +5,7 @@
import email
import imaplib
import logging
from datetime import datetime, timedelta

from datetime import datetime, timedelta, timezone
from celery import shared_task
from app.config import settings
from app.tasks.upload_to_s3 import upload_to_s3
Expand Down Expand Up @@ -107,17 +106,9 @@ def check_and_pull_mailbox(
)


def pull_inbox(
mailbox_key: str,
host: str,
port: int,
username: str,
password: str,
use_ssl: bool,
delete_after_process: bool
):
def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
"""
Connects to the IMAP inbox, fetches new emails (last 3 days),
Connects to the IMAP inbox, fetches new unread emails from the last 3 days,
and processes attachments while preserving the original unread status.
"""
logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
Expand All @@ -132,27 +123,21 @@ def pull_inbox(
mail.login(username, password)
mail.select("INBOX")

# Fetch emails from the last 3 days
since_date = (datetime.utcnow() - timedelta(days=3)).strftime("%d-%b-%Y")
status, search_data = mail.search(None, f'(SINCE {since_date})')
# Fetch unread emails from the last 3 days (timezone-aware)
since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')

if status != "OK":
logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}")
mail.close()
mail.logout()
return

msg_numbers = search_data[0].split()
logger.info(f"Found {len(msg_numbers)} emails from the last 3 days in {mailbox_key}.")
logger.info(f"Found {len(msg_numbers)} unread emails from the last 3 days in {mailbox_key}.")

for num in msg_numbers:
# Check if email is unread
status, flag_data = mail.fetch(num, "(FLAGS)")
if status != "OK":
logger.warning(f"Failed to fetch flags for message {num} in {mailbox_key}. Status={status}")
continue

is_unread = b"\\Seen" not in flag_data[0] # Email was originally unread

# Process the email
status, msg_data = mail.fetch(num, "(RFC822)")
if status != "OK":
logger.warning(f"Failed to fetch message {num} in {mailbox_key}. Status={status}")
Expand All @@ -175,21 +160,16 @@ def pull_inbox(
has_attachment = fetch_attachments_and_enqueue(email_message)

# Store processed email in cache
processed_emails[msg_id] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
save_processed_emails(cleanup_old_entries(processed_emails))

# Mark the email as processed (either by star or label)
if has_attachment:
mark_as_processed_with_label(mail, num, label="Ingested") # Or use mark_as_processed_with_star()

# Handle deletion or unread restoration
if delete_after_process:
logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
mail.store(num, "+FLAGS", "\\Deleted")
else:
# Restore unread status if it was unread before processing
if is_unread:
mail.store(num, "-FLAGS", "\\Seen")
mail.store(num, "-FLAGS", "\\Seen")

if delete_after_process:
mail.expunge()
Expand Down

0 comments on commit 03ac92d

Please sign in to comment.