Commit

Added locking for IMAP tasks
christianlouis committed Feb 18, 2025
1 parent 03ac92d commit f234a55
Showing 1 changed file with 90 additions and 62 deletions.
152 changes: 90 additions & 62 deletions app/tasks/imap_tasks.py
@@ -5,25 +5,51 @@
import email
import imaplib
import logging
import redis # Import Redis for task locking
from datetime import datetime, timedelta, timezone
from celery import shared_task
from app.config import settings
from app.tasks.upload_to_s3 import upload_to_s3

logger = logging.getLogger(__name__)

# Initialize Redis connection using Celery's Redis settings
redis_client = redis.StrictRedis.from_url(settings.redis_url, decode_responses=True)

LOCK_KEY = "imap_lock" # Unique key for locking
LOCK_EXPIRE = 300 # Lock expires in 5 minutes (300 seconds)

# Local cache file for tracking processed emails
CACHE_FILE = os.path.join(settings.workdir, "processed_mails.json")
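# The cache appears to map each processed message's ID to the UTC timestamp at
# which it was handled (see pull_inbox below); illustrative contents, not from
# the repository:
#
#     {
#         "<20250218.1234@mail.example.com>": "2025-02-18T09:41:00"
#     }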


def acquire_lock():
"""
Attempt to acquire a Redis-based lock. If acquired, set an expiration
time to prevent the lock from remaining stuck if the task crashes.
"""
lock_acquired = redis_client.setnx(LOCK_KEY, "locked")
if lock_acquired:
redis_client.expire(LOCK_KEY, LOCK_EXPIRE)
logger.info("Lock acquired for IMAP processing.")
return True
logger.warning("Lock already held. Skipping this cycle.")
return False


def release_lock():
"""Release the lock by deleting the Redis key."""
redis_client.delete(LOCK_KEY)
logger.info("Lock released.")

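# setnx() followed by expire() is not atomic: a worker that dies between the
# two calls leaves a lock with no TTL, blocking every later run. redis-py's
# set() accepts nx/ex together, so both can be applied in one command. A
# minimal sketch (illustrative, not part of this commit), reusing the same
# client and constants:
def acquire_lock_atomic():
    """Acquire the lock and its expiry in a single atomic SET NX EX call."""
    return bool(redis_client.set(LOCK_KEY, "locked", nx=True, ex=LOCK_EXPIRE))
# Similarly, release_lock() deletes the key unconditionally; storing a unique
# token per worker and deleting only on a token match would avoid releasing a
# lock that a second worker acquired after the first one's TTL expired.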

def load_processed_emails():
"""Load the list of already processed emails from a local JSON file."""
if os.path.exists(CACHE_FILE):
try:
with open(CACHE_FILE, "r") as f:
processed_emails = json.load(f)
# Clean up emails older than 7 days
            processed_emails = cleanup_old_entries(processed_emails)  # Remove old entries
return processed_emails
except json.JSONDecodeError:
logger.warning("Failed to decode JSON, resetting processed emails cache.")
@@ -39,7 +65,7 @@ def save_processed_emails(processed_emails):

def cleanup_old_entries(processed_emails):
"""Remove entries older than 7 days from the cache to avoid infinite growth."""
    seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
return {
msg_id: date
for msg_id, date in processed_emails.items()
@@ -50,34 +76,44 @@ def cleanup_old_entries(processed_emails):
@shared_task
def pull_all_inboxes():
"""
    Periodic Celery task that checks all configured IMAP mailboxes
and fetches attachments from new emails.
Ensures only one instance runs at a time using Redis-based locking.
"""
logger.info("Starting pull_all_inboxes")

# Process Mailbox #1
check_and_pull_mailbox(
mailbox_key="imap1",
host=settings.imap1_host,
port=settings.imap1_port,
username=settings.imap1_username,
password=settings.imap1_password,
use_ssl=settings.imap1_ssl,
delete_after_process=settings.imap1_delete_after_process,
)

# Process Mailbox #2 (Gmail)
check_and_pull_mailbox(
mailbox_key="imap2",
host=settings.imap2_host,
port=settings.imap2_port,
username=settings.imap2_username,
password=settings.imap2_password,
use_ssl=settings.imap2_ssl,
delete_after_process=settings.imap2_delete_after_process,
)
if not acquire_lock():
logger.info("Skipping execution: Another instance is running.")
return

logger.info("Finished pull_all_inboxes")
try:
logger.info("Starting pull_all_inboxes")

# Mailbox #1
check_and_pull_mailbox(
mailbox_key="imap1",
host=settings.imap1_host,
port=settings.imap1_port,
username=settings.imap1_username,
password=settings.imap1_password,
use_ssl=settings.imap1_ssl,
delete_after_process=settings.imap1_delete_after_process,
)

# Mailbox #2 (Gmail)
check_and_pull_mailbox(
mailbox_key="imap2",
host=settings.imap2_host,
port=settings.imap2_port,
username=settings.imap2_username,
password=settings.imap2_password,
use_ssl=settings.imap2_ssl,
delete_after_process=settings.imap2_delete_after_process,
)

logger.info("Finished pull_all_inboxes")

finally:
# Ensure the lock is always released
release_lock()

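# A minimal Celery beat schedule sketch for driving this task periodically; the
# entry name and the 300-second interval are assumptions, not part of this
# commit:
#
#     app.conf.beat_schedule = {
#         "pull-all-inboxes": {
#             "task": "app.tasks.imap_tasks.pull_all_inboxes",
#             "schedule": 300.0,  # seconds; matching LOCK_EXPIRE bounds overlap
#         },
#     }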

def check_and_pull_mailbox(
@@ -89,7 +125,9 @@ def check_and_pull_mailbox(
use_ssl: bool,
delete_after_process: bool,
):
"""Check and pull new emails from a given mailbox."""
"""
Verifies mailbox configuration and pulls emails from the mailbox if valid.
"""
if not (host and port and username and password):
logger.warning(f"Mailbox {mailbox_key} is missing config, skipping.")
return
@@ -109,21 +147,17 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
"""
Connects to the IMAP inbox, fetches new unread emails from the last 3 days,
    processes attachments, and handles flags/labels based on configuration.
"""
logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
processed_emails = load_processed_emails()

try:
        mail = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port)
mail.login(username, password)
mail.select("INBOX")

        # Fetch unread emails from the last 3 days
since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')
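        # Note: strftime("%d-%b-%Y") takes the month abbreviation from the
        # current locale, while IMAP SEARCH expects the English form (e.g.
        # "18-Feb-2025"). A locale-independent sketch:
        #
        #     MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
        #               "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
        #     cutoff = datetime.now(timezone.utc) - timedelta(days=3)
        #     since_date = f"{cutoff.day:02d}-{MONTHS[cutoff.month - 1]}-{cutoff.year}"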

@@ -134,10 +168,9 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
return

msg_numbers = search_data[0].split()
logger.info(f"Found {len(msg_numbers)} unread emails from the last 3 days in {mailbox_key}.")
logger.info(f"Found {len(msg_numbers)} unread emails in {mailbox_key}.")

for num in msg_numbers:
status, msg_data = mail.fetch(num, "(RFC822)")
if status != "OK":
logger.warning(f"Failed to fetch message {num} in {mailbox_key}. Status={status}")
@@ -159,16 +192,20 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
# Process attachments
has_attachment = fetch_attachments_and_enqueue(email_message)

            # If it's Gmail, mark processed with star/label
if "gmail" in host.lower():
mark_as_processed_with_star(mail, num)
mark_as_processed_with_label(mail, num, label="Ingested")

# Store processed email
processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
save_processed_emails(cleanup_old_entries(processed_emails))

            # Delete or restore unread status
if delete_after_process:
logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
mail.store(num, "+FLAGS", "\\Deleted")
else:
# Restore unread status if it was unread before processing
mail.store(num, "-FLAGS", "\\Seen")

if delete_after_process:
@@ -184,53 +221,44 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):

def fetch_attachments_and_enqueue(email_message):
"""
    Extracts PDF attachments from the email and enqueues them for S3 upload.
Returns:
bool: True if an attachment was found, False otherwise.
"""
has_attachment = False

for part in email_message.walk():
        if part.get_content_maintype() == "multipart":
            continue

        filename = part.get_filename()
        content_type = part.get_content_type()

        if filename and content_type == "application/pdf":
            file_path = os.path.join(settings.workdir, filename)
            with open(file_path, "wb") as f:
                f.write(part.get_payload(decode=True))

            # Enqueue the saved file for asynchronous upload to S3
            upload_to_s3.delay(file_path)
            logger.info(f"Enqueued PDF attachment for upload: {filename}")
            has_attachment = True

return has_attachment

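# part.get_filename() is sender-controlled, so joining it straight into
# settings.workdir permits path traversal (e.g. a filename of
# "../../overwrite.py"). A minimal hardening sketch, not part of this commit:
def safe_attachment_path(workdir, filename):
    """Drop any directory components the sender embedded in the filename."""
    return os.path.join(workdir, os.path.basename(filename))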

def mark_as_processed_with_star(mail, msg_id):
"""Mark email as processed using a star in Gmail."""
try:
        mail.store(msg_id, "+FLAGS", "\\Flagged")
        logger.info(f"Email {msg_id} starred in Gmail.")
except Exception as e:
logger.error(f"Failed to mark email {msg_id} with star: {e}")


def mark_as_processed_with_label(mail, msg_id, label="Ingested"):
"""Mark email as processed by adding a custom label in Gmail."""
try:
        mail.store(msg_id, "+X-GM-LABELS", label)
        logger.info(f"Email {msg_id} labeled '{label}' in Gmail.")
except Exception as e:
logger.error(f"Failed to mark email {msg_id} with label {label}: {e}")
