From fd97a38875cf3eaa2081972f2bbf4405bffc09af Mon Sep 17 00:00:00 2001 From: Nathan Griffith Date: Thu, 27 Jun 2024 12:35:43 -0400 Subject: [PATCH] Fix spin-loop/cleanup failure mode within run loop (#42) This ensures that exceptions raised in thread callback hooks are rescued and properly mark jobs as failed. This is also a good opportunity to change the `num` argument (of `work_off(num)`) to mean number of jobs (give or take a few due to `max_claims`), not number of iterations. Previously (before threading was introduced) I think it meant number of jobs (though jobs and iterations were 1:1). I would not have done this before the refactor, because there was no guarantee that one of `success` or `failure` would be incremented (the thread might crash for many reasons). Now, we only increment `success` and treat `total - success` as the "failure" number when we return from the method. Fixes #23 and #41 This is also a prereq for a resolution I'm cooking up for #36 --- .github/workflows/ci.yml | 2 +- Gemfile | 2 +- Gemfile.lock | 2 +- gemfiles/rails_5_2.gemfile | 2 +- gemfiles/rails_6_0.gemfile | 2 +- gemfiles/rails_6_1.gemfile | 2 +- gemfiles/rails_7_0.gemfile | 2 +- gemfiles/rails_7_1.gemfile | 2 +- gemfiles/rails_main.gemfile | 2 +- lib/delayed/worker.rb | 50 ++++++++++++++++++------------------- 10 files changed, 33 insertions(+), 35 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 46e75337..85056040 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ jobs: strategy: fail-fast: false matrix: - ruby: ['2.6', '2.7', '3.0', '3.1', '3.2'] + ruby: ['2.7', '3.0', '3.1', '3.2'] gemfile: - gemfiles/rails_5_2.gemfile - gemfiles/rails_6_0.gemfile diff --git a/Gemfile b/Gemfile index 509ad67b..2848b1e9 100644 --- a/Gemfile +++ b/Gemfile @@ -11,6 +11,6 @@ gem 'mysql2' gem 'pg' gem 'rake' gem 'rspec' -gem 'sqlite3' +gem 'sqlite3', '~> 1.7.3' gem 'timecop' gem 'zeitwerk' diff --git a/Gemfile.lock b/Gemfile.lock index 3d2aa2ed..f5e0eb1d 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -216,7 +216,7 @@ DEPENDENCIES pg rake rspec - sqlite3 + sqlite3 (~> 1.7.3) timecop zeitwerk diff --git a/gemfiles/rails_5_2.gemfile b/gemfiles/rails_5_2.gemfile index da3da842..55b092b9 100644 --- a/gemfiles/rails_5_2.gemfile +++ b/gemfiles/rails_5_2.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_6_0.gemfile b/gemfiles/rails_6_0.gemfile index bff971b3..d6bea622 100644 --- a/gemfiles/rails_6_0.gemfile +++ b/gemfiles/rails_6_0.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_6_1.gemfile b/gemfiles/rails_6_1.gemfile index b6a59735..6d6f5077 100644 --- a/gemfiles/rails_6_1.gemfile +++ b/gemfiles/rails_6_1.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_7_0.gemfile b/gemfiles/rails_7_0.gemfile index 30055212..5c98e898 100644 --- a/gemfiles/rails_7_0.gemfile +++ b/gemfiles/rails_7_0.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_7_1.gemfile b/gemfiles/rails_7_1.gemfile index b1431c05..f15f0eb7 100644 --- a/gemfiles/rails_7_1.gemfile +++ b/gemfiles/rails_7_1.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_main.gemfile b/gemfiles/rails_main.gemfile index 4fcdd256..7742de24 100644 --- a/gemfiles/rails_main.gemfile +++ b/gemfiles/rails_main.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" gem "actionview", github: "rails/rails", glob: "actionview/*.gemspec" diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index e96bc945..f9751742 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -89,22 +89,17 @@ def on_exit! # Exit early if interrupted. def work_off(num = 100) success = Concurrent::AtomicFixnum.new(0) - failure = Concurrent::AtomicFixnum.new(0) + total = 0 - num.times do + while total < num jobs = reserve_jobs break if jobs.empty? + total += jobs.length pool = Concurrent::FixedThreadPool.new(jobs.length) jobs.each do |job| pool.post do - run_thread_callbacks(job) do - if run_job(job) - success.increment - else - failure.increment - end - end + success.increment if run_job(job) end end @@ -114,7 +109,7 @@ def work_off(num = 100) break if stop? # leave if we're exiting end - [success, failure].map(&:value) + [success.value, total - success.value] end def run_thread_callbacks(job, &block) @@ -122,30 +117,33 @@ def run_thread_callbacks(job, &block) end def run(job) - metadata = { - status: 'RUNNING', - name: job.name, - run_at: job.run_at, - created_at: job.created_at, - priority: job.priority, - queue: job.queue, - attempts: job.attempts, - enqueued_for: (Time.current - job.created_at).round, - } - job_say job, metadata.to_json - run_time = Benchmark.realtime do - Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do - job.invoke_job + run_thread_callbacks(job) do + metadata = { + status: 'RUNNING', + name: job.name, + run_at: job.run_at, + created_at: job.created_at, + priority: job.priority, + queue: job.queue, + attempts: job.attempts, + enqueued_for: (Time.current - job.created_at).round, + } + job_say job, metadata.to_json + run_time = Benchmark.realtime do + Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do + job.invoke_job + end + job.destroy end - job.destroy + job_say job, format('COMPLETED after %.4f seconds', run_time) end - job_say job, format('COMPLETED after %.4f seconds', run_time) true # did work rescue DeserializationError => e job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error' job.error = e failed(job) + false # work failed rescue Exception => e # rubocop:disable Lint/RescueException self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) } false # work failed