-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added first working version of the code. Processes
PDF files, no upload yet.
- Loading branch information
1 parent
22f8f60
commit 1ad9425
Showing
17 changed files
with
559 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,29 @@ | ||
# Stage 1: Build | ||
|
||
# Stage 1: Build dependencies | ||
FROM python:3.11 AS builder | ||
|
||
WORKDIR /app | ||
|
||
# Copy only dependency files first for better caching | ||
COPY requirements.txt /app/ | ||
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt | ||
RUN pip install --no-cache-dir -r requirements.txt | ||
|
||
# Stage 2: Final image | ||
FROM python:3.11-slim | ||
|
||
WORKDIR /app | ||
|
||
# Copy installed dependencies from builder stage | ||
# Copy installed dependencies | ||
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages | ||
COPY --from=builder /usr/local/bin /usr/local/bin | ||
|
||
# Copy application files | ||
COPY ./app /app | ||
# Copy application files correctly | ||
COPY ./app /app/app | ||
|
||
# Set environment variables | ||
ENV PYTHONUNBUFFERED=1 | ||
# Set Python path explicitly | ||
ENV PYTHONPATH=/app | ||
|
||
# Expose API port | ||
EXPOSE 8000 | ||
WORKDIR /app | ||
|
||
# Default command | ||
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# app/celery_app.py | ||
|
||
from celery import Celery | ||
from app.config import settings | ||
|
||
celery = Celery( | ||
"document_processor", | ||
broker=settings.redis_url, | ||
backend=settings.redis_url, | ||
) | ||
|
||
|
||
# Optionally add this line to retain connection retry behavior at startup: | ||
celery.conf.broker_connection_retry_on_startup = True | ||
|
||
# Set the default queue and routing so that tasks are enqueued on "document_processor" | ||
celery.conf.task_default_queue = 'document_processor' | ||
celery.conf.task_routes = { | ||
"app.tasks.*": {"queue": "document_processor"}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,25 @@ | ||
#!/usr/bin/env python3 | ||
|
||
from fastapi import FastAPI, Depends | ||
from .config import settings | ||
from .database import engine, SessionLocal | ||
from .models import Base | ||
from .tasks import process_document | ||
from fastapi import FastAPI, HTTPException | ||
from .tasks.upload_to_s3 import upload_to_s3 | ||
import os | ||
|
||
app = FastAPI(title="Document Processor") | ||
|
||
# Initialize database | ||
Base.metadata.create_all(bind=engine) | ||
app = FastAPI(title="Document Processing API") | ||
|
||
@app.get("/") | ||
def root(): | ||
return {"message": "Document Processing API"} | ||
|
||
@app.post("/process/") | ||
def process(file_path: str): | ||
"""Trigger document processing""" | ||
task = process_document.delay(file_path) | ||
""" | ||
API Endpoint to start document processing. | ||
This enqueues the first task (upload_to_s3), which handles the full pipeline. | ||
""" | ||
|
||
if not os.path.exists(file_path): | ||
raise HTTPException(status_code=400, detail=f"File {file_path} not found.") | ||
|
||
task = upload_to_s3.delay(file_path) | ||
return {"task_id": task.id, "status": "queued"} | ||
|
This file was deleted.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import os | ||
import shutil | ||
import fitz # PyMuPDF for PDF metadata editing | ||
import json | ||
from app.config import settings | ||
from app.tasks.retry_config import BaseTaskWithRetry | ||
from app.tasks.finalize_document_storage import finalize_document_storage | ||
|
||
# Import the shared Celery instance | ||
from app.celery_app import celery | ||
|
||
def unique_filepath(directory, base_filename, extension=".pdf"): | ||
""" | ||
Returns a unique filepath in the specified directory. | ||
If 'base_filename.pdf' exists, it will append an underscore and counter. | ||
""" | ||
candidate = os.path.join(directory, base_filename + extension) | ||
if not os.path.exists(candidate): | ||
return candidate | ||
counter = 1 | ||
while True: | ||
candidate = os.path.join(directory, f"{base_filename}_{counter}{extension}") | ||
if not os.path.exists(candidate): | ||
return candidate | ||
counter += 1 | ||
|
||
def persist_metadata(metadata, final_pdf_path): | ||
""" | ||
Saves the metadata dictionary to a JSON file with the same base name as the final PDF. | ||
For example, if final_pdf_path is "/var/docparse/working/processed/MyFile.pdf", | ||
the metadata will be saved as "/var/docparse/working/processed/MyFile.json". | ||
""" | ||
base, _ = os.path.splitext(final_pdf_path) | ||
json_path = base + ".json" | ||
with open(json_path, "w", encoding="utf-8") as f: | ||
json.dump(metadata, f, ensure_ascii=False, indent=2) | ||
return json_path | ||
|
||
@celery.task(base=BaseTaskWithRetry) | ||
def embed_metadata_into_pdf(local_file_path: str, extracted_text: str, metadata: dict): | ||
""" | ||
Embeds extracted metadata into the PDF's standard metadata fields. | ||
The mapping is as follows: | ||
- title: uses the extracted metadata "filename" | ||
- author: uses "absender" (or "Unknown" if missing) | ||
- subject: uses "document_type" (or "Unknown") | ||
- keywords: a comma‐separated list from the "tags" field | ||
After processing, the file is moved to | ||
/var/docparse/working/processed/<suggested_filename.pdf> | ||
where <suggested_filename.pdf> is derived from metadata["filename"]. | ||
The output PDF is saved incrementally while preserving its original encryption. | ||
Additionally, the metadata is persisted to a JSON file with the same base name. | ||
""" | ||
# Check for file existence; if not found, try the known shared directory. | ||
if not os.path.exists(local_file_path): | ||
alt_path = os.path.join("/var/docparse/working/tmp", os.path.basename(local_file_path)) | ||
if os.path.exists(alt_path): | ||
local_file_path = alt_path | ||
else: | ||
print(f"[ERROR] Local file {local_file_path} not found, cannot embed metadata.") | ||
return {"error": "File not found"} | ||
|
||
# Work on a safe copy in /tmp | ||
tmp_dir = "/tmp" | ||
original_file = local_file_path | ||
processed_file = os.path.join(tmp_dir, f"processed_{os.path.basename(local_file_path)}") | ||
|
||
# Create a safe copy to work on | ||
shutil.copy(original_file, processed_file) | ||
|
||
try: | ||
print(f"[DEBUG] Embedding metadata into {processed_file}...") | ||
|
||
# Open the PDF | ||
doc = fitz.open(processed_file) | ||
# Set PDF metadata using only the standard keys. | ||
doc.set_metadata({ | ||
"title": metadata.get("filename", "Unknown Document"), | ||
"author": metadata.get("absender", "Unknown"), | ||
"subject": metadata.get("document_type", "Unknown"), | ||
"keywords": ", ".join(metadata.get("tags", [])) | ||
}) | ||
# Save incrementally and preserve encryption | ||
doc.save(processed_file, incremental=True, encryption=fitz.PDF_ENCRYPT_KEEP) | ||
doc.close() | ||
|
||
print(f"[INFO] Metadata embedded successfully in {processed_file}") | ||
|
||
# Use the suggested filename from metadata; if not provided, use the original basename. | ||
suggested_filename = metadata.get("filename", os.path.splitext(os.path.basename(local_file_path))[0]) | ||
# Remove any extension and then add .pdf | ||
suggested_filename = os.path.splitext(suggested_filename)[0] | ||
# Define the final directory and ensure it exists. | ||
final_dir = "/var/docparse/working/processed" | ||
os.makedirs(final_dir, exist_ok=True) | ||
# Get a unique filepath in case of collisions. | ||
final_file_path = unique_filepath(final_dir, suggested_filename, extension=".pdf") | ||
|
||
# Move the processed file using shutil.move to handle cross-device moves. | ||
shutil.move(processed_file, final_file_path) | ||
|
||
# Persist the metadata into a JSON file with the same base name. | ||
json_path = persist_metadata(metadata, final_file_path) | ||
print(f"[INFO] Metadata persisted to {json_path}") | ||
|
||
# Trigger the next step: final storage. | ||
finalize_document_storage.delay(original_file, final_file_path, metadata) | ||
|
||
return {"file": final_file_path, "metadata_file": json_path, "status": "Metadata embedded"} | ||
|
||
except Exception as e: | ||
print(f"[ERROR] Failed to embed metadata into {processed_file}: {e}") | ||
return {"error": str(e)} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import json | ||
import re | ||
from openai import OpenAI | ||
from app.config import settings | ||
from app.tasks.retry_config import BaseTaskWithRetry | ||
from app.tasks.embed_metadata_into_pdf import embed_metadata_into_pdf | ||
|
||
# Import the shared Celery instance | ||
from app.celery_app import celery | ||
|
||
client = OpenAI(api_key=settings.openai_api_key) | ||
|
||
def extract_json_from_text(text): | ||
""" | ||
Try to extract a JSON object from the text. | ||
- First, check for a JSON block inside triple backticks. | ||
- If not found, try to extract text from the first '{' to the last '}'. | ||
""" | ||
pattern = r"```(?:json)?\s*(\{.*?\})\s*```" | ||
match = re.search(pattern, text, re.DOTALL) | ||
if match: | ||
return match.group(1) | ||
else: | ||
start = text.find("{") | ||
end = text.rfind("}") | ||
if start != -1 and end != -1 and end > start: | ||
return text[start:end+1] | ||
return None | ||
|
||
@celery.task(base=BaseTaskWithRetry) | ||
def extract_metadata_with_gpt(s3_filename: str, cleaned_text: str): | ||
"""Uses OpenAI GPT-4o to classify document metadata.""" | ||
|
||
prompt = f""" | ||
You are an intelligent document classifier. | ||
Given the following extracted text from a document, analyze it and return a JSON object with the following fields: | ||
1. "filename": A machine-readable filename in the format YYYY-MM-DD_DescriptiveTitle (use only letters, numbers, periods, and underscores). | ||
2. "empfaenger": The recipient, or "Unknown" if not found. | ||
3. "absender": The sender, or "Unknown" if not found. | ||
4. "correspondent": A correspondent extracted from the document, or "Unknown". | ||
5. "kommunikationsart": One of [Behoerdlicher_Brief, Rechnung, Kontoauszug, Vertrag, Quittung, Privater_Brief, Einladung, Gewerbliche_Korrespondenz, Newsletter, Werbung, Sonstiges]. | ||
6. "kommunikationskategorie": One of [Amtliche_Postbehoerdliche_Dokumente, Finanz_und_Vertragsdokumente, Geschaeftliche_Kommunikation, Private_Korrespondenz, Sonstige_Informationen]. | ||
7. "document_type": The document type, or "Unknown". | ||
8. "tags": A list of additional keywords extracted from the document. | ||
9. "language": The detected language code (e.g., "DE"). | ||
10. "title": A human-friendly title for the document. | ||
Extracted text: | ||
{cleaned_text} | ||
Return only valid JSON with no additional commentary. | ||
""" | ||
|
||
try: | ||
print(f"[DEBUG] Sending classification request for {s3_filename}...") | ||
completion = client.chat.completions.create( | ||
model="gpt-4o", | ||
messages=[ | ||
{"role": "system", "content": "You are an intelligent document classifier."}, | ||
{"role": "user", "content": prompt} | ||
], | ||
temperature=0 | ||
) | ||
|
||
content = completion.choices[0].message.content | ||
print(f"[DEBUG] Raw classification response for {s3_filename}: {content}") | ||
|
||
json_text = extract_json_from_text(content) | ||
if not json_text: | ||
print(f"[ERROR] Could not find valid JSON in GPT response for {s3_filename}.") | ||
return {} | ||
|
||
metadata = json.loads(json_text) | ||
print(f"[DEBUG] Extracted metadata: {metadata}") | ||
|
||
# Trigger the next step: embedding metadata into the PDF | ||
embed_metadata_into_pdf.delay(s3_filename, cleaned_text, metadata) | ||
|
||
return {"s3_file": s3_filename, "metadata": metadata} | ||
|
||
except Exception as e: | ||
print(f"[ERROR] OpenAI classification failed for {s3_filename}: {e}") | ||
return {} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
#!/usr/bin/env python3 | ||
|
||
from app.config import settings | ||
from app.tasks.retry_config import BaseTaskWithRetry | ||
|
||
# Import the shared Celery instance | ||
from app.celery_app import celery | ||
|
||
@celery.task(base=BaseTaskWithRetry) | ||
def finalize_document_storage(original_file: str, processed_file: str, metadata: dict): | ||
"""Final storage step after embedding metadata.""" | ||
print(f"[INFO] Finalizing document storage for {processed_file}") | ||
return {"status": "Completed", "file": processed_file} | ||
|
Oops, something went wrong.