From de116e8ca61ff6380e3632698f2e2a2e34b6d2bf Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Mon, 19 Apr 2021 18:41:26 +0300 Subject: [PATCH 1/3] Track maximum runtime of currently running jobs --- README.md | 1 + lib/yabeda/sidekiq.rb | 20 +++++++++++++ lib/yabeda/sidekiq/server_middleware.rb | 2 ++ spec/support/jobs.rb | 9 ++++++ spec/support/sidekiq_inline_middlewares.rb | 1 + spec/yabeda/sidekiq_spec.rb | 34 ++++++++++++++++++++++ 6 files changed, 67 insertions(+) diff --git a/README.md b/README.md index 746644b..8f0112f 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ end - Number of jobs in dead set (“morgue”): `sidekiq_jobs_dead_count` - Active workers count: `sidekiq_active_processes` - Active processes count: `sidekiq_active_workers_count` + - Maximum runtime of currently executing jobs: `sidekiq_running_job_runtime` (useful for detection of hung jobs, segmented by queue and class name) ## Custom tags diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index a6913e7..1f5a8fc 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -33,6 +33,8 @@ module Sidekiq gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count." gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes." gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" + gauge :running_job_runtime, tags: %i[queue worker], aggregation: :max, unit: :seconds, + comment: "How long currently running jobs are running (useful for detection of hung jobs)" histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time", unit: :seconds, per: :job, @@ -59,6 +61,8 @@ module Sidekiq sidekiq_queue_latency.set({ queue: queue.name }, queue.latency) end + Yabeda::Sidekiq.track_max_job_runtime + # That is quite slow if your retry set is large # I don't want to enable it by default # retries_by_queues = @@ -105,6 +109,22 @@ def custom_tags(worker, job) worker.method(:yabeda_tags).arity.zero? ? worker.yabeda_tags : worker.yabeda_tags(*job["args"]) end + + # Hash of hashes containing all currently running jobs' start timestamps + # to calculate maximum durations of currently running not yet completed jobs + # { { queue: "default", worker: "SomeJob" } => { "jid1" => 100500, "jid2" => 424242 } } + attr_accessor :jobs_started_at + + def track_max_job_runtime + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + ::Yabeda::Sidekiq.jobs_started_at.each do |labels, jobs| + oldest_job_started_at = jobs.values.min + oldest_job_duration = oldest_job_started_at ? (now - oldest_job_started_at).round(3) : 0 + Yabeda.sidekiq.running_job_runtime.set(labels, oldest_job_duration) + end + end end + + self.jobs_started_at = Concurrent::Hash.new { |hash, key| hash[key] = Concurrent::Hash.new } end end diff --git a/lib/yabeda/sidekiq/server_middleware.rb b/lib/yabeda/sidekiq/server_middleware.rb index dba580d..d1fad9d 100644 --- a/lib/yabeda/sidekiq/server_middleware.rb +++ b/lib/yabeda/sidekiq/server_middleware.rb @@ -12,6 +12,7 @@ def call(worker, job, queue) begin job_instance = ::Sidekiq::Job.new(job) Yabeda.sidekiq_job_latency.measure(labels, job_instance.latency) + Yabeda::Sidekiq.jobs_started_at[labels][job["jid"]] = start Yabeda.with_tags(**custom_tags) do yield end @@ -22,6 +23,7 @@ def call(worker, job, queue) ensure Yabeda.sidekiq_job_runtime.measure(labels, elapsed(start)) Yabeda.sidekiq_jobs_executed_total.increment(labels) + Yabeda::Sidekiq.jobs_started_at[labels].delete(job["jid"]) end end # rubocop: enable Metrics/AbcSize, Metrics/MethodLength: diff --git a/spec/support/jobs.rb b/spec/support/jobs.rb index d3f8bcb..01345f7 100644 --- a/spec/support/jobs.rb +++ b/spec/support/jobs.rb @@ -8,6 +8,15 @@ def perform(*_args) end end +class SampleLongRunningJob + include Sidekiq::Worker + + def perform(*_args) + sleep 0.05 + "Phew, I'm done!" + end +end + class SampleComplexJob include Sidekiq::Worker diff --git a/spec/support/sidekiq_inline_middlewares.rb b/spec/support/sidekiq_inline_middlewares.rb index c9263e0..82c0777 100644 --- a/spec/support/sidekiq_inline_middlewares.rb +++ b/spec/support/sidekiq_inline_middlewares.rb @@ -7,6 +7,7 @@ def push(job) return super unless Sidekiq::Testing.inline? job = Sidekiq.load_json(Sidekiq.dump_json(job)) + job["jid"] ||= SecureRandom.hex(12) job_class = Sidekiq::Testing.constantize(job["class"]) job_instance = job_class.new queue = (job_instance.sidekiq_options_hash || {}).fetch("queue", "default") diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index a5d12d1..26a095d 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -167,5 +167,39 @@ expect(Yabeda.sidekiq.jobs_dead_count.values).to eq({ {} => 3 }) expect(Yabeda.sidekiq.jobs_scheduled_count.values).to eq({ {} => 2 }) end + + it "measures maximum runtime of currently running jobs" do + Yabeda.sidekiq.running_job_runtime.values.clear # This is a hack + Yabeda::Sidekiq.jobs_started_at.clear + + Sidekiq::Testing.inline! do + workers = [] + workers.push(Thread.new { SampleLongRunningJob.perform_async }) + sleep 0.01 + workers.push(Thread.new { SampleLongRunningJob.perform_async }) + + Yabeda.collectors.each(&:call) + expect(Yabeda.sidekiq.running_job_runtime.values).to include( + { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.01), + ) + + sleep 0.01 + FailingActiveJob.perform_later rescue nil + Yabeda.collectors.each(&:call) + + expect(Yabeda.sidekiq.running_job_runtime.values).to include( + { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.02), + { queue: "default", worker: "FailingActiveJob" } => 0, + ) + + # When all jobs are completed, metric should respond with zero + workers.map(&:join) + Yabeda.collectors.each(&:call) + expect(Yabeda.sidekiq.running_job_runtime.values).to include( + { queue: "default", worker: "SampleLongRunningJob" } => 0, + { queue: "default", worker: "FailingActiveJob" } => 0, + ) + end + end end end From b45ed7d9b02b9134c86a300776040c3f6d034e2c Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Mon, 19 Apr 2021 18:49:29 +0300 Subject: [PATCH 2/3] :cop: --- spec/yabeda/sidekiq_spec.rb | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index 26a095d..b011820 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -170,7 +170,7 @@ it "measures maximum runtime of currently running jobs" do Yabeda.sidekiq.running_job_runtime.values.clear # This is a hack - Yabeda::Sidekiq.jobs_started_at.clear + described_class.jobs_started_at.clear Sidekiq::Testing.inline! do workers = [] @@ -184,12 +184,16 @@ ) sleep 0.01 - FailingActiveJob.perform_later rescue nil + begin + FailingActiveJob.perform_later + rescue StandardError + nil + end Yabeda.collectors.each(&:call) expect(Yabeda.sidekiq.running_job_runtime.values).to include( { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.02), - { queue: "default", worker: "FailingActiveJob" } => 0, + { queue: "default", worker: "FailingActiveJob" } => 0, ) # When all jobs are completed, metric should respond with zero @@ -197,7 +201,7 @@ Yabeda.collectors.each(&:call) expect(Yabeda.sidekiq.running_job_runtime.values).to include( { queue: "default", worker: "SampleLongRunningJob" } => 0, - { queue: "default", worker: "FailingActiveJob" } => 0, + { queue: "default", worker: "FailingActiveJob" } => 0, ) end end From d924d32edd3baa415602610745c336b9855ef4fe Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Mon, 19 Apr 2021 18:56:02 +0300 Subject: [PATCH 3/3] sleep isn't reliable at all: it can sleep less than requested! --- spec/yabeda/sidekiq_spec.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index b011820..6605b23 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -175,15 +175,15 @@ Sidekiq::Testing.inline! do workers = [] workers.push(Thread.new { SampleLongRunningJob.perform_async }) - sleep 0.01 + sleep 0.012 # Ruby can sleep less than requested workers.push(Thread.new { SampleLongRunningJob.perform_async }) Yabeda.collectors.each(&:call) expect(Yabeda.sidekiq.running_job_runtime.values).to include( - { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.01), + { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.010), ) - sleep 0.01 + sleep 0.012 # Ruby can sleep less than requested begin FailingActiveJob.perform_later rescue StandardError @@ -192,7 +192,7 @@ Yabeda.collectors.each(&:call) expect(Yabeda.sidekiq.running_job_runtime.values).to include( - { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.02), + { queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.020), { queue: "default", worker: "FailingActiveJob" } => 0, )