Skip to content

Commit

Permalink
putting it all together
Browse files Browse the repository at this point in the history
  • Loading branch information
christianlouis committed Feb 12, 2025
1 parent 93c54f4 commit 80d07eb
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
1 change: 1 addition & 0 deletions app/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from app.tasks.upload_to_paperless import upload_to_paperless
from app.tasks.upload_to_nextcloud import upload_to_nextcloud
from app.tasks.imap_tasks import pull_all_inboxes
from app.tasks.send_to_all import send_to_all_destinations

celery.conf.task_routes = {
"app.tasks.*": {"queue": "default"},
Expand Down
54 changes: 54 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.tasks.upload_to_dropbox import upload_to_dropbox
from app.tasks.upload_to_paperless import upload_to_paperless
from app.tasks.upload_to_nextcloud import upload_to_nextcloud
from app.tasks.send_to_all import send_to_all_destinations

app = FastAPI(title="Document Processing API")

Expand Down Expand Up @@ -57,3 +58,56 @@ def send_to_nextcloud(file_path: str):
raise HTTPException(status_code=400, detail=f"File {file_path} not found.")
task = upload_to_nextcloud.delay(file_path)
return {"task_id": task.id, "status": "queued"}


@app.post("/send_to_all_destinations/")
def send_to_all_destinations_endpoint(file_path: str):
    """
    Queue the aggregator task ``send_to_all_destinations`` for one file.

    Args:
        file_path: Absolute path to the file, or a path relative to the
            ``processed`` sub-directory of ``settings.workdir``.

    Returns:
        A dict holding the Celery task id, the literal status ``"queued"``
        and the resolved path that was handed to the task.

    Raises:
        HTTPException: 400 when the resolved path does not exist.
    """
    # Relative input is interpreted as living under <workdir>/processed.
    resolved_path = (
        file_path
        if os.path.isabs(file_path)
        else os.path.join(settings.workdir, 'processed', file_path)
    )

    if os.path.exists(resolved_path):
        queued = send_to_all_destinations.delay(resolved_path)
        return {
            "task_id": queued.id,
            "status": "queued",
            "file_path": resolved_path,
        }

    raise HTTPException(
        status_code=400,
        detail=f"File {resolved_path} not found."
    )



@app.post("/processall")
def process_all_pdfs_in_workdir():
    """
    Enqueue every PDF found directly inside ``settings.workdir`` for upload_to_s3.

    Only the top level of the directory is listed (no recursion). An entry
    counts as a PDF when its name ends in ".pdf", compared case-insensitively.

    Returns:
        When no PDF is found, a dict with only a ``message`` key. Otherwise a
        dict with a summary ``message``, the matched ``pdf_files`` names and
        the Celery ``task_ids``, both in directory-listing order.

    Raises:
        HTTPException: 400 when ``settings.workdir`` is not an existing
            directory.

    NOTE(review): the "no files" message below speaks of the "processed"
    directory, while the directory actually scanned is ``settings.workdir``
    itself -- confirm which one is intended.
    """
    target_dir = settings.workdir
    # isdir (rather than exists) so that a path pointing at a regular file is
    # rejected here with a 400 instead of making os.listdir raise below.
    if not os.path.isdir(target_dir):
        raise HTTPException(status_code=400, detail=f"Directory {target_dir} does not exist.")

    pdf_files = [
        filename
        for filename in os.listdir(target_dir)
        if filename.lower().endswith(".pdf")
    ]

    if not pdf_files:
        return {"message": "No PDF files found in processed directory."}

    # Function-scope import kept from the original code, but executed once
    # instead of on every loop iteration.
    from app.tasks.upload_to_s3 import upload_to_s3

    task_ids = [
        upload_to_s3.delay(os.path.join(target_dir, pdf)).id
        for pdf in pdf_files
    ]

    return {
        "message": f"Enqueued {len(pdf_files)} PDFs to upload_to_s3",
        "pdf_files": pdf_files,
        "task_ids": task_ids
    }
18 changes: 15 additions & 3 deletions app/tasks/finalize_document_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@

from app.config import settings
from app.tasks.retry_config import BaseTaskWithRetry

# Import the shared Celery instance
from app.celery_app import celery

# 1) Import the aggregator task
from app.tasks.send_to_all import send_to_all_destinations


@celery.task(base=BaseTaskWithRetry)
def finalize_document_storage(original_file: str, processed_file: str, metadata: dict):
    """
    Final storage step after embedding metadata.

    Enqueues ``send_to_all_destinations`` for the processed file and then
    reports completion. The upload task is only queued via ``.delay()``;
    this task does not wait for its result.

    Args:
        original_file: Path of the original input file (not used here).
        processed_file: Path of the final PDF that is handed to the uploads.
        metadata: Metadata associated with the document (not used here).

    Returns:
        A dict with ``status`` set to ``"Completed"`` and ``file`` set to
        ``processed_file``.
    """
    print(f"[INFO] Finalizing document storage for {processed_file}")

    # Enqueue the aggregator task. A stale early `return` used to sit right
    # after the log line and made this call unreachable.
    send_to_all_destinations.delay(processed_file)

    return {
        "status": "Completed",
        "file": processed_file
    }
21 changes: 21 additions & 0 deletions app/tasks/send_to_all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# app/tasks/send_to_all.py

from app.celery_app import celery
from app.tasks.upload_to_dropbox import upload_to_dropbox
from app.tasks.upload_to_nextcloud import upload_to_nextcloud
from app.tasks.upload_to_paperless import upload_to_paperless

@celery.task
def send_to_all_destinations(file_path: str):
    """
    Enqueue one upload task per destination for a single file.

    The Dropbox, Nextcloud and Paperless upload tasks are queued in that
    order through ``.delay()``; this task does not wait for any of them
    to finish.

    Args:
        file_path: Path of the file passed unchanged to each upload task.

    Returns:
        A dict with a fixed ``status`` message and the ``file_path`` given.
    """
    destination_tasks = (
        upload_to_dropbox,
        upload_to_nextcloud,
        upload_to_paperless,
    )
    for upload_task in destination_tasks:
        upload_task.delay(file_path)

    summary = {"status": "All upload tasks enqueued", "file_path": file_path}
    return summary

0 comments on commit 80d07eb

Please sign in to comment.