Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track maximum runtime of currently running jobs #17

Merged
merged 3 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ end
- Number of jobs in dead set (“morgue”): `sidekiq_jobs_dead_count`
- Active workers count: `sidekiq_active_processes`
- Active processes count: `sidekiq_active_workers_count`
- Maximum runtime of currently executing jobs: `sidekiq_running_job_runtime` (useful for detection of hung jobs, segmented by queue and class name)

## Custom tags

Expand Down
20 changes: 20 additions & 0 deletions lib/yabeda/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ module Sidekiq
gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count."
gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes."
gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued"
gauge :running_job_runtime, tags: %i[queue worker], aggregation: :max, unit: :seconds,
comment: "How long currently running jobs are running (useful for detection of hung jobs)"

histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time",
unit: :seconds, per: :job,
Expand All @@ -59,6 +61,8 @@ module Sidekiq
sidekiq_queue_latency.set({ queue: queue.name }, queue.latency)
end

Yabeda::Sidekiq.track_max_job_runtime

# That is quite slow if your retry set is large
# I don't want to enable it by default
# retries_by_queues =
Expand Down Expand Up @@ -105,6 +109,22 @@ def custom_tags(worker, job)

worker.method(:yabeda_tags).arity.zero? ? worker.yabeda_tags : worker.yabeda_tags(*job["args"])
end

# Hash of hashes containing all currently running jobs' start timestamps
# to calculate maximum durations of currently running not yet completed jobs
# { { queue: "default", worker: "SomeJob" } => { "jid1" => 100500, "jid2" => 424242 } }
attr_accessor :jobs_started_at

def track_max_job_runtime
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
::Yabeda::Sidekiq.jobs_started_at.each do |labels, jobs|
oldest_job_started_at = jobs.values.min
oldest_job_duration = oldest_job_started_at ? (now - oldest_job_started_at).round(3) : 0
Yabeda.sidekiq.running_job_runtime.set(labels, oldest_job_duration)
end
end
end

self.jobs_started_at = Concurrent::Hash.new { |hash, key| hash[key] = Concurrent::Hash.new }
end
end
2 changes: 2 additions & 0 deletions lib/yabeda/sidekiq/server_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def call(worker, job, queue)
begin
job_instance = ::Sidekiq::Job.new(job)
Yabeda.sidekiq_job_latency.measure(labels, job_instance.latency)
Yabeda::Sidekiq.jobs_started_at[labels][job["jid"]] = start
Yabeda.with_tags(**custom_tags) do
yield
end
Expand All @@ -22,6 +23,7 @@ def call(worker, job, queue)
ensure
Yabeda.sidekiq_job_runtime.measure(labels, elapsed(start))
Yabeda.sidekiq_jobs_executed_total.increment(labels)
Yabeda::Sidekiq.jobs_started_at[labels].delete(job["jid"])
end
end
# rubocop: enable Metrics/AbcSize, Metrics/MethodLength:
Expand Down
9 changes: 9 additions & 0 deletions spec/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ def perform(*_args)
end
end

class SampleLongRunningJob
include Sidekiq::Worker

def perform(*_args)
sleep 0.05
"Phew, I'm done!"
end
end

class SampleComplexJob
include Sidekiq::Worker

Expand Down
1 change: 1 addition & 0 deletions spec/support/sidekiq_inline_middlewares.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def push(job)
return super unless Sidekiq::Testing.inline?

job = Sidekiq.load_json(Sidekiq.dump_json(job))
job["jid"] ||= SecureRandom.hex(12)
job_class = Sidekiq::Testing.constantize(job["class"])
job_instance = job_class.new
queue = (job_instance.sidekiq_options_hash || {}).fetch("queue", "default")
Expand Down
38 changes: 38 additions & 0 deletions spec/yabeda/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,43 @@
expect(Yabeda.sidekiq.jobs_dead_count.values).to eq({ {} => 3 })
expect(Yabeda.sidekiq.jobs_scheduled_count.values).to eq({ {} => 2 })
end

it "measures maximum runtime of currently running jobs" do
Yabeda.sidekiq.running_job_runtime.values.clear # This is a hack
described_class.jobs_started_at.clear

Sidekiq::Testing.inline! do
workers = []
workers.push(Thread.new { SampleLongRunningJob.perform_async })
sleep 0.012 # Ruby can sleep less than requested
workers.push(Thread.new { SampleLongRunningJob.perform_async })

Yabeda.collectors.each(&:call)
expect(Yabeda.sidekiq.running_job_runtime.values).to include(
{ queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.010),
)

sleep 0.012 # Ruby can sleep less than requested
begin
FailingActiveJob.perform_later
rescue StandardError
nil
end
Yabeda.collectors.each(&:call)

expect(Yabeda.sidekiq.running_job_runtime.values).to include(
{ queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.020),
{ queue: "default", worker: "FailingActiveJob" } => 0,
)

# When all jobs are completed, metric should respond with zero
workers.map(&:join)
Yabeda.collectors.each(&:call)
expect(Yabeda.sidekiq.running_job_runtime.values).to include(
{ queue: "default", worker: "SampleLongRunningJob" } => 0,
{ queue: "default", worker: "FailingActiveJob" } => 0,
)
end
end
end
end