Skip to content

Commit

Permalink
Added better GMail handling and multi file type support
Browse files Browse the repository at this point in the history
  • Loading branch information
christianlouis committed Feb 19, 2025
1 parent 2241c1d commit c40e9f0
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 44 deletions.
4 changes: 3 additions & 1 deletion .env.demo
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ IMAP2_HOST=imap.gmail.com
IMAP2_PORT=993
IMAP2_SSL=true
IMAP2_POLL_INTERVAL_MINUTES=10
IMAP2_DELETE_AFTER_PROCESS=false
IMAP2_DELETE_AFTER_PROCESS=false

GOTENBERG_URL=http://gotenberg:3000
1 change: 1 addition & 0 deletions app/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from app.tasks.refine_text_with_gpt import refine_text_with_gpt
from app.tasks.extract_metadata_with_gpt import extract_metadata_with_gpt
from app.tasks.embed_metadata_into_pdf import embed_metadata_into_pdf
from app.tasks.convert_to_pdf import convert_to_pdf

# Import new send tasks
from app.tasks.upload_to_dropbox import upload_to_dropbox
Expand Down
1 change: 1 addition & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Settings(BaseSettings):
azure_ai_key: str
azure_region: str
azure_endpoint: str
gotenberg_url: str

# IMAP 1
imap1_host: Optional[str] = None
Expand Down
39 changes: 39 additions & 0 deletions app/tasks/convert_to_pdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
import os
import requests
import logging
from celery import shared_task
from app.config import settings
from app.tasks.upload_to_s3 import upload_to_s3

logger = logging.getLogger(__name__)

@shared_task
def convert_to_pdf(file_path):
    """
    Convert a file to PDF with Gotenberg and enqueue the result for S3 upload.

    Args:
        file_path: Path of the local file that should be converted.

    Returns:
        The path of the written PDF on success. None when the Gotenberg URL
        is not configured, the service answers with a non-200 status, or any
        error occurs; every failure is logged.
    """
    gotenberg_url = getattr(settings, "gotenberg_url", None)
    if not gotenberg_url:
        logger.error("Gotenberg URL is not configured in settings.")
        return None

    # Gotenberg 7+ serves office-document conversion on this route and expects
    # the uploads in the multipart field "files"; a bare "/convert" is a 404.
    endpoint = f"{gotenberg_url.rstrip('/')}/forms/libreoffice/convert"

    try:
        with open(file_path, "rb") as source:
            # Gotenberg picks the converter from the uploaded filename's
            # extension, so pass the name explicitly. The timeout keeps a
            # stalled conversion from blocking the worker forever.
            response = requests.post(
                endpoint,
                files={"files": (os.path.basename(file_path), source)},
                timeout=(10, 300),
            )

        if response.status_code != 200:
            logger.error(f"Conversion failed for {file_path}. Status code: {response.status_code}")
            return None

        converted_file_path = os.path.splitext(file_path)[0] + ".pdf"
        with open(converted_file_path, "wb") as target:
            target.write(response.content)
        logger.info(f"Converted file saved as PDF: {converted_file_path}")

        # Enqueue the upload of the converted PDF.
        upload_to_s3.delay(converted_file_path)
        return converted_file_path
    except Exception as e:
        # Task boundary: log with traceback instead of crashing the worker.
        logger.exception(f"Error converting {file_path} to PDF: {e}")
        return None
160 changes: 117 additions & 43 deletions app/tasks/imap_tasks.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
#!/usr/bin/env python3

import os
import json
import email
import imaplib
import logging
import redis
import re
from datetime import datetime, timedelta, timezone
from celery import shared_task
from app.config import settings
from app.tasks.upload_to_s3 import upload_to_s3
from app.tasks.convert_to_pdf import convert_to_pdf # new conversion task

logger = logging.getLogger(__name__)

# Initialize Redis connection using Celery's Redis settings
redis_client = redis.StrictRedis.from_url(settings.redis_url, decode_responses=True)

