Skip to content

Commit

Permalink
fixes to paperless #2
Browse files Browse the repository at this point in the history
  • Loading branch information
christianlouis committed Feb 12, 2025
1 parent 09013a4 commit e7efc38
Showing 1 changed file with 118 additions and 59 deletions.
177 changes: 118 additions & 59 deletions app/tasks/upload_to_paperless.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,49 @@
import os
import re
import json
import time
import requests
from typing import Optional, Dict, Any, List

from app.config import settings
from app.tasks.retry_config import BaseTaskWithRetry
from app.celery_app import celery

POLL_MAX_ATTEMPTS = 10
POLL_INTERVAL_SEC = 3

def _get_headers():
"""Returns the authorization header for Paperless-ngx."""
"""Returns HTTP headers for Paperless-ngx API calls."""
return {
"Authorization": f"Token {settings.paperless_ngx_api_token}"
}

def _paperless_api_url(path: str) -> str:
"""Constructs the full API URL based on the paperless_host."""
"""
Constructs a full Paperless-ngx API URL using `settings.paperless_host`.
Ensures the path is appended with a leading slash if missing.
"""
host = settings.paperless_host.rstrip("/")
if not path.startswith("/"):
path = f"/{path}"
path = "/" + path
return f"{host}{path}"

def get_or_create_correspondent(name: str) -> Optional[int]:
"""Look up or create a correspondent with the given name. Returns None if empty/Unknown."""
"""Look up or create a Paperless 'correspondent' by name. Return its ID or None if empty/unknown."""
if not name or name.lower() == "unknown":
return None

url = _paperless_api_url("/api/correspondents/")
# Try to find an existing one by name:
# Attempt to find existing by name
resp = requests.get(url, headers=_get_headers(), params={"name": name})
resp.raise_for_status()

data = resp.json()

existing = [c for c in data["results"] if c["name"] == name]
if existing:
return existing[0]["id"]

