Skip to content

Commit

Permalink
Fix spin-loop/cleanup failure mode within run loop
Browse files Browse the repository at this point in the history
This ensures that exceptions raised in thread callback hooks are rescued
and properly mark jobs as failed.

Fixes #23 and #41
  • Loading branch information
smudge committed Jun 26, 2024
1 parent 24b75f5 commit 23c05f9
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ def work_off(num = 100)
pool = Concurrent::FixedThreadPool.new(jobs.length)
jobs.each do |job|
pool.post do
run_thread_callbacks(job) do
if run_job(job)
success.increment
else
failure.increment
end
if run_job(job)
success.increment
else
failure.increment
end
end
end
Expand All @@ -122,25 +120,27 @@ def run_thread_callbacks(job, &block)
end

def run(job)
metadata = {
status: 'RUNNING',
name: job.name,
run_at: job.run_at,
created_at: job.created_at,
priority: job.priority,
queue: job.queue,
attempts: job.attempts,
enqueued_for: (Time.current - job.created_at).round,
}
job_say job, metadata.to_json
run_time = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
job.invoke_job
run_thread_callbacks(job) do
metadata = {
status: 'RUNNING',
name: job.name,
run_at: job.run_at,
created_at: job.created_at,
priority: job.priority,
queue: job.queue,
attempts: job.attempts,
enqueued_for: (Time.current - job.created_at).round,
}
job_say job, metadata.to_json
run_time = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
job.invoke_job
end
job.destroy
end
job.destroy
job_say job, format('COMPLETED after %.4f seconds', run_time)
true # did work
end
job_say job, format('COMPLETED after %.4f seconds', run_time)
true # did work
rescue DeserializationError => e
job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error'

Expand Down

0 comments on commit 23c05f9

Please sign in to comment.