First of all, thanks for the superb library @samuelcolvin & team! After trying (unsuccessfully) to set up async task processing with Celery, I finally decided to switch to Arq, especially given the team behind it.
While Arq handles async workloads natively, it still doesn't natively support Pydantic serialization of task inputs / outputs. Given the ubiquity of Pydantic in modern Python applications, I wanted to explore whether there’s interest in introducing native support for this.
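Problem

Currently, passing Pydantic models to Arq tasks requires handling serialization/deserialization manually. For illustration, here is roughly what that manual round-trip looks like with a JSON-based serializer (the Document model and process_document are placeholders):

```python
from uuid import UUID, uuid4
from pydantic import BaseModel

class Document(BaseModel):
    document_id: UUID
    content: str

# Enqueue side: flatten the model (UUID included) into JSON-safe types by hand
doc = Document(document_id=uuid4(), content="Hello")
await redis.enqueue_job('process_document_task', document=doc.model_dump(mode='json'))

# Worker side: rebuild the model by hand before doing any real work
async def process_document_task(ctx, document: dict) -> dict:
    doc = Document.model_validate(document)
    result = await process_document(doc)
    return result.model_dump(mode='json')
```

This approach has several downsides: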
Leads to loss of type hints and auto-completion benefits
Can be error-prone if conversions are forgotten
JSON serialization does not support UUIDs natively, requiring additional conversion
Pickle, while supporting more Python types, is insecure when deserializing untrusted data and is not interoperable with non-Python consumers
Proposed Solution
To improve this, I implemented a Msgpack-based serializer that:
Supports Pydantic models, lists/dicts of models, and UUIDs
Automatically converts models during task enqueueing and execution
Uses dynamic class loading to reconstruct Pydantic models at runtime
Why Msgpack?
Msgpack is compact and faster than JSON when dealing with structured data.
Unlike Pickle, Msgpack does not execute arbitrary code during deserialization, making it safer.
Supports dicts, lists, and other common Python structures without needing extensive manual handling.
The core of the solution involves two steps, sketched in code after this list:
Adding normalization before serialization
This covers Pydantic's BaseModel, UUIDs, lists, tuples, and dicts, the most common objects that need special handling with msgpack.
Adding denormalization after deserialization
Pydantic models are reconstructed dynamically from their fully qualified names, and UUIDs are converted back to their native type.
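A minimal sketch of those two hooks (the function names and marker keys here are mine, and it assumes top-level model classes):

```python
import importlib
from uuid import UUID
from pydantic import BaseModel

def normalize(obj):
    """Recursively convert objects into msgpack-friendly primitives."""
    if isinstance(obj, BaseModel):
        cls = type(obj)
        # Record the fully qualified name so the model can be reloaded later
        return {'__pydantic__': f'{cls.__module__}.{cls.__qualname__}',
                'data': obj.model_dump(mode='json')}
    if isinstance(obj, UUID):
        return {'__uuid__': str(obj)}
    if isinstance(obj, dict):
        return {k: normalize(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [normalize(v) for v in obj]  # tuples round-trip as lists
    return obj

def denormalize(obj):
    """Reverse normalize(): reload model classes by qualified name."""
    if isinstance(obj, dict):
        if '__pydantic__' in obj:
            module_name, _, cls_name = obj['__pydantic__'].rpartition('.')
            cls = getattr(importlib.import_module(module_name), cls_name)
            return cls.model_validate(obj['data'])
        if '__uuid__' in obj:
            return UUID(obj['__uuid__'])
        return {k: denormalize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [denormalize(v) for v in obj]
    return obj
```

Two caveats of this sketch: tuples come back as lists, and the marker keys must not collide with user-supplied dict keys.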
Example usage with this approach:
```python
from uuid import uuid4

async def process_document_task(ctx: WorkerContext, document: Document) -> Document:
    result = await process_document(document)
    return result  # no need for manual conversion

document = Document(document_id=uuid4(), content="Hello")
await redis.enqueue_job('process_document_task', document=document)
```
Implementation Summary
I implemented a MsgpackSerializer class that seamlessly integrates into Arq’s WorkerSettings as job_serializer and job_deserializer.
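Here's a simplified version, reusing the normalize and denormalize helpers sketched above:

```python
import msgpack

class MsgpackSerializer:
    """Simplified sketch: wraps msgpack with the normalize/denormalize hooks."""

    def serialize(self, data: dict) -> bytes:
        return msgpack.packb(normalize(data), use_bin_type=True)

    def deserialize(self, data: bytes) -> dict:
        return denormalize(msgpack.unpackb(data, raw=False))

serializer = MsgpackSerializer()

class WorkerSettings:
    functions = [process_document_task]
    job_serializer = serializer.serialize
    job_deserializer = serializer.deserialize
```

On the enqueue side, the same two callables are passed to arq.connections.create_pool, so the producer and the worker agree on the wire format.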
Questions for the Arq Team

Would you be open to adding native support for Pydantic serialization in Arq?
If so, what would be the best approach to contribute this? Should it be a built-in feature, or is a plugin-based system preferred?
Do you have any concerns regarding this approach? For example, performance trade-offs due to dynamic model loading or security considerations when deserializing user-controlled inputs?
I’d love to get your thoughts on whether this aligns with Arq’s roadmap and how best to adapt my implementation to fit natively.
Thanks again for the great work on Arq!
P.S. I wrote an article providing a more detailed overview of the solution and its limitations here.