diff --git a/app/tasks/extract_metadata_with_gpt.py b/app/tasks/extract_metadata_with_gpt.py index 2cdfee2..22c83b0 100644 --- a/app/tasks/extract_metadata_with_gpt.py +++ b/app/tasks/extract_metadata_with_gpt.py @@ -31,21 +31,33 @@ def extract_json_from_text(text): @celery.task(base=BaseTaskWithRetry) def extract_metadata_with_gpt(s3_filename: str, cleaned_text: str): - """Uses OpenAI GPT-4o to classify document metadata.""" - + """Uses OpenAI GPT-4o-mini to classify document metadata.""" + prompt = f""" -You are an intelligent document classifier. -Given the following extracted text from a document, analyze it and return a JSON object with the following fields: -1. "filename": A machine-readable filename in the format YYYY-MM-DD_DescriptiveTitle (use only letters, numbers, periods, and underscores). -2. "empfaenger": The recipient, or "Unknown" if not found. -3. "absender": The sender, or "Unknown" if not found. -4. "correspondent": A correspondent extracted from the document, or "Unknown". -5. "kommunikationsart": One of [Behoerdlicher_Brief, Rechnung, Kontoauszug, Vertrag, Quittung, Privater_Brief, Einladung, Gewerbliche_Korrespondenz, Newsletter, Werbung, Sonstiges]. -6. "kommunikationskategorie": One of [Amtliche_Postbehoerdliche_Dokumente, Finanz_und_Vertragsdokumente, Geschaeftliche_Kommunikation, Private_Korrespondenz, Sonstige_Informationen]. -7. "document_type": The document type, or "Unknown". -8. "tags": A list of additional keywords extracted from the document. -9. "language": The detected language code (e.g., "DE"). -10. "title": A human-friendly title for the document. +You are a specialized document analyzer trained to extract structured metadata from documents. +Your task is to analyze the given text and return a well-structured JSON object. + +Extract and return the following fields: +1. **filename**: Machine-readable filename (YYYY-MM-DD_DescriptiveTitle, use only letters, numbers, periods, and underscores). +2. **empfaenger**: The recipient, or "Unknown" if not found. +3. **absender**: The sender, or "Unknown" if not found. +4. **correspondent**: The entity or company that issued the document (shortest possible name, e.g., "Amazon" instead of "Amazon EU SARL, German branch"). +5. **kommunikationsart**: One of [Behoerdlicher_Brief, Rechnung, Kontoauszug, Vertrag, Quittung, Privater_Brief, Einladung, Gewerbliche_Korrespondenz, Newsletter, Werbung, Sonstiges]. +6. **kommunikationskategorie**: One of [Amtliche_Postbehoerdliche_Dokumente, Finanz_und_Vertragsdokumente, Geschaeftliche_Kommunikation, Private_Korrespondenz, Sonstige_Informationen]. +7. **document_type**: Precise classification (e.g., Invoice, Contract, Information, Unknown). +8. **tags**: A list of up to 4 relevant thematic keywords. +9. **language**: Detected document language (ISO 639-1 code, e.g., "de" or "en"). +10. **title**: A human-readable title summarizing the document content. +11. **confidence_score**: A numeric value (0-100) indicating the confidence level of the extracted metadata. +12. **reference_number**: Extracted invoice/order/reference number if available. +13. **monetary_amounts**: A list of key monetary values detected in the document. + +### Important Rules: +- **OCR Correction**: Assume the text has been corrected for OCR errors. +- **Tagging**: Max 4 tags, avoiding generic or overly specific terms. +- **Title**: Concise, no addresses, and contains key identifying features. +- **Date Selection**: Use the most relevant date if multiple are found. +- **Output Language**: Maintain the document's original language. Extracted text: {cleaned_text} @@ -56,7 +68,7 @@ def extract_metadata_with_gpt(s3_filename: str, cleaned_text: str): try: print(f"[DEBUG] Sending classification request for {s3_filename}...") completion = client.chat.completions.create( - model="gpt-4o", + model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are an intelligent document classifier."}, {"role": "user", "content": prompt} @@ -83,4 +95,3 @@ def extract_metadata_with_gpt(s3_filename: str, cleaned_text: str): except Exception as e: print(f"[ERROR] OpenAI classification failed for {s3_filename}: {e}") return {} - diff --git a/app/tasks/imap_tasks.py b/app/tasks/imap_tasks.py index d25a0ac..a129593 100644 --- a/app/tasks/imap_tasks.py +++ b/app/tasks/imap_tasks.py @@ -114,7 +114,7 @@ def pull_inbox( delete_after_process: bool ): """ - Connects to the IMAP inbox, fetches new emails (last 7 days), + Connects to the IMAP inbox, fetches new emails (last 3 days), and processes attachments while preserving the original unread status. """ logger.info(f"Connecting to {mailbox_key} at {host}:{port} (SSL={use_ssl})") @@ -129,8 +129,8 @@ def pull_inbox( mail.login(username, password) mail.select("INBOX") - # Fetch emails from the last 7 days - since_date = (datetime.utcnow() - timedelta(days=7)).strftime("%d-%b-%Y") + # Fetch emails from the last 3 days + since_date = (datetime.utcnow() - timedelta(days=3)).strftime("%d-%b-%Y") status, search_data = mail.search(None, f'(SINCE {since_date})') if status != "OK": logger.warning(f"Search failed on mailbox {mailbox_key}. Status={status}") @@ -139,7 +139,7 @@ def pull_inbox( return msg_numbers = search_data[0].split() - logger.info(f"Found {len(msg_numbers)} emails from the last 7 days in {mailbox_key}.") + logger.info(f"Found {len(msg_numbers)} emails from the last 3 days in {mailbox_key}.") for num in msg_numbers: # Check if email is unread diff --git a/app/tasks/upload_to_paperless.py b/app/tasks/upload_to_paperless.py index 0c7ae97..8f14c72 100644 --- a/app/tasks/upload_to_paperless.py +++ b/app/tasks/upload_to_paperless.py @@ -1,12 +1,11 @@ #!/usr/bin/env python3 import os -import re import json import time import requests import logging -from typing import Optional, Dict, Any, List +from typing import Dict, Any from app.config import settings from app.tasks.retry_config import BaseTaskWithRetry @@ -33,145 +32,6 @@ def _paperless_api_url(path: str) -> str: path = "/" + path return f"{host}{path}" -def get_or_create_correspondent(name: str) -> Optional[int]: - """ - Look up or create a Paperless 'correspondent' by name. - Return its ID or None if name is empty/unknown or if creation fails. - """ - if not name or name.lower() == "unknown": - return None - - url = _paperless_api_url("/api/correspondents/") - try: - # Attempt to find existing by name - resp = requests.get(url, headers=_get_headers(), params={"name": name}) - resp.raise_for_status() - data = resp.json() - - existing = [c for c in data["results"] if c["name"] == name] - if existing: - return existing[0]["id"] - - # Create new - create_resp = requests.post( - url, - headers={**_get_headers(), "Content-Type": "application/json"}, - json={"name": name} - ) - create_resp.raise_for_status() - return create_resp.json()["id"] - - except requests.exceptions.RequestException as exc: - logger.warning( - "Failed to get/create correspondent '%s'. Error: %s. Response=%s", - name, exc, getattr(exc.response, "text", "") - ) - return None - - -def get_or_create_document_type(name: str) -> Optional[int]: - """ - Look up or create a Paperless 'document_type' by name. - Return its ID or None if name is empty/unknown or if creation fails. - """ - if not name or name.lower() == "unknown": - return None - - url = _paperless_api_url("/api/document_types/") - try: - resp = requests.get(url, headers=_get_headers(), params={"name": name}) - resp.raise_for_status() - data = resp.json() - - existing = [dt for dt in data["results"] if dt["name"] == name] - if existing: - return existing[0]["id"] - - create_resp = requests.post( - url, - headers={**_get_headers(), "Content-Type": "application/json"}, - json={"name": name} - ) - create_resp.raise_for_status() - return create_resp.json()["id"] - - except requests.exceptions.RequestException as exc: - logger.warning( - "Failed to get/create document type '%s'. Error: %s. Response=%s", - name, exc, getattr(exc.response, "text", "") - ) - return None - - -def get_or_create_tag(tag_name: str) -> Optional[int]: - """ - Look up or create a Paperless 'tag' by name. - Return its ID or None if tag_name is empty/unknown or if creation fails. - """ - if not tag_name or tag_name.lower() == "unknown": - return None - - url = _paperless_api_url("/api/tags/") - try: - resp = requests.get(url, headers=_get_headers(), params={"name": tag_name}) - resp.raise_for_status() - data = resp.json() - - existing = [t for t in data["results"] if t["name"] == tag_name] - if existing: - return existing[0]["id"] - - create_resp = requests.post( - url, - headers={**_get_headers(), "Content-Type": "application/json"}, - json={"name": tag_name} - ) - create_resp.raise_for_status() - return create_resp.json()["id"] - - except requests.exceptions.RequestException as exc: - logger.warning( - "Failed to get/create tag '%s'. Error: %s. Response=%s", - tag_name, exc, getattr(exc.response, "text", "") - ) - return None - - -def get_or_create_custom_field(field_name: str) -> Optional[int]: - """ - Look up or create a Paperless 'custom_field' by name. - Return its ID or None if empty or if creation fails. - """ - if not field_name: - logger.warning("Field name must not be empty.") - return None - - url = _paperless_api_url("/api/custom_fields/") - try: - resp = requests.get(url, headers=_get_headers(), params={"name": field_name}) - resp.raise_for_status() - data = resp.json() - - existing = [cf for cf in data["results"] if cf["name"] == field_name] - if existing: - return existing[0]["id"] - - create_resp = requests.post( - url, - headers={**_get_headers(), "Content-Type": "application/json"}, - json={"name": field_name, "data_type": "string"} - ) - create_resp.raise_for_status() - return create_resp.json()["id"] - - except requests.exceptions.RequestException as exc: - logger.warning( - "Failed to get/create custom field '%s'. Error: %s. Response=%s", - field_name, exc, getattr(exc.response, "text", "") - ) - return None - - def poll_task_for_document_id(task_id: str) -> int: """ Polls /api/tasks/?task_id= until we get status=SUCCESS or FAILURE, @@ -198,7 +58,6 @@ def poll_task_for_document_id(task_id: str) -> int: attempts += 1 continue - # The response is typically a list or a dict with "results" if isinstance(tasks_data, dict) and "results" in tasks_data: tasks_data = tasks_data["results"] @@ -206,13 +65,9 @@ def poll_task_for_document_id(task_id: str) -> int: task_info = tasks_data[0] status = task_info.get("status") if status == "SUCCESS": - doc_str = task_info.get("related_document") # e.g. "56712" + doc_str = task_info.get("related_document") if doc_str: return int(doc_str) - # Fallback: parse from 'result' text - match = re.search(r"New document id (\d+)", task_info.get("result", "")) - if match: - return int(match.group(1)) raise RuntimeError( f"Task {task_id} completed but no doc ID found. Task info: {task_info}" ) @@ -226,113 +81,35 @@ def poll_task_for_document_id(task_id: str) -> int: f"Task {task_id} didn't reach SUCCESS within {POLL_MAX_ATTEMPTS} attempts." ) - -def patch_document_custom_fields(document_id: int, field_values: Dict[int, str]) -> None: - """ - Applies custom field values to an existing document: - PATCH /api/documents// - JSON body: { "custom_fields": [ { "field": , "value": }, ... ] } - """ - if not field_values: - return - - url = _paperless_api_url(f"/api/documents/{document_id}/") - payload = { - "custom_fields": [ - {"field": cf_id, "value": cf_val} - for cf_id, cf_val in field_values.items() - ] - } - - try: - resp = requests.patch( - url, - headers={**_get_headers(), "Content-Type": "application/json"}, - json=payload - ) - resp.raise_for_status() - except requests.exceptions.RequestException as exc: - logger.warning( - "Failed to patch custom fields for doc %d. Error: %s. Payload=%s Response=%s", - document_id, exc, payload, getattr(exc.response, "text", "") - ) - # We skip raising here, so ingestion can continue. - - @celery.task(base=BaseTaskWithRetry) def upload_to_paperless(file_path: str) -> Dict[str, Any]: """ - 1. Reads JSON metadata from a matching .json file. - 2. Creates/fetches correspondents, doc types, tags, custom fields as needed (with graceful error handling). - 3. POSTs the PDF to Paperless => returns a quoted UUID string (task_id). - 4. Polls /api/tasks/?task_id= until SUCCESS or FAILURE => doc_id - 5. PATCHes custom fields onto the doc if present. + Uploads a PDF to Paperless with minimal metadata (filename and date only). + + 1. Extracts the filename and date from the file. + 2. POSTs the PDF to Paperless => returns a quoted UUID string (task_id). + 3. Polls /api/tasks/?task_id= until SUCCESS or FAILURE => doc_id. Returns a dict with status, the paperless_task_id, paperless_document_id, and file_path. """ if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") - base_name, ext = os.path.splitext(file_path) - json_path = f"{base_name}.json" - if not os.path.exists(json_path): - raise FileNotFoundError(f"Metadata JSON not found: {json_path}") - - # Read JSON metadata - with open(json_path, "r", encoding="utf-8") as jf: - metadata = json.load(jf) - - # Built-in Paperless fields - title = metadata.get("title") or os.path.basename(base_name) - corr_name = metadata.get("correspondent", "") or metadata.get("absender", "") - if corr_name.lower() == "unknown": - corr_name = "" - # Attempt creation, if fails, returns None - correspondent_id = get_or_create_correspondent(corr_name) + base_name = os.path.basename(file_path) - doc_type_str = metadata.get("document_type", "") - if doc_type_str.lower() == "unknown": - doc_type_str = "" - # Attempt creation, if fails, returns None - document_type_id = get_or_create_document_type(doc_type_str) if doc_type_str else None - - # Tags - tag_ids: List[int] = [] - tags_list = metadata.get("tags", []) - for tag_item in tags_list: - if tag_item and tag_item.lower() != "unknown": - # If creation fails, returns None and is skipped - tid = get_or_create_tag(tag_item) - if tid: - tag_ids.append(tid) - else: - logger.warning("Skipping invalid tag '%s'", tag_item) - - # 1) Upload PDF + # Upload the PDF post_url = _paperless_api_url("/api/documents/post_document/") with open(file_path, "rb") as f: files = { - "document": (os.path.basename(file_path), f, "application/pdf"), + "document": (base_name, f, "application/pdf"), } - data = {"title": title} - if correspondent_id: - data["correspondent"] = correspondent_id - if document_type_id: - data["document_type"] = document_type_id - - # Paperless can handle repeated form fields for tags[] or a single array - for t_id in tag_ids: - data.setdefault("tags", []).append(str(t_id)) + data = {"title": base_name} # Title = Filename (no additional metadata) try: - logger.debug( - "Posting document to Paperless: data=%s, file=%s", - data, os.path.basename(file_path) - ) + logger.debug("Posting document to Paperless: file=%s", base_name) resp = requests.post(post_url, headers=_get_headers(), files=files, data=data) resp.raise_for_status() except requests.exceptions.RequestException as exc: - # This is a hard fail: if the main doc upload fails, there's no doc in Paperless at all logger.error( "Failed to upload document '%s' to Paperless. Error: %s. Response=%s", file_path, exc, getattr(exc.response, "text", "") @@ -342,30 +119,10 @@ def upload_to_paperless(file_path: str) -> Dict[str, Any]: raw_task_id = resp.text.strip().strip('"').strip("'") logger.info(f"Received Paperless task ID: {raw_task_id}") - # 2) Poll tasks until success/fail => get doc_id + # Poll tasks until success/fail => get doc_id doc_id = poll_task_for_document_id(raw_task_id) logger.info(f"Document {file_path} successfully ingested => ID={doc_id}") - # 3) Create custom fields for leftover JSON keys - built_in_keys = {"filename", "title", "tags", "document_type", "correspondent", "absender"} - field_values_map = {} - for key, val in metadata.items(): - if key in built_in_keys: - continue - if not val or str(val).lower() == "unknown": - continue - - cf_id = get_or_create_custom_field(key) - if cf_id is not None: - field_values_map[cf_id] = str(val) - else: - logger.warning("Skipping custom field '%s' due to creation error.", key) - - # 4) Patch custom fields onto the doc - if field_values_map: - patch_document_custom_fields(doc_id, field_values_map) - logger.info(f"Patched custom fields for doc {doc_id}") - return { "status": "Completed", "paperless_task_id": raw_task_id,