Skip to content

Commit

Permalink
Decrement Redis Counter for Errored Sidekiq Jobs (#24)
Browse files Browse the repository at this point in the history
* decrement counter for errored sidekiq jobs

* add rubocop as development dependency

* corrects to styling and deleting some unnecessary specs

* remove more test/styling

---------

Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
hkim3162 and hkim3163 authored Jun 26, 2023
1 parent 0206e28 commit e64bf1b
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 77 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ For each queue, the following metrics will be reported:
2. **shared.sidekiq._queue_.latency**: gauge of how long the oldest job has been in the queue

For each worker, the following metrics and tags will be reported:
1. **sidekiq.worker_metrics.inqueue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement

## DogStatsD Keys
For each job, the following metrics and tags will be reported:
Expand All @@ -111,7 +111,7 @@ For each queue, the following metrics and tags will be reported:
2. **sidekiq.queue.latency (tags: {queue: _queue_})**: gauge of how long the oldest job has been in the queue

For each worker, the following metrics and tags will be reported:
1. **sidekiq.worker_metrics.inqueue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement

## Worker
There is a worker, `Sidekiq::Instrument::Worker`, that submits gauges
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq/instrument/middleware/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ module Sidekiq::Instrument
class ServerMiddleware
include Sidekiq::Instrument::MetricNames

def call(worker, job, queue, &block)
def call(worker, _job, _queue, &block)
Statter.statsd.increment(metric_name(worker, 'dequeue'))
Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))

start_time = Time.now
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
yield block
execution_time_ms = (Time.now - start_time) * 1000
Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
Expand All @@ -22,6 +21,7 @@ def call(worker, job, queue, &block)
Statter.dogstatsd&.increment('sidekiq.error', worker_dog_options(worker))
raise e
ensure
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
Statter.dogstatsd&.flush(sync: true)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/instrument/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Sidekiq
module Instrument
VERSION = '0.6.1'
VERSION = '0.6.2'
end
end
8 changes: 5 additions & 3 deletions lib/sidekiq/instrument/worker.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'sidekiq'
require 'sidekiq/api'

Expand All @@ -13,7 +15,7 @@ class Worker
workers_size: :workers,
enqueued: :pending,
failed: nil
}
}.freeze

def perform
info = Sidekiq::Stats.new
Expand Down Expand Up @@ -64,8 +66,8 @@ def send_worker_metrics
return unless WorkerMetrics.enabled

WorkerMetrics.workers_in_queue.each do |key, value|
Statter.statsd.gauge("shared.sidekiq.worker_metrics.inqueue.#{key}", value)
Statter.dogstatsd&.gauge("shared.sidekiq.worker_metrics.inqueue", value, tags: ["worker:#{key}"])
Statter.statsd.gauge("shared.sidekiq.worker_metrics.in_queue.#{key}", value)
Statter.dogstatsd&.gauge('shared.sidekiq.worker_metrics.in_queue', value, tags: ["worker:#{key}"])
end
end
end
Expand Down
1 change: 1 addition & 0 deletions sidekiq-instrument.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'bundler', '~> 2.0', '>= 2.0.2'
spec.add_development_dependency 'rake', '~> 12.0'
spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_development_dependency 'rubocop', '~> 1.0'
spec.add_development_dependency 'pry-byebug', '~> 3.4'
spec.add_development_dependency 'simplecov'
spec.add_development_dependency 'simplecov-cobertura'
Expand Down
52 changes: 23 additions & 29 deletions spec/sidekiq-instrument/client_middleware_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# frozen_string_literal: true

require 'sidekiq/instrument/middleware/client'

RSpec.describe Sidekiq::Instrument::ClientMiddleware do
describe '#call' do
let(:worker_metric_name) do
'sidekiq_instrument_trace_workers::in_queue'
end

before(:all) do
Sidekiq.configure_client do |c|
c.client_middleware do |chain|
Expand All @@ -10,6 +16,10 @@
end
end

