diff --git a/.env.demo b/.env.demo
index ed072d7..fb0f0bc 100644
--- a/.env.demo
+++ b/.env.demo
@@ -42,4 +42,6 @@ IMAP2_HOST=imap.gmail.com
 IMAP2_PORT=993
 IMAP2_SSL=true
 IMAP2_POLL_INTERVAL_MINUTES=10
-IMAP2_DELETE_AFTER_PROCESS=false
\ No newline at end of file
+IMAP2_DELETE_AFTER_PROCESS=false
+
+GOTENBERG_URL=http://gotenberg:3000
\ No newline at end of file
diff --git a/app/celery_worker.py b/app/celery_worker.py
index 9784b81..a309798 100644
--- a/app/celery_worker.py
+++ b/app/celery_worker.py
@@ -14,6 +14,7 @@
 from app.tasks.refine_text_with_gpt import refine_text_with_gpt
 from app.tasks.extract_metadata_with_gpt import extract_metadata_with_gpt
 from app.tasks.embed_metadata_into_pdf import embed_metadata_into_pdf
+from app.tasks.convert_to_pdf import convert_to_pdf
 
 # Import new send tasks
 from app.tasks.upload_to_dropbox import upload_to_dropbox
diff --git a/app/config.py b/app/config.py
index 146a757..8816b7d 100644
--- a/app/config.py
+++ b/app/config.py
@@ -27,6 +27,7 @@ class Settings(BaseSettings):
     azure_ai_key: str
     azure_region: str
     azure_endpoint: str
+    gotenberg_url: str
 
     # IMAP 1
     imap1_host: Optional[str] = None
diff --git a/app/tasks/convert_to_pdf.py b/app/tasks/convert_to_pdf.py
new file mode 100644
index 0000000..7f15106
--- /dev/null
+++ b/app/tasks/convert_to_pdf.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+import os
+import requests
+import logging
+from celery import shared_task
+from app.config import settings
+from app.tasks.upload_to_s3 import upload_to_s3
+
+logger = logging.getLogger(__name__)
+
+@shared_task
+def convert_to_pdf(file_path):
+    """
+    Converts a file to PDF using Gotenberg's API.
+    On success, saves the PDF locally and enqueues it for S3 upload.
+    """
+    # Ensure that settings contain the Gotenberg URL (e.g., "http://gotenberg:3000").
+    gotenberg_url = getattr(settings, "gotenberg_url", None)
+    if not gotenberg_url:
+        logger.error("Gotenberg URL is not configured in settings.")
+        return
+
+    try:
+        with open(file_path, "rb") as f:
+            # Gotenberg v7+ serves office-to-PDF conversion at /forms/libreoffice/convert
+            # and reads the documents from the multipart "files" field.
+            files = {"files": (os.path.basename(file_path), f)}
+            response = requests.post(f"{gotenberg_url}/forms/libreoffice/convert", files=files, timeout=120)
+        if response.status_code == 200:
+            converted_file_path = os.path.splitext(file_path)[0] + ".pdf"
+            with open(converted_file_path, "wb") as out:
+                out.write(response.content)
+            logger.info(f"Converted file saved as PDF: {converted_file_path}")
+            # Enqueue the upload of the converted PDF.
+            upload_to_s3.delay(converted_file_path)
+            return converted_file_path
+        else:
+            logger.error(f"Conversion failed for {file_path}. Status code: {response.status_code}")
+    except Exception as e:
+        logger.exception(f"Error converting {file_path} to PDF: {e}")
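For reference, the Gotenberg call wrapped by convert_to_pdf can be exercised on its own. A minimal sketch, assuming a Gotenberg v7+ instance reachable at localhost:3000 and a sample notes.docx (both hypothetical):

    import requests

    # Gotenberg returns the converted PDF bytes directly in the response body.
    with open("notes.docx", "rb") as f:
        resp = requests.post(
            "http://localhost:3000/forms/libreoffice/convert",
            files={"files": ("notes.docx", f)},
            timeout=120,
        )
    resp.raise_for_status()
    with open("notes.pdf", "wb") as out:
        out.write(resp.content)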
diff --git a/app/tasks/imap_tasks.py b/app/tasks/imap_tasks.py
index 578febe..2e6745b 100644
--- a/app/tasks/imap_tasks.py
+++ b/app/tasks/imap_tasks.py
@@ -1,15 +1,16 @@
 #!/usr/bin/env python3
-
 import os
 import json
 import email
 import imaplib
 import logging
 import redis
+import re
 from datetime import datetime, timedelta, timezone
 from celery import shared_task
 from app.config import settings
 from app.tasks.upload_to_s3 import upload_to_s3
+from app.tasks.convert_to_pdf import convert_to_pdf  # new conversion task
 
 logger = logging.getLogger(__name__)
 
@@ -17,7 +18,7 @@
 redis_client = redis.StrictRedis.from_url(settings.redis_url, decode_responses=True)
 
 LOCK_KEY = "imap_lock"  # Unique key for locking
-LOCK_EXPIRE = 300  # Lock expires in 5 minutes (300 seconds)
+LOCK_EXPIRE = 300  # Lock expires in 5 minutes
 
 # Local cache file for tracking processed emails
 CACHE_FILE = os.path.join(settings.workdir, "processed_mails.json")
@@ -46,7 +47,7 @@ def load_processed_emails():
     try:
         with open(CACHE_FILE, "r") as f:
             processed_emails = json.load(f)
-            processed_emails = cleanup_old_entries(processed_emails)  # Remove old entries
+            processed_emails = cleanup_old_entries(processed_emails)
             return processed_emails
     except json.JSONDecodeError:
         logger.warning("Failed to decode JSON, resetting processed emails cache.")
@@ -64,14 +65,11 @@
 
 def cleanup_old_entries(processed_emails):
     """Remove entries older than 7 days from the cache to avoid infinite growth."""
     seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
     valid_emails = {}
-
     for msg_id, date_str in processed_emails.items():
         naive_dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
-        aware_dt = naive_dt.replace(tzinfo=timezone.utc)  # Make it UTC-aware
-
+        aware_dt = naive_dt.replace(tzinfo=timezone.utc)
         if aware_dt > seven_days_ago:
             valid_emails[msg_id] = date_str
-
     return valid_emails
@@ -89,7 +87,7 @@ def pull_all_inboxes():
     try:
         logger.info("Starting pull_all_inboxes")
 
-        # Mailbox #1
+        # Mailbox #1 (non-Gmail)
         check_and_pull_mailbox(
             mailbox_key="imap1",
             host=settings.imap1_host,
@@ -114,7 +112,6 @@ def pull_all_inboxes():
 
         logger.info("Finished pull_all_inboxes")
     finally:
-        # Ensure the lock is always released
         release_lock()
 
 
@@ -148,8 +145,12 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
     """
     Connects to the IMAP inbox, fetches new unread emails from the last 3 days,
    and processes attachments while preserving the original unread status.
-    Gmail hosts also check for the 'Ingested' label to skip already-labeled emails,
-    then apply star + label to processed emails.
+
+    For Gmail:
+    - Attempts to select the localized All Mail folder.
+    - Runs an X-GM-RAW query: "in:anywhere in:unread newer_than:3d has:attachment".
+
+    For non-Gmail mailboxes, it falls back to selecting the INBOX with a SINCE/UNSEEN filter.
     """
     logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
     processed_emails = load_processed_emails()
@@ -157,10 +158,25 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_after_process):
     try:
         mail = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port)
         mail.login(username, password)
 
-        mail.select("INBOX")
-        since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
-        status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')
+        is_gmail_host = "gmail" in host.lower()
+        if is_gmail_host:
+            # For Gmail, try to select the localized All Mail folder.
+            all_mail_folder = find_all_mail_folder(mail)
+            if all_mail_folder:
+                logger.info(f"Using Gmail All Mail folder: {all_mail_folder}")
+                mail.select(f'"{all_mail_folder}"')
+            else:
+                logger.warning("Gmail All Mail folder not found, falling back to INBOX.")
+                mail.select("INBOX")
+            # Use the X-GM-RAW query for Gmail.
+            raw_query = "in:anywhere in:unread newer_than:3d has:attachment"
+            status, search_data = mail.search(None, "X-GM-RAW", f'"{raw_query}"')
+        else:
+            # For non-Gmail, select INBOX and use a SINCE/UNSEEN query.
+            mail.select("INBOX")
+            since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
+            status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')
 
         if status != "OK":
             logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}")
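The Gmail branch above leans on the X-GM-RAW extension, which hands the quoted query to Gmail's own search engine rather than the standard IMAP SEARCH grammar. A minimal interactive sketch, assuming hypothetical app-password credentials:

    import imaplib

    mail = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    mail.login("user@example.com", "app-password")  # hypothetical credentials
    mail.select('"[Gmail]/All Mail"', readonly=True)
    # imaplib joins the criteria with spaces; Gmail parses the quoted string.
    status, data = mail.search(None, "X-GM-RAW", '"in:unread newer_than:3d has:attachment"')
    print(status, data[0].split())  # e.g. OK [b'411', b'412']
    mail.logout()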
""" logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})") processed_emails = load_processed_emails() @@ -157,10 +158,25 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte try: mail = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port) mail.login(username, password) - mail.select("INBOX") - since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y") - status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)') + is_gmail_host = "gmail" in host.lower() + if is_gmail_host: + # For Gmail, try to select the localized All Mail folder. + all_mail_folder = find_all_mail_folder(mail) + if all_mail_folder: + logger.info(f"Using Gmail All Mail folder: {all_mail_folder}") + mail.select(f'"{all_mail_folder}"') + else: + logger.warning("Gmail All Mail folder not found, falling back to INBOX.") + mail.select("INBOX") + # Use the X-GM-RAW query for Gmail. + raw_query = "in:anywhere in:unread newer_than:3d has:attachment" + status, search_data = mail.search(None, "X-GM-RAW", f'"{raw_query}"') + else: + # For non-Gmail, select INBOX and use SINCE/UNSEEN query. + mail.select("INBOX") + since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y") + status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)') if status != "OK": logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}") @@ -185,38 +201,30 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte logger.warning(f"Skipping email without Message-ID in {mailbox_key}") continue - # Skip if already processed (via local JSON cache) if msg_id in processed_emails: logger.info(f"Skipping already processed email {msg_id} in {mailbox_key}") continue - # For Gmail only, check if already labeled "Ingested" - # to avoid reprocessing messages that have the label. - is_gmail_host = "gmail" in host.lower() + # For Gmail, check if the email already has the "Ingested" label. if is_gmail_host: if email_already_has_label(mail, num, "Ingested"): - logger.info(f"Skipping email {msg_id} in {mailbox_key}, already has 'Ingested' label.") + logger.info(f"Skipping email {msg_id} in {mailbox_key}, already labeled 'Ingested'.") continue - # Process attachments + # Process attachments (and convert non-PDF files) has_attachment = fetch_attachments_and_enqueue(email_message) - # If it's Gmail, mark processed with star/label if is_gmail_host: mark_as_processed_with_star(mail, num) mark_as_processed_with_label(mail, num, label="Ingested") - # Record that we processed this email processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") save_processed_emails(processed_emails) - # Delete or restore unread status if delete_after_process: logger.info(f"Deleting message {num.decode()} from {mailbox_key}") mail.store(num, "+FLAGS", "\\Deleted") else: - # Mark as read if you want to. - # Currently, it just resets the \Seen flag if it was unread. mail.store(num, "-FLAGS", "\\Seen") if delete_after_process: @@ -232,62 +240,128 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte def fetch_attachments_and_enqueue(email_message): """ - Extracts PDF attachments and enqueues them for upload to S3. - Returns True if at least one PDF attachment was found. + Extracts attachments from the email. + + - If the attachment is a PDF (MIME type "application/pdf"), it is enqueued for upload. 
 
 
 def email_already_has_label(mail, msg_id, label="Ingested"):
     """
-    Check if the given message (msg_id) already has the specified label in Gmail.
+    Checks if the given message (msg_id) has the specified Gmail label.
     Returns True if the label is found, False otherwise.
     """
     try:
         label_status, label_data = mail.fetch(msg_id, "(X-GM-LABELS)")
         if label_status == "OK" and label_data and len(label_data) > 0:
-            # label_data[0] = (b'num (X-GM-LABELS (\\Inbox \\Unread "Ingested"))', ...)
-            raw_labels = label_data[0][1].decode("utf-8", errors="ignore")
-            # Check if the label string is present in the raw label data
+            # imaplib returns either bytes or a (header, literal) tuple per entry,
+            # so normalize before decoding.
+            raw = label_data[0]
+            if isinstance(raw, tuple):
+                raw = raw[1]
+            raw_labels = raw.decode("utf-8", errors="ignore")
             if label in raw_labels:
                 return True
     except Exception as e:
         logger.error(f"Failed to fetch labels for msg_id={msg_id}: {e}")
-
     return False
 
 
 def mark_as_processed_with_star(mail, msg_id):
-    """Star the email in Gmail."""
+    """Stars the email in Gmail."""
     try:
         mail.store(msg_id, "+FLAGS", "\\Flagged")
         logger.info(f"Email {msg_id} starred in Gmail.")
     except Exception as e:
-        logger.error(f"Failed to mark email {msg_id} with star: {e}")
+        logger.error(f"Failed to star email {msg_id}: {e}")
 
 
 def mark_as_processed_with_label(mail, msg_id, label="Ingested"):
-    """Add a custom label to the email in Gmail."""
+    """Adds a custom label to the email in Gmail."""
     try:
         mail.store(msg_id, "+X-GM-LABELS", label)
         logger.info(f"Email {msg_id} labeled '{label}' in Gmail.")
     except Exception as e:
-        logger.error(f"Failed to mark email {msg_id} with label {label}: {e}")
+        logger.error(f"Failed to label email {msg_id} with {label}: {e}")
+
+
+def find_all_mail_folder(mail):
+    """
+    Attempts to select the Gmail All Mail folder using known localized names,
+    falling back to XLIST discovery if the server supports it.
+    Returns the folder name if found, otherwise None.
+    """
+    COMMON_ALL_MAIL_NAMES = [
+        "[Gmail]/Alle Nachrichten",
+        "[Gmail]/All Mail",
+        "[Gmail]/Todos",
+        "[Gmail]/Tutte le mail",
+        "[Gmail]/Tous les messages",
+    ]
+    for candidate in COMMON_ALL_MAIL_NAMES:
+        status, _ = mail.select(f'"{candidate}"', readonly=True)
+        if status == "OK":
+            return candidate
+
+    capabilities = get_capabilities(mail)
+    if "XLIST" in capabilities:
+        candidate = find_all_mail_xlist(mail)
+        if candidate:
+            return candidate
+    return None
+
+
+def get_capabilities(mail):
+    """Returns a list of capabilities supported by the IMAP server."""
+    typ, data = mail.capability()
+    if typ == "OK" and data:
+        caps = data[0].decode("utf-8", errors="ignore").upper().split()
+        return caps
+    return []
+
+
+def find_all_mail_xlist(mail):
+    """
+    Uses XLIST (a deprecated Gmail extension) to discover the mailbox flagged
+    as \\AllMail. Returns the folder name if found, otherwise None.
+    """
+    # imaplib has no native XLIST support, so issue the command manually.
+    tag = mail._new_tag().decode("ascii")
+    command_str = f'{tag} XLIST "" "*"'
+    mail.send((command_str + "\r\n").encode("utf-8"))
+
+    all_mail_folder = None
+    while True:
+        line = mail.readline()
+        if not line:
+            break
+        line_str = line.decode("utf-8", errors="ignore").strip()
+        if line_str.upper().startswith("* XLIST ") and "\\ALLMAIL" in line_str.upper():
+            match = re.search(r'"([^"]+)"$', line_str)
+            if match:
+                candidate = match.group(1)
+                logger.info(f"Found All Mail folder via XLIST: {candidate}")
+                all_mail_folder = candidate
+        if line_str.startswith(tag):
+            break
+    return all_mail_folder
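The XLIST parsing expects untagged responses shaped roughly like the sample below; a small sketch of the extraction on one such line (the folder name varies with the account locale):

    import re

    line_str = r'* XLIST (\HasNoChildren \AllMail) "/" "[Gmail]/Alle Nachrichten"'
    if line_str.upper().startswith("* XLIST ") and "\\ALLMAIL" in line_str.upper():
        match = re.search(r'"([^"]+)"$', line_str)
        print(match.group(1))  # [Gmail]/Alle Nachrichten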
+ """ + COMMON_ALL_MAIL_NAMES = [ + "[Gmail]/Alle Nachrichten", + "[Gmail]/All Mail", + "[Gmail]/Todos", + "[Gmail]/Tutte le mail", + "[Gmail]/Tous les messages", + ] + for candidate in COMMON_ALL_MAIL_NAMES: + status, _ = mail.select(f'"{candidate}"', readonly=True) + if status == "OK": + return candidate + + capabilities = get_capabilities(mail) + if "XLIST" in capabilities: + candidate = find_all_mail_xlist(mail) + if candidate: + return candidate + return None + + +def get_capabilities(mail): + """Returns a list of capabilities supported by the IMAP server.""" + typ, data = mail.capability() + if typ == "OK" and data: + caps = data[0].decode("utf-8", errors="ignore").upper().split() + return caps + return [] + + +def find_all_mail_xlist(mail): + """ + Uses XLIST to discover the mailbox flagged as All Mail. + Returns the folder name if found, otherwise None. + """ + tag = mail._new_tag().decode("ascii") + command_str = f"{tag} XLIST \"\" \"*\"" + mail.send((command_str + "\r\n").encode("utf-8")) + + all_mail_folder = None + while True: + line = mail.readline() + if not line: + break + line_str = line.decode("utf-8", errors="ignore").strip() + if line_str.upper().startswith("* XLIST ") and "\\ALLMAIL" in line_str.upper(): + match = re.search(r'"([^"]+)"$', line_str) + if match: + candidate = match.group(1) + logger.info(f"Found All Mail folder via XLIST: {candidate}") + all_mail_folder = candidate + if line_str.startswith(tag): + break + return all_mail_folder diff --git a/docker-compose.yaml b/docker-compose.yaml index 4329d81..583566e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -45,6 +45,7 @@ services: depends_on: - redis + - gotenberg # Mount the shared directory (and optionally your code if you want dev mode) volumes: @@ -52,6 +53,11 @@ services: # - ./app:/app - /var/docparse/workdir:/workdir + gotenberg: + image: gotenberg/gotenberg:latest + container_name: gotenberg + + redis: image: redis:alpine container_name: document_redis