# Create new
# If none found, create
create_resp = requests.post(
url,
headers={**_get_headers(), "Content-Type": "application/json"},
Expand All @@ -48,7 +55,7 @@ def get_or_create_correspondent(name: str) -> Optional[int]:
return create_resp.json()["id"]

def get_or_create_document_type(name: str) -> Optional[int]:
"""Look up or create a document type by name."""
"""Look up or create a Paperless 'document_type' by name. Return its ID or None if empty/unknown."""
if not name or name.lower() == "unknown":
return None

Expand All @@ -70,7 +77,7 @@ def get_or_create_document_type(name: str) -> Optional[int]:
return create_resp.json()["id"]

def get_or_create_tag(tag_name: str) -> Optional[int]:
"""Look up or create a tag by name."""
"""Look up or create a Paperless 'tag' by name. Return its ID or None if empty/unknown."""
if not tag_name or tag_name.lower() == "unknown":
return None

Expand All @@ -92,7 +99,10 @@ def get_or_create_tag(tag_name: str) -> Optional[int]:
return create_resp.json()["id"]

def get_or_create_custom_field(field_name: str) -> int:
"""Look up or create a custom field by name."""
"""
Look up or create a Paperless 'custom_field' by name.
Returns its ID. Raises ValueError if field_name is empty.
"""
if not field_name:
raise ValueError("Field name must not be empty")

Expand All @@ -113,107 +123,156 @@ def get_or_create_custom_field(field_name: str) -> int:
create_resp.raise_for_status()
return create_resp.json()["id"]

def poll_task_for_document_id(task_id: str) -> int:
"""
Polls /api/tasks/?task_id=<uuid> until we get status=SUCCESS or FAILURE,
or until we run out of attempts.
On SUCCESS: returns the int document_id from 'related_document'.
On FAILURE: raises RuntimeError with the task's 'result' message.
If times out, raises TimeoutError.
"""
url = _paperless_api_url("/api/tasks/")
attempts = 0

while attempts < POLL_MAX_ATTEMPTS:
resp = requests.get(url, headers=_get_headers(), params={"task_id": task_id})
resp.raise_for_status()

# The response is typically a list of length 1, e.g.:
# [
# {
# "task_id": "uuid",
# "status": "SUCCESS",
# "related_document": "56712",
# "result": "Success. New document id 56712 created",
# ...
# }
# ]
tasks_data = resp.json()
if isinstance(tasks_data, dict) and "results" in tasks_data:
# Some versions wrap tasks in { "results": [ ... ] }
tasks_data = tasks_data["results"]

if tasks_data:
task_info = tasks_data[0]
status = task_info.get("status")
if status == "SUCCESS":
doc_str = task_info.get("related_document") # e.g. "56712"
if doc_str:
return int(doc_str)
# Fallback: parse from 'result' text
match = re.search(r"New document id (\d+)", task_info.get("result", ""))
if match:
return int(match.group(1))
raise RuntimeError(
f"Task {task_id} completed but no doc ID found. Task info: {task_info}"
)
elif status == "FAILURE":
raise RuntimeError(f"Task {task_id} failed: {task_info.get('result')}")

attempts += 1
time.sleep(POLL_INTERVAL_SEC)

raise TimeoutError(f"Task {task_id} didn't reach SUCCESS within {POLL_MAX_ATTEMPTS} attempts.")

def patch_document_custom_fields(document_id: int, field_values: Dict[int, str]) -> None:
"""
Sets custom field values on a document via PATCH /api/documents/<id>/.
field_values is { custom_field_id: value }.
Applies custom field values to an existing document:
PATCH /api/documents/<id>/
JSON body: { "custom_fields": [ { "field": <id>, "value": <val> }, ... ] }
"""
if not field_values:
return

url = _paperless_api_url(f"/api/documents/{document_id}/")
payload = {
"custom_fields": [
{"field": fid, "value": val} for fid, val in field_values.items()
{"field": cf_id, "value": cf_val}
for cf_id, cf_val in field_values.items()
]
}

resp = requests.patch(url, headers={**_get_headers(), "Content-Type": "application/json"}, json=payload)
resp.raise_for_status()

@celery.task(base=BaseTaskWithRetry)
def upload_to_paperless(file_path: str):
def upload_to_paperless(file_path: str) -> Dict[str, Any]:
"""
Upload a PDF to Paperless-ngx with the associated .json metadata.
1. Parse metadata
2. Create/update correspondents, doc types, tags, and custom fields
3. Upload PDF to /api/documents/post_document/
-> parse the plain-text response to get document id
4. PATCH custom fields
1. Reads JSON metadata from a matching .json file.
2. Creates/fetches correspondents, doc types, tags, custom fields as needed.
3. POSTs the PDF to Paperless => returns a quoted UUID string (task_id).
4. Polls /api/tasks/?task_id=<uuid> until SUCCESS or FAILURE => doc_id
5. PATCHes custom fields onto the doc if present.
"""

if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")

# Expect a matching .json file next to the PDF
base_name, ext = os.path.splitext(file_path)
json_path = f"{base_name}.json"
if not os.path.exists(json_path):
raise FileNotFoundError(f"Metadata JSON not found at {json_path}")
raise FileNotFoundError(f"Metadata JSON not found: {json_path}")

# Read JSON
with open(json_path, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Read JSON metadata
with open(json_path, "r", encoding="utf-8") as jf:
metadata = json.load(jf)

# Prepare fields for built-in Paperless fields
# Built-in Paperless fields
title = metadata.get("title") or os.path.basename(base_name)

# Determine the final "correspondent" string (fallback to "absender" if "correspondent" is unknown/empty)
corr = metadata.get("correspondent", "") or metadata.get("absender", "")
if corr.lower() == "unknown":
corr = ""

correspondent_id = get_or_create_correspondent(corr)
corr_name = metadata.get("correspondent", "") or metadata.get("absender", "")
if corr_name.lower() == "unknown":
corr_name = ""
correspondent_id = get_or_create_correspondent(corr_name)

doc_type_str = metadata.get("document_type", "")
if doc_type_str.lower() == "unknown":
doc_type_str = ""
document_type_id = get_or_create_document_type(doc_type_str) if doc_type_str else None

# Tags
tag_ids: List[int] = []
tags_list = metadata.get("tags", [])
tag_ids = []
for tag_item in tags_list:
if tag_item and tag_item.lower() != "unknown":
tid = get_or_create_tag(tag_item)
if tid:
tag_ids.append(tid)

# Upload PDF (multipart form)
upload_url = _paperless_api_url("/api/documents/post_document/")
# 1) Upload PDF
post_url = _paperless_api_url("/api/documents/post_document/")
files = {
"document": (os.path.basename(file_path), open(file_path, "rb"), "application/pdf"),
}
data = {"title": title}
data = {
"title": title
}
if correspondent_id:
data["correspondent"] = correspondent_id
if document_type_id:
data["document_type"] = document_type_id

# Paperless might require repeated form fields for each tag id or a single JSON array.
# We'll do repeated form fields for safety:
# Usually Paperless expects repeated form fields for tags[] or a single tags array
# We'll do repeated form fields for each tag
for t_id in tag_ids:
data.setdefault("tags", [])
data["tags"].append(str(t_id))
data.setdefault("tags", []).append(str(t_id))

resp = requests.post(upload_url, headers=_get_headers(), files=files, data=data)
# close file handle
# Send the POST
resp = requests.post(post_url, headers=_get_headers(), files=files, data=data)
# Close file handle
files["document"][1].close()

resp.raise_for_status()

# Paperless returns plain text like "Success. New document id 56708 created"
response_text = resp.text.strip()
match = re.search(r"New document id (\d+) created", response_text)
if not match:
# If there's no match, we can't parse the ID
# Log the response and raise an error
raise RuntimeError(f"Could not parse 'New document id ### created' from response: {response_text}")
# The response is typically just: "some-uuid"
raw_task_id = resp.text.strip().strip('"').strip("'")
print(f"[INFO] Received Paperless task ID: {raw_task_id}")

document_id = int(match.group(1))
print(f"[INFO] Successfully uploaded doc. ID = {document_id}")
# 2) Poll tasks until success/fail => get doc_id
doc_id = poll_task_for_document_id(raw_task_id)
print(f"[INFO] Document created (or found duplicate) => ID={doc_id}")

# Create & map additional metadata to custom fields.
# Skip known built-in keys so we don't double-store them as CF.
# 3) Create custom fields for leftover JSON keys
# Skip these built-ins to avoid storing duplicates
built_in_keys = {"filename", "title", "tags", "document_type", "correspondent"}
field_values_map = {}
for key, val in metadata.items():
Expand All @@ -224,13 +283,13 @@ def upload_to_paperless(file_path: str):
cf_id = get_or_create_custom_field(key)
field_values_map[cf_id] = str(val)

# Patch custom fields
if field_values_map:
patch_document_custom_fields(document_id, field_values_map)
print(f"[INFO] Custom fields successfully patched for document {document_id}")
patch_document_custom_fields(doc_id, field_values_map)
print(f"[INFO] Patched custom fields for doc {doc_id}")

return {
"status": "Completed",
"paperless_document_id": document_id,
"paperless_task_id": raw_task_id,
"paperless_document_id": doc_id,
"file_path": file_path
}

0 comments on commit e7efc38

Please sign in to comment.