LOCK_KEY = "imap_lock" # Unique key for locking
LOCK_EXPIRE = 300 # Lock expires in 5 minutes (300 seconds)
LOCK_EXPIRE = 300 # Lock expires in 5 minutes

# Local cache file for tracking processed emails
CACHE_FILE = os.path.join(settings.workdir, "processed_mails.json")
Expand Down Expand Up @@ -46,7 +47,7 @@ def load_processed_emails():
try:
with open(CACHE_FILE, "r") as f:
processed_emails = json.load(f)
processed_emails = cleanup_old_entries(processed_emails) # Remove old entries
processed_emails = cleanup_old_entries(processed_emails)
return processed_emails
except json.JSONDecodeError:
logger.warning("Failed to decode JSON, resetting processed emails cache.")
Expand All @@ -64,14 +65,11 @@ def cleanup_old_entries(processed_emails):
"""Remove entries older than 7 days from the cache to avoid infinite growth."""
seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
valid_emails = {}

for msg_id, date_str in processed_emails.items():
naive_dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
aware_dt = naive_dt.replace(tzinfo=timezone.utc) # Make it UTC-aware

aware_dt = naive_dt.replace(tzinfo=timezone.utc)
if aware_dt > seven_days_ago:
valid_emails[msg_id] = date_str

return valid_emails


Expand All @@ -89,7 +87,7 @@ def pull_all_inboxes():
try:
logger.info("Starting pull_all_inboxes")

# Mailbox #1
# Mailbox #1 (non-Gmail)
check_and_pull_mailbox(
mailbox_key="imap1",
host=settings.imap1_host,
Expand All @@ -114,7 +112,6 @@ def pull_all_inboxes():
logger.info("Finished pull_all_inboxes")

finally:
# Ensure the lock is always released
release_lock()


Expand Down Expand Up @@ -148,19 +145,38 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte
"""
Connects to the IMAP inbox, fetches new unread emails from the last 3 days,
and processes attachments while preserving the original unread status.
Gmail hosts also check for the 'Ingested' label to skip already-labeled emails,
then apply star + label to processed emails.
For Gmail:
- Attempts to select the localized All Mail folder.
- Runs an X-GM-RAW query: "in:anywhere in:unread newer_than:3d has:attachment".
For non-Gmail mailboxes, it falls back to selecting the INBOX with a SINCE/UNSEEN filter.
"""
logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})")
processed_emails = load_processed_emails()

try:
mail = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port)
mail.login(username, password)
mail.select("INBOX")

since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')
is_gmail_host = "gmail" in host.lower()
if is_gmail_host:
# For Gmail, try to select the localized All Mail folder.
all_mail_folder = find_all_mail_folder(mail)
if all_mail_folder:
logger.info(f"Using Gmail All Mail folder: {all_mail_folder}")
mail.select(f'"{all_mail_folder}"')
else:
logger.warning("Gmail All Mail folder not found, falling back to INBOX.")
mail.select("INBOX")
# Use the X-GM-RAW query for Gmail.
raw_query = "in:anywhere in:unread newer_than:3d has:attachment"
status, search_data = mail.search(None, "X-GM-RAW", f'"{raw_query}"')
else:
# For non-Gmail, select INBOX and use SINCE/UNSEEN query.
mail.select("INBOX")
since_date = (datetime.now(timezone.utc) - timedelta(days=3)).strftime("%d-%b-%Y")
status, search_data = mail.search(None, f'(SINCE {since_date} UNSEEN)')

if status != "OK":
logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}")
Expand All @@ -185,38 +201,30 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte
logger.warning(f"Skipping email without Message-ID in {mailbox_key}")
continue

# Skip if already processed (via local JSON cache)
if msg_id in processed_emails:
logger.info(f"Skipping already processed email {msg_id} in {mailbox_key}")
continue

