Commit

Added first working version of the code. Processes PDF files, no upload yet.

christianlouis committed Feb 11, 2025
1 parent e16af90 commit 1f2885a
Showing 17 changed files with 559 additions and 62 deletions.
18 changes: 9 additions & 9 deletions Dockerfile
@@ -1,29 +1,29 @@
-# Stage 1: Build
+# Stage 1: Build dependencies
 FROM python:3.11 AS builder
 
 WORKDIR /app
 
+# Copy only dependency files first for better caching
 COPY requirements.txt /app/
-RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
 
 # Stage 2: Final image
 FROM python:3.11-slim
 
 WORKDIR /app
 
-# Copy installed dependencies from builder stage
+# Copy installed dependencies
 COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
 COPY --from=builder /usr/local/bin /usr/local/bin
 
-# Copy application files
-COPY ./app /app
+# Copy application files correctly
+COPY ./app /app/app
 
-# Set environment variables
-ENV PYTHONUNBUFFERED=1
+# Set Python path explicitly
+ENV PYTHONPATH=/app
 
 # Expose API port
 EXPOSE 8000
+WORKDIR /app
 
 # Default command
 CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Empty file added app/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions app/celery_app.py
@@ -0,0 +1,20 @@
# app/celery_app.py

from celery import Celery
from app.config import settings

celery = Celery(
    "document_processor",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

# Retain connection retry behavior at startup
celery.conf.broker_connection_retry_on_startup = True

# Set the default queue and routing so that tasks are enqueued on "document_processor"
celery.conf.task_default_queue = "document_processor"
celery.conf.task_routes = {
    "app.tasks.*": {"queue": "document_processor"},
}
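
Because both the default queue and the task routes point at "document_processor", a worker has to consume that queue or enqueued tasks will sit unprocessed. A plausible worker invocation (the -Q value mirrors the config above; the log level is an arbitrary choice):

    celery -A app.celery_app worker -Q document_processor --loglevel=info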
19 changes: 16 additions & 3 deletions app/celery_worker.py
@@ -1,9 +1,21 @@
 #!/usr/bin/env python3
 
-from celery import Celery
-from .config import settings
+from app.config import settings
 
+# Import the shared Celery instance
+from app.celery_app import celery
+
+# Ensure tasks are loaded
+from app import tasks  # imports the app.tasks package so Celery can register its tasks
+
+# Ensure all tasks are imported before Celery starts
+from app.tasks.upload_to_s3 import upload_to_s3
+from app.tasks.process_with_textract import process_with_textract
+from app.tasks.refine_text_with_gpt import refine_text_with_gpt
+from app.tasks.extract_metadata_with_gpt import extract_metadata_with_gpt
+from app.tasks.embed_metadata_into_pdf import embed_metadata_into_pdf
 
-celery = Celery("document_processor", broker=settings.redis_url, backend=settings.redis_url)
-
 celery.conf.task_routes = {
     "app.tasks.*": {"queue": "default"},
@@ -12,3 +12,4 @@
 @celery.task
 def test_task():
     return "Celery is working!"

6 changes: 5 additions & 1 deletion app/config.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-from pydantic import BaseSettings
+from pydantic_settings import BaseSettings
 
 class Settings(BaseSettings):
     admin_username: str
@@ -11,6 +11,10 @@ class Settings(BaseSettings):
     aws_textract_role_arn: str
     database_url: str
     redis_url: str
+    s3_bucket_name: str
+    openai_api_key: str
 
     class Config:
         env_file = ".env"
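
pydantic-settings populates these fields from environment variables or the .env file named in Config. A partial .env sketch, limited to the fields visible in this diff (all values are placeholders):

    ADMIN_USERNAME=admin
    AWS_TEXTRACT_ROLE_ARN=arn:aws:iam::123456789012:role/textract
    DATABASE_URL=postgresql://docparse:secret@db:5432/docparse
    REDIS_URL=redis://redis:6379/0
    S3_BUCKET_NAME=example-docparse-bucket
    OPENAI_API_KEY=sk-...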
25 changes: 14 additions & 11 deletions app/main.py
@@ -1,22 +1,25 @@
 #!/usr/bin/env python3
 
-from fastapi import FastAPI, Depends
-from .config import settings
-from .database import engine, SessionLocal
-from .models import Base
-from .tasks import process_document
+from fastapi import FastAPI, HTTPException
+from .tasks.upload_to_s3 import upload_to_s3
+import os
 
-app = FastAPI(title="Document Processor")
-
-# Initialize database
-Base.metadata.create_all(bind=engine)
+app = FastAPI(title="Document Processing API")
 
 @app.get("/")
 def root():
     return {"message": "Document Processing API"}
 
 @app.post("/process/")
 def process(file_path: str):
-    """Trigger document processing"""
-    task = process_document.delay(file_path)
+    """
+    API endpoint to start document processing.
+    This enqueues the first task (upload_to_s3), which handles the full pipeline.
+    """
+    if not os.path.exists(file_path):
+        raise HTTPException(status_code=400, detail=f"File {file_path} not found.")
+
+    task = upload_to_s3.delay(file_path)
     return {"task_id": task.id, "status": "queued"}

27 changes: 0 additions & 27 deletions app/tasks.py

This file was deleted.

Empty file added app/tasks/__init__.py
Empty file.
117 changes: 117 additions & 0 deletions app/tasks/embed_metadata_into_pdf.py
@@ -0,0 +1,117 @@
#!/usr/bin/env python3

import os
import shutil
import fitz  # PyMuPDF, for PDF metadata editing
import json
from app.config import settings
from app.tasks.retry_config import BaseTaskWithRetry
from app.tasks.finalize_document_storage import finalize_document_storage

# Import the shared Celery instance
from app.celery_app import celery

def unique_filepath(directory, base_filename, extension=".pdf"):
    """
    Returns a unique filepath in the specified directory.
    If 'base_filename.pdf' exists, appends an underscore and a counter.
    """
    candidate = os.path.join(directory, base_filename + extension)
    if not os.path.exists(candidate):
        return candidate
    counter = 1
    while True:
        candidate = os.path.join(directory, f"{base_filename}_{counter}{extension}")
        if not os.path.exists(candidate):
            return candidate
        counter += 1

def persist_metadata(metadata, final_pdf_path):
    """
    Saves the metadata dictionary to a JSON file with the same base name as
    the final PDF. For example, if final_pdf_path is
    "/var/docparse/working/processed/MyFile.pdf", the metadata is saved as
    "/var/docparse/working/processed/MyFile.json".
    """
    base, _ = os.path.splitext(final_pdf_path)
    json_path = base + ".json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)
    return json_path

@celery.task(base=BaseTaskWithRetry)
def embed_metadata_into_pdf(local_file_path: str, extracted_text: str, metadata: dict):
    """
    Embeds extracted metadata into the PDF's standard metadata fields.
    The mapping is as follows:
      - title: uses the extracted metadata "filename"
      - author: uses "absender" (or "Unknown" if missing)
      - subject: uses "document_type" (or "Unknown")
      - keywords: a comma-separated list from the "tags" field

    After processing, the file is moved to
    /var/docparse/working/processed/<suggested_filename>.pdf, where
    <suggested_filename> is derived from metadata["filename"]. The output PDF
    is saved incrementally while preserving its original encryption.
    Additionally, the metadata is persisted to a JSON file with the same
    base name.
    """
    # Check for file existence; if not found, try the known shared directory.
    if not os.path.exists(local_file_path):
        alt_path = os.path.join("/var/docparse/working/tmp", os.path.basename(local_file_path))
        if os.path.exists(alt_path):
            local_file_path = alt_path
        else:
            print(f"[ERROR] Local file {local_file_path} not found, cannot embed metadata.")
            return {"error": "File not found"}

    # Work on a safe copy in /tmp
    tmp_dir = "/tmp"
    original_file = local_file_path
    processed_file = os.path.join(tmp_dir, f"processed_{os.path.basename(local_file_path)}")

    # Create a safe copy to work on
    shutil.copy(original_file, processed_file)

    try:
        print(f"[DEBUG] Embedding metadata into {processed_file}...")

        # Open the PDF
        doc = fitz.open(processed_file)
        # Set PDF metadata using only the standard keys.
        doc.set_metadata({
            "title": metadata.get("filename", "Unknown Document"),
            "author": metadata.get("absender", "Unknown"),
            "subject": metadata.get("document_type", "Unknown"),
            "keywords": ", ".join(metadata.get("tags", []))
        })
        # Save incrementally and preserve encryption
        doc.save(processed_file, incremental=True, encryption=fitz.PDF_ENCRYPT_KEEP)
        doc.close()

        print(f"[INFO] Metadata embedded successfully in {processed_file}")

        # Use the suggested filename from metadata; if not provided, use the original basename.
        suggested_filename = metadata.get("filename", os.path.splitext(os.path.basename(local_file_path))[0])
        # Remove any extension; ".pdf" is appended below.
        suggested_filename = os.path.splitext(suggested_filename)[0]
        # Define the final directory and ensure it exists.
        final_dir = "/var/docparse/working/processed"
        os.makedirs(final_dir, exist_ok=True)
        # Get a unique filepath in case of collisions.
        final_file_path = unique_filepath(final_dir, suggested_filename, extension=".pdf")

        # Move the processed file; shutil.move handles cross-device moves.
        shutil.move(processed_file, final_file_path)

        # Persist the metadata into a JSON file with the same base name.
        json_path = persist_metadata(metadata, final_file_path)
        print(f"[INFO] Metadata persisted to {json_path}")

        # Trigger the next step: final storage.
        finalize_document_storage.delay(original_file, final_file_path, metadata)

        return {"file": final_file_path, "metadata_file": json_path, "status": "Metadata embedded"}

    except Exception as e:
        print(f"[ERROR] Failed to embed metadata into {processed_file}: {e}")
        return {"error": str(e)}

86 changes: 86 additions & 0 deletions app/tasks/extract_metadata_with_gpt.py
@@ -0,0 +1,86 @@
#!/usr/bin/env python3

import json
import re
from openai import OpenAI
from app.config import settings
from app.tasks.retry_config import BaseTaskWithRetry
from app.tasks.embed_metadata_into_pdf import embed_metadata_into_pdf

# Import the shared Celery instance
from app.celery_app import celery

client = OpenAI(api_key=settings.openai_api_key)

def extract_json_from_text(text):
    """
    Try to extract a JSON object from the text.
    - First, check for a JSON block inside triple backticks.
    - If not found, try to extract text from the first '{' to the last '}'.
    """
    pattern = r"```(?:json)?\s*(\{.*?\})\s*```"
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1)
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end != -1 and end > start:
        return text[start:end + 1]
    return None

@celery.task(base=BaseTaskWithRetry)
def extract_metadata_with_gpt(s3_filename: str, cleaned_text: str):
    """Uses OpenAI GPT-4o to classify document metadata."""

    prompt = f"""
You are an intelligent document classifier.
Given the following extracted text from a document, analyze it and return a JSON object with the following fields:

1. "filename": A machine-readable filename in the format YYYY-MM-DD_DescriptiveTitle (use only letters, numbers, periods, and underscores).
2. "empfaenger": The recipient, or "Unknown" if not found.
3. "absender": The sender, or "Unknown" if not found.
4. "correspondent": A correspondent extracted from the document, or "Unknown".
5. "kommunikationsart": One of [Behoerdlicher_Brief, Rechnung, Kontoauszug, Vertrag, Quittung, Privater_Brief, Einladung, Gewerbliche_Korrespondenz, Newsletter, Werbung, Sonstiges].
6. "kommunikationskategorie": One of [Amtliche_Postbehoerdliche_Dokumente, Finanz_und_Vertragsdokumente, Geschaeftliche_Kommunikation, Private_Korrespondenz, Sonstige_Informationen].
7. "document_type": The document type, or "Unknown".
8. "tags": A list of additional keywords extracted from the document.
9. "language": The detected language code (e.g., "DE").
10. "title": A human-friendly title for the document.

Extracted text:
{cleaned_text}

Return only valid JSON with no additional commentary.
"""

    try:
        print(f"[DEBUG] Sending classification request for {s3_filename}...")
        completion = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are an intelligent document classifier."},
                {"role": "user", "content": prompt}
            ],
            temperature=0
        )

        content = completion.choices[0].message.content
        print(f"[DEBUG] Raw classification response for {s3_filename}: {content}")

        json_text = extract_json_from_text(content)
        if not json_text:
            print(f"[ERROR] Could not find valid JSON in GPT response for {s3_filename}.")
            return {}

        metadata = json.loads(json_text)
        print(f"[DEBUG] Extracted metadata: {metadata}")

        # Trigger the next step: embedding metadata into the PDF
        embed_metadata_into_pdf.delay(s3_filename, cleaned_text, metadata)

        return {"s3_file": s3_filename, "metadata": metadata}

    except Exception as e:
        print(f"[ERROR] OpenAI classification failed for {s3_filename}: {e}")
        return {}
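
extract_json_from_text accepts both fenced and bare JSON, which guards against models that wrap their answer in commentary. A quick check (the reply string is a made-up model response):

    from app.tasks.extract_metadata_with_gpt import extract_json_from_text

    reply = 'Sure! Here is the result:\n```json\n{"language": "DE", "tags": ["Rechnung"]}\n```'
    print(extract_json_from_text(reply))  # -> {"language": "DE", "tags": ["Rechnung"]}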

14 changes: 14 additions & 0 deletions app/tasks/finalize_document_storage.py
@@ -0,0 +1,14 @@
#!/usr/bin/env python3

from app.config import settings
from app.tasks.retry_config import BaseTaskWithRetry

# Import the shared Celery instance
from app.celery_app import celery

@celery.task(base=BaseTaskWithRetry)
def finalize_document_storage(original_file: str, processed_file: str, metadata: dict):
    """Final storage step after embedding metadata."""
    print(f"[INFO] Finalizing document storage for {processed_file}")
    return {"status": "Completed", "file": processed_file}
