diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index e96bc94..68e5ea1 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -98,12 +98,10 @@ def work_off(num = 100) pool = Concurrent::FixedThreadPool.new(jobs.length) jobs.each do |job| pool.post do - run_thread_callbacks(job) do - if run_job(job) - success.increment - else - failure.increment - end + if run_job(job) + success.increment + else + failure.increment end end end @@ -122,25 +120,27 @@ def run_thread_callbacks(job, &block) end def run(job) - metadata = { - status: 'RUNNING', - name: job.name, - run_at: job.run_at, - created_at: job.created_at, - priority: job.priority, - queue: job.queue, - attempts: job.attempts, - enqueued_for: (Time.current - job.created_at).round, - } - job_say job, metadata.to_json - run_time = Benchmark.realtime do - Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do - job.invoke_job + run_thread_callbacks(job) do + metadata = { + status: 'RUNNING', + name: job.name, + run_at: job.run_at, + created_at: job.created_at, + priority: job.priority, + queue: job.queue, + attempts: job.attempts, + enqueued_for: (Time.current - job.created_at).round, + } + job_say job, metadata.to_json + run_time = Benchmark.realtime do + Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do + job.invoke_job + end + job.destroy end - job.destroy + job_say job, format('COMPLETED after %.4f seconds', run_time) + true # did work end - job_say job, format('COMPLETED after %.4f seconds', run_time) - true # did work rescue DeserializationError => e job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error'