before(:each) do
Redis.new.flushall
end

after(:all) do
Sidekiq.configure_client do |c|
c.client_middleware do |chain|
Expand All @@ -20,50 +30,34 @@

context 'without statsd_metric_name' do
it 'increments the StatsD enqueue counter' do
expect {
expect do
MyWorker.perform_async
}.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end

it 'increments the DogStatsD enqueue counter' do
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] }).once
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] }).once
MyWorker.perform_async
end
end

context 'with statsd_metric_name' do
it 'increments the enqueue counter' do
expect {
expect do
MyOtherWorker.perform_async
}.to trigger_statsd_increment('my_other_worker.enqueue')
end.to trigger_statsd_increment('my_other_worker.enqueue')
end
end

context 'with WorkerMetrics.enabled true' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('1')
end
end

context 'with WorkerMetrics.enabled true and redis_config not provided' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('1')
it 'increments the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('1')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('2')
end
end

Expand Down
94 changes: 62 additions & 32 deletions spec/sidekiq-instrument/server_middleware_spec.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# frozen_string_literal: true

require 'sidekiq/instrument/middleware/server'

RSpec.describe Sidekiq::Instrument::ServerMiddleware do
describe '#call' do
let(:expected_dog_options) { { tags: ['queue:default', 'worker:my_worker'] } }
let(:worker_metric_name) do
'sidekiq_instrument_trace_workers::in_queue'
end

before(:all) do
Sidekiq::Testing.server_middleware do |chain|
chain.add described_class
end
end

before(:each) do
Redis.new.flushall
end

after(:all) do
Sidekiq::Testing.server_middleware do |chain|
chain.remove described_class
Expand All @@ -18,20 +27,22 @@

context 'when a job succeeds' do
it 'increments StatsD dequeue counter' do
expect {
expect do
MyWorker.perform_async
}.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.dequeue')
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.dequeue')
end

it 'increments DogStatsD dequeue counter' do
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
MyWorker.perform_async
end

it 'measures StatsD job runtime' do
expect {
expect do
MyWorker.perform_async
}.to trigger_statsd_measure('shared.sidekiq.default.MyWorker.runtime')
end.to trigger_statsd_measure('shared.sidekiq.default.MyWorker.runtime')
end

it 'measures DogStatsD job runtime' do
Expand All @@ -40,53 +51,72 @@
end

context 'with WorkerMetrics.enabled true' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('-1')
it 'decrements the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel(worker_metric_name, 'my_other_worker')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('-1')
end
end

context 'with WorkerMetrics.enabled true, and redis_config not given' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
context 'with WorkerMetrics.enabled true and an errored job' do
it 'decrements the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('-1')
begin
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('-1')
rescue StandardError
nil
end
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('-2')
end
end
end

context 'when a job fails' do
before { allow_any_instance_of(MyWorker).to receive(:perform).and_raise('foo') }
before do
allow_any_instance_of(MyWorker).to receive(:perform).and_raise('foo')
end

it 'increments the StatsD failure counter' do
expect {
MyWorker.perform_async rescue nil
}.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.error')
expect do
MyWorker.perform_async
rescue StandardError
nil
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.error')
end

it 'increments the DogStatsD failure counter' do
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
expect(Sidekiq::Instrument::Statter.dogstatsd).not_to receive(:time)
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.error', expected_dog_options).once
MyWorker.perform_async rescue nil
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.error', expected_dog_options).once

begin
MyWorker.perform_async
rescue StandardError
nil
end
end

it 're-raises the error' do
expect { MyWorker.perform_async }.to raise_error 'foo'
end

it 'calls the decrement counter' do
expect(
Sidekiq::Instrument::WorkerMetrics
).to receive(:trace_workers_decrement_counter).with('my_worker').once
begin
MyWorker.perform_async
rescue StandardError
nil
end
end
end

context 'without optional DogStatsD client' do
Expand Down
Loading

0 comments on commit e64bf1b

Please sign in to comment.