Skip to content

Commit

Permalink
Optimize JobWrapper queueing ActiveJobs (Betterment#35)
Browse files Browse the repository at this point in the history
This PR introduces a change to avoid calling `ActiveJob::Base#deserialize` during queueing the job whilst maintaining existing functionality.

The change is in `JobWrapper`, which previously only took in a hash of `job_data` and knew how to deserialize it into a job object so we can interrogate it for `queue_name` or `max_attempts`, etc. To avoid the roundtrip through serialization, we change `JobWrapper#initialize` to take in either the job object directly, or a hash as it did previously. When it's a hash (needed when running the job in the worker), it still deserializes it into a job object. When it's a job object just assign it to `@job` directly and continue.

We discovered this through having a second-order effect in `#deserialize` using it to trigger some logic as the job comes off the queue, but we were then seeing the logic run multiple times for one job execution. Tracked it back to the `JobClass.perform_later` calling `#deserialize` and being majorly confused why it was deserialising in the web process.

This will make enqueueing jobs faster too, because we don't need a roundtrip through `#serialize`, `#deserialize` as well as allocating less objects. In practice this is likely a small effect, but we get it for free avoiding the behaviour above.

Looking at all the built-in adapters in ActiveJob, as well as good_job and solid_queue, they all avoid calling `job.serialize` during the enqueuing, even those adapters that support invoking methods on the Job class instance like delayed does.

From one angle it's an optimization, from another angle it's a bugfix (:
  • Loading branch information
caius authored Jan 31, 2024
1 parent 869c08a commit f2e5edc
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 4 deletions.
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@ and this project aims to adhere to [Semantic Versioning](http://semver.org/spec/
### Removed <!-- for now removed features. -->
### Fixed <!-- for any bug fixes. -->

## [0.5.3] - 2024-01-12
### Changed

- Bring ActiveJob queue adapter's enqueuing behaviour inline with delayed_job &
upstream adapters, by stopping it serializing & deserializing the job instance
during the enqueue process. Optimises the enqueueing of jobs slightly too.

## [0.5.2] - 2023-10-19
### Fixed

- Fixed regression for Rails 7.1.1 not executing jobs, due to lazy loading change in Rails.
The `JobWrapper` needs to be loaded before `ActiveJob::Base` is loaded, but this wasn't
happening in the worker processes. Fix by extracting `JobWrapper` into it's own file and
loading when `Delayed` is loaded earlier in the process.

## [0.5.1] - 2023-10-11
### Changed

- Explicitly test Rails 7.1 and Ruby 3.1, Ruby 3.2 to verify compatability. Previous
gems were compatible, but this release is now verified to be compatible.

## [0.5.0] - 2023-01-20
### Changed
- Reduced handler size by excluding redundant 'job:' key (only 'job_data:' is
Expand Down
2 changes: 1 addition & 1 deletion delayed.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']
spec.summary = 'a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day'

spec.version = '0.5.2'
spec.version = '0.5.3'
spec.metadata = {
'changelog_uri' => 'https://github.com/betterment/delayed/blob/main/CHANGELOG.md',
'bug_tracker_uri' => 'https://github.com/betterment/delayed/issues',
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def _enqueue(job, opts = {})
opts.merge!({ queue: job.queue_name, priority: job.priority }.compact)
.merge!(job.provider_attributes || {})

Delayed::Job.enqueue(JobWrapper.new(job.serialize), opts).tap do |dj|
Delayed::Job.enqueue(JobWrapper.new(job), opts).tap do |dj|
job.provider_job_id = dj.id
end
end
Expand Down
12 changes: 10 additions & 2 deletions lib/delayed/job_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ class JobWrapper # rubocop:disable Betterment/ActiveJobPerformable

delegate_missing_to :job

def initialize(job_data)
@job_data = job_data
def initialize(job_or_data)
# During enqueue the job instance is passed in directly, saves us deserializing
# it to find out how to queue the job.
# During load from the db, we get a data hash passed in so deserialize lazily.
if job_or_data.is_a?(ActiveJob::Base)
@job = job_or_data
@job_data = job_or_data.serialize
else
@job_data = job_or_data
end
end

def display_name
Expand Down
10 changes: 10 additions & 0 deletions spec/delayed/active_job_adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ def perform; end
ActiveJob::Base.queue_adapter = adapter_was
end

it "does not invoke #deserialize during enqueue" do # rubocop:disable RSpec/NoExpectationExample
JobClass.include(Module.new do
def deserialize(*)
raise "uh oh, deserialize called during enqueue!"
end
end)

JobClass.perform_later
end

it 'serializes a JobWrapper in the handler with expected fields' do
Timecop.freeze('2023-01-20T18:52:29Z') do
JobClass.perform_later
Expand Down

0 comments on commit f2e5edc

Please sign in to comment.