# For Gmail only, check if already labeled "Ingested"
# to avoid reprocessing messages that have the label.
is_gmail_host = "gmail" in host.lower()
# For Gmail, check if the email already has the "Ingested" label.
if is_gmail_host:
if email_already_has_label(mail, num, "Ingested"):
logger.info(f"Skipping email {msg_id} in {mailbox_key}, already has 'Ingested' label.")
logger.info(f"Skipping email {msg_id} in {mailbox_key}, already labeled 'Ingested'.")
continue

# Process attachments
# Process attachments (and convert non-PDF files)
has_attachment = fetch_attachments_and_enqueue(email_message)

# If it's Gmail, mark processed with star/label
if is_gmail_host:
mark_as_processed_with_star(mail, num)
mark_as_processed_with_label(mail, num, label="Ingested")

# Record that we processed this email
processed_emails[msg_id] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
save_processed_emails(processed_emails)

# Delete or restore unread status
if delete_after_process:
logger.info(f"Deleting message {num.decode()} from {mailbox_key}")
mail.store(num, "+FLAGS", "\\Deleted")
else:
# Mark as read if you want to.
# Currently, it just resets the \Seen flag if it was unread.
mail.store(num, "-FLAGS", "\\Seen")

if delete_after_process:
Expand All @@ -232,62 +240,128 @@ def pull_inbox(mailbox_key, host, port, username, password, use_ssl, delete_afte

def fetch_attachments_and_enqueue(email_message):
    """
    Save every named attachment of an email to the workdir and enqueue it.

    - A part with MIME type "application/pdf" is enqueued for upload to S3.
    - Any other named attachment is enqueued for conversion to PDF.

    Args:
        email_message: A parsed email message object (walked with .walk()).

    Returns:
        True if at least one attachment was written and enqueued, else False.
    """
    has_attachment = False

    for part in email_message.walk():
        # Multipart containers only group other parts; they carry no file.
        if part.get_content_maintype() == "multipart":
            continue

        filename = part.get_filename()
        if not filename:
            continue

        # The filename is attacker-controlled. Keep only its last component
        # (for both "/" and "\" separators) so it cannot escape the workdir.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            logger.warning(f"Skipping attachment with unusable filename: {filename!r}")
            continue

        # get_payload(decode=True) yields None for parts that are themselves
        # containers (e.g. attached messages); there is nothing to write then.
        payload = part.get_payload(decode=True)
        if payload is None:
            logger.warning(f"Skipping attachment without decodable payload: {filename}")
            continue

        file_path = os.path.join(settings.workdir, safe_name)
        with open(file_path, "wb") as f:
            f.write(payload)

        if part.get_content_type() == "application/pdf":
            upload_to_s3.delay(file_path)
            logger.info(f"Enqueued PDF for upload: {filename}")
        else:
            # Enqueue conversion to PDF using the Gotenberg service.
            convert_to_pdf.delay(file_path)
            logger.info(f"Enqueued file for conversion to PDF: {filename}")

        has_attachment = True

    return has_attachment


def _extract_gmail_labels(fetch_data):
    """Return the set of label names contained in an X-GM-LABELS FETCH reply."""
    # imaplib yields plain bytes for simple replies and (header, literal)
    # tuples when the server used a literal; flatten both into one string.
    chunks = []
    for item in fetch_data:
        parts = item if isinstance(item, tuple) else (item,)
        chunks.extend(p for p in parts if isinstance(p, (bytes, bytearray)))
    text = b" ".join(chunks).decode("utf-8", errors="ignore")

    _, marker, tail = text.partition("X-GM-LABELS")
    if not marker:
        return set()

    labels = set()
    depth = 0
    # Tokens are quoted strings, parentheses, or bare atoms. Only tokens inside
    # the label list's own parentheses are labels.
    for token in re.finditer(r'"((?:[^"\\]|\\.)*)"|([()])|([^\s()"]+)', tail):
        quoted, paren, atom = token.groups()
        if paren == "(":
            depth += 1
        elif paren == ")":
            depth -= 1
            if depth <= 0:
                break
        elif depth > 0:
            if quoted is not None:
                # Undo IMAP quoted-string escaping (\" and \\).
                labels.add(re.sub(r"\\(.)", r"\1", quoted))
            else:
                labels.add(atom)
    return labels


def email_already_has_label(mail, msg_id, label="Ingested"):
    """
    Check whether a message carries the given Gmail label.

    Args:
        mail: IMAP connection exposing fetch(msg_id, spec).
        msg_id: Message number as passed to the IMAP FETCH command.
        label: Label name to look for; compared as a whole label, so
            "Ingested" does not match a label such as "NotIngested".

    Returns:
        True if the label is present. False if it is absent, the fetch does
        not return "OK", or the fetch raises (the error is logged).
    """
    try:
        label_status, label_data = mail.fetch(msg_id, "(X-GM-LABELS)")
    except Exception as e:
        logger.error(f"Failed to fetch labels for msg_id={msg_id}: {e}")
        return False

    if label_status != "OK" or not label_data:
        return False

    return label in _extract_gmail_labels(label_data)


def mark_as_processed_with_star(mail, msg_id):
    """
    Star the email in Gmail by adding the IMAP "Flagged" flag.

    Args:
        mail: IMAP connection exposing store(msg_id, command, flags).
        msg_id: Message number as passed to the IMAP STORE command.

    Failures are logged, never raised, so one message cannot abort the run.
    """
    try:
        status, _ = mail.store(msg_id, "+FLAGS", "\\Flagged")
    except Exception as e:
        logger.error(f"Failed to star email {msg_id}: {e}")
        return

    # imaplib only raises on a BAD reply; a NO reply comes back as the status,
    # so it has to be checked before claiming success.
    if status == "OK":
        logger.info(f"Email {msg_id} starred in Gmail.")
    else:
        logger.error(f"Failed to star email {msg_id}: {status}")


def mark_as_processed_with_label(mail, msg_id, label="Ingested"):
    """
    Add a custom label to the email in Gmail via the X-GM-LABELS extension.

    Args:
        mail: IMAP connection exposing store(msg_id, command, value).
        msg_id: Message number as passed to the IMAP STORE command.
        label: Label to add. A label containing whitespace is sent as an IMAP
            quoted string; anything else is sent exactly as given.

    Failures are logged, never raised, so one message cannot abort the run.
    """
    wire_label = label
    already_wrapped = label[:1] in ('"', "(")
    if not already_wrapped and any(ch.isspace() for ch in label):
        # An unquoted label with spaces would be parsed as several arguments.
        escaped = label.replace("\\", "\\\\").replace('"', '\\"')
        wire_label = f'"{escaped}"'

    try:
        status, _ = mail.store(msg_id, "+X-GM-LABELS", wire_label)
    except Exception as e:
        logger.error(f"Failed to label email {msg_id} with {label}: {e}")
        return

    # imaplib only raises on a BAD reply; a NO reply comes back as the status,
    # so it has to be checked before claiming success.
    if status == "OK":
        logger.info(f"Email {msg_id} labeled '{label}' in Gmail.")
    else:
        logger.error(f"Failed to label email {msg_id} with {label}: {status}")


def _find_all_mail_special_use(mail):
    """Return the mailbox flagged with the \\All special-use attribute in LIST, or None."""
    try:
        status, entries = mail.list()
    except imaplib.IMAP4.error:
        return None
    if status != "OK":
        return None

    for entry in entries or ():
        literal_name = None
        if isinstance(entry, tuple):
            # (header, literal): the mailbox name was sent as a literal.
            entry, literal_name = entry[0], entry[1]
        if not isinstance(entry, bytes):
            continue

        text = entry.decode("utf-8", errors="ignore")
        # Shape of a LIST line: (flags) "delimiter" name
        match = re.match(r'\((?P<flags>[^)]*)\)\s+(?:"[^"]*"|NIL)\s*(?P<name>.*)', text)
        if not match:
            continue
        flags = {flag.casefold() for flag in match.group("flags").split()}
        if "\\all" not in flags:
            continue

        if isinstance(literal_name, bytes):
            name = literal_name.decode("utf-8", errors="ignore")
        else:
            name = match.group("name").strip()
            if len(name) >= 2 and name[0] == name[-1] == '"':
                name = name[1:-1]
        if name:
            return name
    return None


def find_all_mail_folder(mail):
    """
    Locate the Gmail "All Mail" folder regardless of the account language.

    Discovery order:
      1. LIST, looking for the mailbox flagged with the \\All special-use
         attribute (locale independent, one round trip).
      2. Probing a list of known localized folder names with a read-only
         SELECT.
      3. The legacy XLIST command, if the server advertises it.

    Args:
        mail: An authenticated imaplib connection.

    Returns:
        The folder name without surrounding quotes, or None if not found.
        Step 2 leaves the matched folder selected read-only, so callers should
        SELECT the returned folder themselves before working with it.
    """
    candidate = _find_all_mail_special_use(mail)
    if candidate:
        return candidate

    COMMON_ALL_MAIL_NAMES = [
        "[Gmail]/Alle Nachrichten",
        "[Gmail]/All Mail",
        "[Gmail]/Todos",
        "[Gmail]/Tutte le mail",
        "[Gmail]/Tous les messages",
    ]
    for candidate in COMMON_ALL_MAIL_NAMES:
        status, _ = mail.select(f'"{candidate}"', readonly=True)
        if status == "OK":
            return candidate

    capabilities = get_capabilities(mail)
    if "XLIST" in capabilities:
        candidate = find_all_mail_xlist(mail)
        if candidate:
            return candidate
    return None


def get_capabilities(mail):
    """
    Ask the IMAP server for its capabilities.

    Returns:
        The upper-cased capability names as a list, or an empty list when the
        server does not answer "OK" or sends no data.
    """
    status, payload = mail.capability()
    if status != "OK" or not payload:
        return []
    advertised = payload[0].decode("utf-8", errors="ignore")
    return advertised.upper().split()


def find_all_mail_xlist(mail):
    """
    Discover the All Mail mailbox by issuing a raw XLIST command.

    The command is written directly to the connection and the reply is read
    line by line until the tagged completion line (or end of stream).

    Returns:
        The name from the last untagged XLIST line flagged "\\AllMail" whose
        name is a trailing quoted string, or None if no such line was seen.
    """
    tag = mail._new_tag().decode("ascii")
    request = f"{tag} XLIST \"\" \"*\"" + "\r\n"
    mail.send(request.encode("utf-8"))

    discovered = None
    while True:
        raw = mail.readline()
        if not raw:
            # Stream ended before the tagged completion arrived.
            return discovered

        text = raw.decode("utf-8", errors="ignore").strip()
        upper = text.upper()
        if upper.startswith("* XLIST ") and "\\ALLMAIL" in upper:
            # The mailbox name is the quoted string closing the line.
            found = re.search(r'"([^"]+)"$', text)
            if found:
                discovered = found.group(1)
                logger.info(f"Found All Mail folder via XLIST: {discovered}")

        if text.startswith(tag):
            # Tagged line: the server finished answering this command.
            return discovered
6 changes: 6 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ services:

depends_on:
- redis
- gotenberg

# Mount the shared directory (and optionally your code if you want dev mode)
volumes:
# optional: mount your code if you want local dev changes
# - ./app:/app
- /var/docparse/workdir:/workdir

gotenberg:
image: gotenberg/gotenberg:latest
container_name: gotenberg


redis:
image: redis:alpine
container_name: document_redis
Expand Down

0 comments on commit c40e9f0

Please sign in to comment.