From 76df9e270621d26df70105be0614412893bbbf47 Mon Sep 17 00:00:00 2001 From: Dmitry Salahutdinov Date: Fri, 17 Jul 2020 17:06:14 +0500 Subject: [PATCH 1/8] Monitor worker job runtime --- README.md | 1 + lib/yabeda/sidekiq.rb | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/README.md b/README.md index c46b4c6..3f44e50 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ end - Number of jobs in dead set (“morgue”): `sidekiq_jobs_dead_count` - Active workers count: `sidekiq_active_processes` - Active processes count: `sidekiq_active_workers_count` + - Runtime of the actual running worker jobs `sidekiq_worker_runtime` ## Custom tags diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index d6a48c5..af77699 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -33,6 +33,7 @@ module Sidekiq gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count." gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes." gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" + gauge :worker_runtime, tags: %i[queue worker jid], comment: "The actual worker job runtime" histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time", unit: :seconds, per: :job, @@ -59,6 +60,16 @@ module Sidekiq sidekiq_queue_latency.set({ queue: queue.name }, queue.latency) end + now = Time.zone.now + ::Sidekiq::Workers.new.each do |process, thread, msg| + payload = msg['payload'] + + sidekiq_worker_runtime.set( + {queue: payload['queue'], worker: payload['class'], jid: payload['jid']}, + now - Time.at(msg['run_at']) + ) + end + # That is quite slow if your retry set is large # I don't want to enable it by default # retries_by_queues = From d1bb3a80f9449c1f5e53a69d0de8abe053b2576d Mon Sep 17 00:00:00 2001 From: Dmitry Salahutdinov Date: Fri, 17 Jul 2020 17:16:08 +0500 Subject: [PATCH 2/8] remove jid tag --- lib/yabeda/sidekiq.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index af77699..e6ca012 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -33,7 +33,7 @@ module Sidekiq gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count." gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes." gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" - gauge :worker_runtime, tags: %i[queue worker jid], comment: "The actual worker job runtime" + gauge :worker_runtime, tags: %i[queue worker], comment: "The actual worker job runtime" histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time", unit: :seconds, per: :job, @@ -65,7 +65,7 @@ module Sidekiq payload = msg['payload'] sidekiq_worker_runtime.set( - {queue: payload['queue'], worker: payload['class'], jid: payload['jid']}, + {queue: payload['queue'], worker: payload['class']}, now - Time.at(msg['run_at']) ) end From 168d795e3e473c545da111e3e1b2749ac0a27314 Mon Sep 17 00:00:00 2001 From: Dmitry Salahutdinov Date: Fri, 16 Oct 2020 16:32:52 +0500 Subject: [PATCH 3/8] Fix --- lib/yabeda/sidekiq.rb | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index e6ca012..c96dacb 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -33,7 +33,7 @@ module Sidekiq gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count." gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes." gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" - gauge :worker_runtime, tags: %i[queue worker], comment: "The actual worker job runtime" + gauge :job_max_runtime, tags: %i[queue worker], comment: "The actual job runtime" histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time", unit: :seconds, per: :job, @@ -60,16 +60,7 @@ module Sidekiq sidekiq_queue_latency.set({ queue: queue.name }, queue.latency) end - now = Time.zone.now - ::Sidekiq::Workers.new.each do |process, thread, msg| - payload = msg['payload'] - - sidekiq_worker_runtime.set( - {queue: payload['queue'], worker: payload['class']}, - now - Time.at(msg['run_at']) - ) - end - + track_job_max_runtime # That is quite slow if your retry set is large # I don't want to enable it by default # retries_by_queues = @@ -98,6 +89,8 @@ module Sidekiq end class << self + att_accessor :job_tags + def labelize(worker, job, queue) { queue: queue, worker: worker_class(worker, job), **custom_tags(worker, job).to_h } end @@ -115,5 +108,23 @@ def custom_tags(worker, job) worker.method(:yabeda_tags).arity.zero? ? worker.yabeda_tags : worker.yabeda_tags(*job["args"]) end end + + self.job_tags = [] + + def track_job_max_runtime + now = Time.now.utc + workers_runtime = ::Sidekiq::Workers.new.each_with_object do |result, (process, thread, msg)| + payload = msg['payload'] + tags = {queue: payload['queue'], worker: payload['class']} + job_tags << tags unless job_tags.include?(tags) + + duration = now - Time.at(msg['run_at']) + result[tags] = duration if !result[tags] || result[tags] < duration + end + + job_tags.each_with_object({}).each do |result, tags| + job_max_runtime.set(tags, workers_runtime[tags].to_i) + end + end end end From 1f9a5356f2569f4aa899c1398d0bccdb5df87496 Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Fri, 16 Apr 2021 15:29:32 +0300 Subject: [PATCH 4/8] Fix typos and rubocop offences --- lib/yabeda/sidekiq.rb | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index c96dacb..edd39aa 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -33,7 +33,7 @@ module Sidekiq gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count." gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes." gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" - gauge :job_max_runtime, tags: %i[queue worker], comment: "The actual job runtime" + gauge :job_max_runtime, tags: %i[queue worker], comment: "The actual job runtime" histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time", unit: :seconds, per: :job, @@ -60,7 +60,7 @@ module Sidekiq sidekiq_queue_latency.set({ queue: queue.name }, queue.latency) end - track_job_max_runtime + Yabeda::Sidekiq.track_job_max_runtime # That is quite slow if your retry set is large # I don't want to enable it by default # retries_by_queues = @@ -89,7 +89,7 @@ module Sidekiq end class << self - att_accessor :job_tags + attr_accessor :job_tags def labelize(worker, job, queue) { queue: queue, worker: worker_class(worker, job), **custom_tags(worker, job).to_h } @@ -111,20 +111,22 @@ def custom_tags(worker, job) self.job_tags = [] - def track_job_max_runtime + # rubocop: disable Metrics/AbcSize + def self.track_job_max_runtime now = Time.now.utc - workers_runtime = ::Sidekiq::Workers.new.each_with_object do |result, (process, thread, msg)| - payload = msg['payload'] - tags = {queue: payload['queue'], worker: payload['class']} + workers_runtime = ::Sidekiq::Workers.new.each_with_object({}) do |result, (_process, _thread, msg)| + payload = msg["payload"] + tags = { queue: payload["queue"], worker: payload["class"] } job_tags << tags unless job_tags.include?(tags) - duration = now - Time.at(msg['run_at']) + duration = now - Time.at(msg["run_at"]) result[tags] = duration if !result[tags] || result[tags] < duration end - job_tags.each_with_object({}).each do |result, tags| + job_tags.each_with_object({}).each do |_result, tags| job_max_runtime.set(tags, workers_runtime[tags].to_i) end end + # rubocop: enable Metrics/AbcSize end end From 9632341ca05e8becf48d754c6d3ba43ab5c37c48 Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Fri, 16 Apr 2021 16:55:28 +0300 Subject: [PATCH 5/8] Cleanup previous observations --- lib/yabeda/sidekiq.rb | 26 ++++++++++++------- spec/yabeda/sidekiq_spec.rb | 51 +++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index 9e073bc..f13ae1e 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -60,7 +60,7 @@ module Sidekiq sidekiq_queue_latency.set({ queue: queue.name }, queue.latency) end - Yabeda::Sidekiq.track_job_max_runtime + Yabeda::Sidekiq.track_max_job_runtime # That is quite slow if your retry set is large # I don't want to enable it by default # retries_by_queues = @@ -89,7 +89,7 @@ module Sidekiq end class << self - attr_accessor :job_tags + attr_accessor :previous_max_job_runtimes def labelize(worker, job, queue) { queue: queue, worker: worker_class(worker, job) } @@ -111,23 +111,31 @@ def custom_tags(worker, job) end end - self.job_tags = [] + self.previous_max_job_runtimes = Set.new # rubocop: disable Metrics/AbcSize - def self.track_job_max_runtime + def self.track_max_job_runtime now = Time.now.utc - workers_runtime = ::Sidekiq::Workers.new.each_with_object({}) do |result, (_process, _thread, msg)| + job_runtimes = ::Sidekiq::Workers.new.each_with_object({}) do |(_process, _thread, msg), result| payload = msg["payload"] - tags = { queue: payload["queue"], worker: payload["class"] } - job_tags << tags unless job_tags.include?(tags) + tags = { queue: payload["queue"], worker: payload["wrapped"] || payload["class"] } duration = now - Time.at(msg["run_at"]) result[tags] = duration if !result[tags] || result[tags] < duration end - job_tags.each_with_object({}).each do |_result, tags| - job_max_runtime.set(tags, workers_runtime[tags].to_i) + job_runtimes.each do |tags, duration| + Yabeda.sidekiq.job_max_runtime.set(tags, duration) end + + # Reset durations to zero for finished jobs we saw earlier + previous_max_job_runtimes.subtract(job_runtimes.keys) + previous_max_job_runtimes.each do |tags| + Yabeda.sidekiq.job_max_runtime.set(tags, 0) + end + + # Populate previous runtimes for the next time + self.previous_max_job_runtimes = Set.new(job_runtimes.keys) end # rubocop: enable Metrics/AbcSize end diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index 08ae6cc..0e305ad 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -54,6 +54,33 @@ { queue: "default", worker: "FailingPlainJob" } => kind_of(Numeric), ) end + + it "measures maximum runtime" do + allow(Sidekiq::Workers).to receive(:new) do + [ + [ + "server:pid:wtf", "tid1", { + "queue" => "default", + "payload" => { "queue" => "default", "class" => "SamplePlainJob" }, + "run_at" => Time.now.to_i, + }, + ], + [ + "server:pid:wtf", "tid2", { + "queue" => "default", + "payload" => { "queue" => "default", "class" => "SamplePlainJob" }, + "run_at" => Time.now.to_i - 5, + }, + ], + ] + end + + Yabeda.collectors.each(&:call) + + expect(Yabeda.sidekiq.job_max_runtime.values).to include( + { queue: "default", worker: "SamplePlainJob" } => (be >= 5), + ) + end end describe "ActiveJob jobs" do @@ -96,6 +123,30 @@ { queue: "default", worker: "FailingActiveJob" } => kind_of(Numeric), ) end + + it "measures maximum runtime" do + allow(Sidekiq::Workers).to receive(:new) do + [ + [ + "server:pid:wtf", "tid1", { + "queue" => "default", + "payload" => { + "queue" => "default", + "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", + "wrapped" => "SamplePlainJob", + }, + "run_at" => Time.now.to_i - 42, + }, + ], + ] + end + + Yabeda.collectors.each(&:call) + + expect(Yabeda.sidekiq.job_max_runtime.values).to include( + { queue: "default", worker: "SamplePlainJob" } => (be >= 42), + ) + end end describe "#yabeda_tags worker method" do From ebbdea1141f14fbc8dc59ac8d0d780e053a60ee1 Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Fri, 16 Apr 2021 17:15:29 +0300 Subject: [PATCH 6/8] Simplify cleanup logic --- lib/yabeda/sidekiq.rb | 19 +++++----------- spec/yabeda/sidekiq_spec.rb | 45 ++++++++++++++++++++++++++----------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index f13ae1e..881a2b9 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -89,7 +89,7 @@ module Sidekiq end class << self - attr_accessor :previous_max_job_runtimes + attr_accessor :observed_max_job_runtimes def labelize(worker, job, queue) { queue: queue, worker: worker_class(worker, job) } @@ -111,31 +111,24 @@ def custom_tags(worker, job) end end - self.previous_max_job_runtimes = Set.new + self.observed_max_job_runtimes = Concurrent::Set.new # rubocop: disable Metrics/AbcSize def self.track_max_job_runtime now = Time.now.utc - job_runtimes = ::Sidekiq::Workers.new.each_with_object({}) do |(_process, _thread, msg), result| + current_max_job_runtimes = ::Sidekiq::Workers.new.each_with_object({}) do |(_process, _thread, msg), result| payload = msg["payload"] tags = { queue: payload["queue"], worker: payload["wrapped"] || payload["class"] } + observed_max_job_runtimes << tags duration = now - Time.at(msg["run_at"]) result[tags] = duration if !result[tags] || result[tags] < duration end - job_runtimes.each do |tags, duration| + observed_max_job_runtimes.each do |tags| + duration = current_max_job_runtimes[tags].to_i # will set not currently observed jobs duration to zero Yabeda.sidekiq.job_max_runtime.set(tags, duration) end - - # Reset durations to zero for finished jobs we saw earlier - previous_max_job_runtimes.subtract(job_runtimes.keys) - previous_max_job_runtimes.each do |tags| - Yabeda.sidekiq.job_max_runtime.set(tags, 0) - end - - # Populate previous runtimes for the next time - self.previous_max_job_runtimes = Set.new(job_runtimes.keys) end # rubocop: enable Metrics/AbcSize end diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index 0e305ad..4ea0e9a 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -56,29 +56,48 @@ end it "measures maximum runtime" do + invoked = 0 allow(Sidekiq::Workers).to receive(:new) do [ [ - "server:pid:wtf", "tid1", { - "queue" => "default", - "payload" => { "queue" => "default", "class" => "SamplePlainJob" }, - "run_at" => Time.now.to_i, - }, + [ + "server:pid:wtf", "tid1", { + "queue" => "default", + "payload" => { "queue" => "default", "class" => "FailingPlainJob" }, + "run_at" => Time.now.to_i - 10, + }, + ], ], [ - "server:pid:wtf", "tid2", { - "queue" => "default", - "payload" => { "queue" => "default", "class" => "SamplePlainJob" }, - "run_at" => Time.now.to_i - 5, - }, + [ + "server:pid:wtf", "tid1", { + "queue" => "default", + "payload" => { "queue" => "default", "class" => "SamplePlainJob" }, + "run_at" => Time.now.to_i, + }, + ], + [ + "server:pid:wtf", "tid2", { + "queue" => "default", + "payload" => { "queue" => "default", "class" => "SamplePlainJob" }, + "run_at" => Time.now.to_i - 5, + }, + ], ], - ] + ][(invoked += 1) - 1] end Yabeda.collectors.each(&:call) + expect(Yabeda.sidekiq.job_max_runtime.values).to include( + { queue: "default", worker: "FailingPlainJob" } => (be >= 10), + ) + + Yabeda.collectors.each(&:call) + expect(Yabeda.sidekiq.job_max_runtime.values).to include( { queue: "default", worker: "SamplePlainJob" } => (be >= 5), + { queue: "default", worker: "FailingPlainJob" } => 0, ) end end @@ -133,7 +152,7 @@ "payload" => { "queue" => "default", "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", - "wrapped" => "SamplePlainJob", + "wrapped" => "SampleActiveJob", }, "run_at" => Time.now.to_i - 42, }, @@ -144,7 +163,7 @@ Yabeda.collectors.each(&:call) expect(Yabeda.sidekiq.job_max_runtime.values).to include( - { queue: "default", worker: "SamplePlainJob" } => (be >= 42), + { queue: "default", worker: "SampleActiveJob" } => (be >= 42), ) end end From cf9c2dc49659a1931066491ba720ae52b634e744 Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Fri, 16 Apr 2021 17:23:22 +0300 Subject: [PATCH 7/8] Fix specs when Redis isn't available --- spec/yabeda/sidekiq_spec.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index 4ea0e9a..eb6ccd8 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -1,6 +1,24 @@ # frozen_string_literal: true RSpec.describe Yabeda::Sidekiq do + before do + allow(Sidekiq::Stats).to receive(:new).and_return( + OpenStruct.new({ + workers_size: 0, + scheduled_size: 0, + dead_size: 0, + processes_size: 0, + retry_size: 0, + processed: 0, + failed: 0, + queues: { "default" => 0 }, + }) + ) + allow(Sidekiq::Queue).to receive(:all).and_return([ + OpenStruct.new({ name: "default", latency: 0 }), + ]) + end + it "has a version number" do expect(Yabeda::Sidekiq::VERSION).not_to be nil end From c906a273fba454bc70f6b6a2431f9119bf526a35 Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Fri, 16 Apr 2021 17:26:04 +0300 Subject: [PATCH 8/8] Fix rubocop --- spec/yabeda/sidekiq_spec.rb | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index eb6ccd8..4e11822 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -3,7 +3,7 @@ RSpec.describe Yabeda::Sidekiq do before do allow(Sidekiq::Stats).to receive(:new).and_return( - OpenStruct.new({ + OpenStruct.new( workers_size: 0, scheduled_size: 0, dead_size: 0, @@ -12,11 +12,13 @@ processed: 0, failed: 0, queues: { "default" => 0 }, - }) + ), + ) + allow(Sidekiq::Queue).to receive(:all).and_return( + [ + OpenStruct.new({ name: "default", latency: 0 }), + ], ) - allow(Sidekiq::Queue).to receive(:all).and_return([ - OpenStruct.new({ name: "default", latency: 0 }), - ]) end it "has a version number" do