Native support for Pydantic serialization #497

Open

alexander-zuev opened this issue Feb 8, 2025 · 0 comments

alexander-zuev commented Feb 8, 2025

First of all, thanks for the superb library @samuelcolvin & team! After trying (unsuccessfully) to set up async task processing with Celery, I finally decided to switch to Arq, especially given the team behind it.

While Arq handles async workloads natively, it has no built-in support for Pydantic serialization of task inputs/outputs. Given the ubiquity of Pydantic in modern Python applications, I wanted to explore whether there’s interest in introducing native support for this.

Problem

Currently, passing Pydantic models to Arq tasks requires handling serialization/deserialization manually:

from uuid import UUID, uuid4

from pydantic import BaseModel

# Define model
class Document(BaseModel):
    document_id: UUID
    content: str

# Task with manual serialization
async def process_document_task(ctx: WorkerContext, document_dict: dict) -> dict:
    document = Document.model_validate(document_dict)
    result = await process_document(document)
    return result.model_dump()

# Enqueue task (manual conversion required)
document = Document(document_id=uuid4(), content="Hello")
await redis.enqueue_job('process_document_task', document_dict=document.model_dump(mode="json"))

This approach has several downsides:

  • Requires explicit serialization/deserialization everywhere
  • Leads to loss of type hints and auto-completion benefits
  • Can be error-prone if conversions are forgotten
  • JSON serialization does not support UUIDs natively, requiring additional conversion (see the snippet after this list)
  • Pickle, while supporting more Python types, is insecure when deserializing untrusted data and is not interoperable with non-Python consumers
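
To make the UUID point concrete, a minimal snippet using only the standard library:

import json
from uuid import uuid4

# Plain json.dumps rejects UUIDs outright:
json.dumps({"document_id": uuid4()})
# TypeError: Object of type UUID is not JSON serializable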

Proposed Solution

To improve this, I implemented a Msgpack-based serializer that:

  • Supports Pydantic models, lists/dicts of models, and UUIDs
  • Automatically converts models during task enqueueing and execution
  • Uses dynamic class loading to reconstruct Pydantic models at runtime

Why Msgpack?

  • Msgpack is compact and faster than JSON when dealing with structured data.
  • Unlike Pickle, Msgpack does not execute arbitrary code during deserialization, making it safer.
  • Supports dicts, lists, and other common Python structures without needing extensive manual handling (illustrated right after this list).
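
A quick sketch of the first and third points, assuming the msgpack-python package is installed: plain dicts and lists round-trip directly, while a raw UUID fails loudly instead of packing:

import msgpack
from uuid import uuid4

payload = {"name": "doc-1", "tags": ["a", "b"], "count": 3}
packed = msgpack.packb(payload, use_bin_type=True)
assert msgpack.unpackb(packed, raw=False) == payload

# A raw UUID is not a msgpack type, so packing fails:
msgpack.packb({"id": uuid4()})  # raises TypeError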

The core of the solution involves:

  1. Adding normalization before serialization
  • covers Pydantic's BaseModel, UUIDs, lists, tuples, and dicts, the most common objects that need extra handling with msgpack
  2. Adding denormalization after deserialization
  • Pydantic models are reconstructed dynamically from their fully qualified names, and UUIDs are converted back to their native type (see the wire-format sketch after this list).
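
For example, with the MsgpackSerializer shown further down and assuming Document lives in a hypothetical myapp.models module, the wire format looks like this:

doc = Document(document_id=uuid4(), content="Hello")
serializer._normalize(doc)
# {
#     "__pydantic__": "myapp.models.Document",
#     "data": {"document_id": "<uuid as a string>", "content": "Hello"},
# }
# _denormalize reverses this: it imports myapp.models.Document by name
# and calls Document.model_validate on the "data" payload.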

Example usage with this approach:

async def process_document_task(ctx: WorkerContext, document: Document) -> Document:
    result = await process_document(document)
    return result

# No need for manual conversion
document = Document(document_id=uuid4(), content="Hello")
await redis.enqueue_job('process_document_task', document=document)

Implementation Summary

I implemented a MsgpackSerializer class that seamlessly integrates into Arq’s WorkerSettings as job_serializer and job_deserializer.

Here’s a simplified version:

from importlib import import_module
from typing import Any
from uuid import UUID

import msgpack
from pydantic import BaseModel

def get_model_class(qualified_name: str) -> type[BaseModel]:
    # Resolve "package.module.Model" back to the class object.
    module_name, _, class_name = qualified_name.rpartition(".")
    return getattr(import_module(module_name), class_name)

class MsgpackSerializer:
    def _normalize(self, obj: Any) -> Any:
        if isinstance(obj, BaseModel):
            return {"__pydantic__": f"{obj.__class__.__module__}.{obj.__class__.__qualname__}", "data": obj.model_dump(mode="json")}
        elif isinstance(obj, UUID):
            return {"__uuid__": str(obj)}
        elif isinstance(obj, (list, tuple)):
            return [self._normalize(item) for item in obj]
        elif isinstance(obj, dict):
            return {str(key): self._normalize(value) for key, value in obj.items()}
        return obj

    def _denormalize(self, obj: Any) -> Any:
        if isinstance(obj, dict):
            if "__pydantic__" in obj:
                model_cls = get_model_class(obj["__pydantic__"])
                return model_cls.model_validate(obj["data"])
            elif "__uuid__" in obj:
                return UUID(obj["__uuid__"])
            return {key: self._denormalize(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._denormalize(item) for item in obj]
        return obj

    def serialize(self, job: Any) -> bytes:
        return msgpack.packb(self._normalize(job), use_bin_type=True)

    def deserialize(self, data: bytes) -> Any:
        return self._denormalize(msgpack.unpackb(data, raw=False))

serializer = MsgpackSerializer()

# This is then integrated into Arq’s settings as follows:
class WorkerSettings:
    job_serializer = serializer.serialize
    job_deserializer = serializer.deserialize
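
On the enqueueing side, Arq's create_pool accepts the same job_serializer/job_deserializer hooks, so jobs round-trip consistently in both directions (a sketch reusing the serializer instance above):

from arq import create_pool
from arq.connections import RedisSettings

redis = await create_pool(
    RedisSettings(),
    job_serializer=serializer.serialize,
    job_deserializer=serializer.deserialize,
)
await redis.enqueue_job('process_document_task', document=document)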

Questions for the Arq Team

  1. Would you be open to adding native support for Pydantic serialization in Arq?
  2. If so, what would be the best approach to contribute this? Should it be a built-in feature, or is a plugin-based system preferred?
  3. Do you have any concerns regarding this approach? For example, performance trade-offs due to dynamic model loading or security considerations when deserializing user-controlled inputs?

I’d love to get your thoughts on whether this aligns with Arq’s roadmap and how best to adapt my implementation to fit natively.

Thanks again for the great work on Arq!

P.S. I wrote an article providing a more detailed overview of the solution and its limitations here
