diff --git a/README.md b/README.md index 1593a92b4..0811c711f 100644 --- a/README.md +++ b/README.md @@ -601,7 +601,8 @@ For sidekiq versions before 5.1 a `sidekiq_retries_exhausted` block is required ```ruby class MyWorker sidekiq_retries_exhausted do |msg, _ex| - SidekiqUniqueJobs::Digests.del(digest: msg['unique_digest']) if msg['unique_digest'] + digest = msg['unique_digest'] + SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest end end ``` @@ -612,7 +613,8 @@ Starting in v5.1, Sidekiq can also fire a global callback when a job dies: # this goes in your initializer Sidekiq.configure_server do |config| config.death_handlers << ->(job, _ex) do - SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest'] + digest = msg['unique_digest'] + SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest end end ``` diff --git a/lib/sidekiq_unique_jobs/cli.rb b/lib/sidekiq_unique_jobs/cli.rb index 9b0b8548a..f2a90a0da 100644 --- a/lib/sidekiq_unique_jobs/cli.rb +++ b/lib/sidekiq_unique_jobs/cli.rb @@ -16,9 +16,9 @@ def self.banner(command, _namespace = nil, _subcommand = false) desc "list PATTERN", "list all unique digests and their expiry time" option :count, aliases: :c, type: :numeric, default: 1000, desc: "The max number of digests to return" def list(pattern = "*") - digests = SidekiqUniqueJobs::Digests.new.entries(pattern: pattern, count: options[:count]) - say "Found #{digests.size} digests matching '#{pattern}':" - print_in_columns(digests.sort) if digests.any? + entries = digests.entries(pattern: pattern, count: options[:count]) + say "Found #{entries.size} digests matching '#{pattern}':" + print_in_columns(entries.sort) if entries.any? end desc "del PATTERN", "deletes unique digests from redis by pattern" @@ -27,10 +27,10 @@ def list(pattern = "*") def del(pattern) max_count = options[:count] if options[:dry_run] - digests = SidekiqUniqueJobs::Digests.new.entries(pattern: pattern, count: max_count) - say "Would delete #{digests.size} digests matching '#{pattern}'" + result = digests.entries(pattern: pattern, count: max_count) + say "Would delete #{result.size} digests matching '#{pattern}'" else - deleted_count = SidekiqUniqueJobs::Digests.new.del(pattern: pattern, count: max_count) + deleted_count = digests.delete_by_pattern(pattern, count: max_count) say "Deleted #{deleted_count} digests matching '#{pattern}'" end end @@ -46,6 +46,10 @@ def console end no_commands do + def digests + @digests ||= SidekiqUniqueJobs::Digests.new + end + def console_class require "pry" Pry diff --git a/lib/sidekiq_unique_jobs/digests.rb b/lib/sidekiq_unique_jobs/digests.rb index 5788fdf7a..b23b92c95 100644 --- a/lib/sidekiq_unique_jobs/digests.rb +++ b/lib/sidekiq_unique_jobs/digests.rb @@ -27,26 +27,44 @@ def add(digest) redis { |conn| conn.zadd(key, now_f, digest) } end + # Deletes unique digests by pattern # - # Deletes unique digest either by a digest or pattern - # - # @overload call_script(digest: "abcdefab") - # Call script with digest - # @param [String] digest: a digest to delete - # @overload call_script(pattern: "*", count: 1_000) - # Call script with pattern - # @param [String] pattern: "*" a pattern to match - # @param [String] count: DEFAULT_COUNT the number of keys to delete - # - # @raise [ArgumentError] when given neither pattern nor digest - # + # @param [String] pattern a key pattern to match with + # @param [Integer] count the maximum number # @return [Array] with unique digests + def delete_by_pattern(pattern, count: DEFAULT_COUNT) + result, elapsed = timed do + digests = entries(pattern: pattern, count: count).keys + redis { |conn| BatchDelete.call(digests, conn) } + end + + log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms") + + result + end + + # Delete unique digests by digest + # Also deletes the :AVAILABLE, :EXPIRED etc keys # - def del(digest: nil, pattern: nil, count: DEFAULT_COUNT) - return delete_by_pattern(pattern, count: count) if pattern - return delete_by_digest(digest) if digest + # @param [String] digest a unique digest to delete + def delete_by_digest(digest) # rubocop:disable Metrics/MethodLength + result, elapsed = timed do + call_script(:delete_by_digest, [ + digest, + "#{digest}:QUEUED", + "#{digest}:PRIMED", + "#{digest}:LOCKED", + "#{digest}:RUN", + "#{digest}:RUN:QUEUED", + "#{digest}:RUN:PRIMED", + "#{digest}:RUN:LOCKED", + key, + ]) + end + + log_info("#{__method__}(#{digest}) completed in #{elapsed}ms") - raise ArgumentError, "##{__method__} requires either a :digest or a :pattern" + result end # @@ -92,37 +110,5 @@ def page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100) ] end end - - private - - # Deletes unique digests by pattern - # - # @param [String] pattern a key pattern to match with - # @param [Integer] count the maximum number - # @return [Array] with unique digests - def delete_by_pattern(pattern, count: DEFAULT_COUNT) - result, elapsed = timed do - digests = entries(pattern: pattern, count: count).keys - redis { |conn| BatchDelete.call(digests, conn) } - end - - log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms") - - result - end - - # Delete unique digests by digest - # Also deletes the :AVAILABLE, :EXPIRED etc keys - # - # @param [String] digest a unique digest to delete - def delete_by_digest(digest) - result, elapsed = timed do - call_script(:delete_by_digest, [digest, key]) - end - - log_info("#{__method__}(#{digest}) completed in #{elapsed}ms") - - result - end end end diff --git a/lib/sidekiq_unique_jobs/lua/delete_by_digest.lua b/lib/sidekiq_unique_jobs/lua/delete_by_digest.lua index a70d753cf..081184030 100644 --- a/lib/sidekiq_unique_jobs/lua/delete_by_digest.lua +++ b/lib/sidekiq_unique_jobs/lua/delete_by_digest.lua @@ -1,6 +1,13 @@ -------- BEGIN keys --------- -local digest = KEYS[1] -local digests = KEYS[2] +local digest = KEYS[1] +local queued = KEYS[2] +local primed = KEYS[3] +local locked = KEYS[4] +local run_digest = KEYS[5] +local run_queued = KEYS[6] +local run_primed = KEYS[7] +local run_locked = KEYS[8] +local digests = KEYS[9] -------- END keys --------- -------- BEGIN injected arguments -------- @@ -15,17 +22,6 @@ local redisversion = tostring(ARGV[5]) <%= include_partial "shared/_common.lua" %> ---------- END local functions ---------- --------- BEGIN Variables -------- -local queued = digest .. ":QUEUED" -local primed = digest .. ":PRIMED" -local locked = digest .. ":LOCKED" -local run_digest = digest .. ":RUN" -local run_queued = digest .. ":RUN:QUEUED" -local run_primed = digest .. ":RUN:PRIMED" -local run_locked = digest .. ":RUN:LOCKED" --------- END Variables -------- - - -------- BEGIN delete_by_digest.lua -------- local counter = 0 local redis_version = toversion(redisversion) diff --git a/lib/sidekiq_unique_jobs/on_conflict/replace.rb b/lib/sidekiq_unique_jobs/on_conflict/replace.rb index e322227b7..20e645e61 100644 --- a/lib/sidekiq_unique_jobs/on_conflict/replace.rb +++ b/lib/sidekiq_unique_jobs/on_conflict/replace.rb @@ -64,7 +64,11 @@ def delete_job_by_digest # @return [Integer] the number of keys deleted # def delete_lock - call_script(:delete_by_digest, keys: [unique_digest, DIGESTS]) + digests.delete_by_digest(unique_digest) + end + + def digests + @digests ||= SidekiqUniqueJobs::Digests.new end end end diff --git a/lib/sidekiq_unique_jobs/web.rb b/lib/sidekiq_unique_jobs/web.rb index cbc1b3135..2cf3d710c 100644 --- a/lib/sidekiq_unique_jobs/web.rb +++ b/lib/sidekiq_unique_jobs/web.rb @@ -32,7 +32,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize end app.get "/locks/delete_all" do - digests.del(pattern: "*", count: digests.count) + digests.delete_by_pattern("*", count: digests.count) redirect_to :locks end @@ -44,7 +44,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize end app.get "/locks/:digest/delete" do - digests.del(digest: params[:digest]) + digests.delete_by_digest(params[:digest]) redirect_to :locks end diff --git a/myapp/.rubocop.yml b/myapp/.rubocop.yml index e4c316ece..709f7ad1c 100644 --- a/myapp/.rubocop.yml +++ b/myapp/.rubocop.yml @@ -2,7 +2,6 @@ inherit_from: ../.rubocop.yml require: - rubocop-performance - - rubocop-rails - rubocop-rspec inherit_mode: diff --git a/myapp/config/initializers/sidekiq.rb b/myapp/config/initializers/sidekiq.rb index c15188539..0be3f09e6 100644 --- a/myapp/config/initializers/sidekiq.rb +++ b/myapp/config/initializers/sidekiq.rb @@ -10,7 +10,8 @@ config.error_handlers << Proc.new {|ex,ctx_hash| p ex, ctx_hash } config.death_handlers << ->(job, _ex) do - SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest'] + digest = job['unique_digest'] + SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest end # # accepts :expiration (optional) diff --git a/spec/sidekiq_unique_jobs/digests_spec.rb b/spec/sidekiq_unique_jobs/digests_spec.rb index 5ffd681c8..15e42d81a 100644 --- a/spec/sidekiq_unique_jobs/digests_spec.rb +++ b/spec/sidekiq_unique_jobs/digests_spec.rb @@ -30,57 +30,52 @@ it { is_expected.to match_array(expected_keys) } end - describe "#del" do - subject(:del) { digests.del(digest: digest, pattern: pattern, count: count) } + describe "#delete_by_digest" do + subject(:delete_by_digest) { digests.delete_by_digest(digest) } - let(:digest) { nil } - let(:pattern) { nil } - let(:count) { 1000 } + let(:digest) { "uniquejobs:62c11d32fd69c691802579682409a483" } before do allow(digests).to receive(:log_info) end - context "when given nothing" do - let(:digest) { nil } - let(:pattern) { nil } + it "deletes just the specific digest" do + expect { delete_by_digest }.to change { digests.entries.size }.by(-1) + end + + it "logs performance info" do + delete_by_digest - it { expect { del }.to raise_error(ArgumentError, "#del requires either a :digest or a :pattern") } + expect(digests).to have_received(:log_info) + .with( + a_string_starting_with("delete_by_digest(#{digest})") + .and(matching(/completed in (\d+(\.\d+)?)ms/)), + ) end + end - context "when given a pattern" do - let(:pattern) { "*" } + describe "#delete_by_pattern" do + subject(:delete_by_pattern) { digests.delete_by_pattern(pattern, count: count) } - it "deletes all matching digests" do - expect(del).to be_a(Integer) - expect(digests.entries).to match_array([]) - end + let(:pattern) { "*" } + let(:count) { 1000 } - it "logs performance info" do - del - expect(digests) - .to have_received(:log_info).with( - a_string_starting_with("delete_by_pattern(*, count: 1000)") - .and(matching(/completed in (\d+(\.\d+)?)ms/)), - ) - end + before do + allow(digests).to receive(:log_info) end - context "when given a digest" do - let(:digest) { "uniquejobs:62c11d32fd69c691802579682409a483" } - - it "deletes just the specific digest" do - expect { del }.to change { digests.entries.size }.by(-1) - end + it "deletes all matching digests" do + expect(delete_by_pattern).to be_a(Integer) + expect(digests.entries).to match_array([]) + end - it "logs performance info" do - del - expect(digests).to have_received(:log_info) - .with( - a_string_starting_with("delete_by_digest(#{digest})") - .and(matching(/completed in (\d+(\.\d+)?)ms/)), - ) - end + it "logs performance info" do + delete_by_pattern + expect(digests) + .to have_received(:log_info).with( + a_string_starting_with("delete_by_pattern(*, count: 1000)") + .and(matching(/completed in (\d+(\.\d+)?)ms/)), + ) end end end diff --git a/spec/sidekiq_unique_jobs/lua/delete_by_digest_spec.rb b/spec/sidekiq_unique_jobs/lua/delete_by_digest_spec.rb index cd07abb95..f3c3c0f64 100644 --- a/spec/sidekiq_unique_jobs/lua/delete_by_digest_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/delete_by_digest_spec.rb @@ -3,8 +3,21 @@ require "spec_helper" RSpec.describe "delete_by_digest.lua" do - subject(:delete_by_digest) { call_script(:delete_by_digest, [digest, SidekiqUniqueJobs::DIGESTS]) } + subject(:delete_by_digest) { call_script(:delete_by_digest, keys) } + let(:keys) do + [ + key.digest, + key.queued, + key.primed, + key.locked, + run_key.digest, + run_key.queued, + run_key.primed, + run_key.locked, + SidekiqUniqueJobs::DIGESTS, + ] + end let(:job_id) { "jobid" } let(:digest) { "uniquejobs:digest" } let(:key) { SidekiqUniqueJobs::Key.new(digest) } @@ -14,9 +27,9 @@ let(:locked) { redlock.locked } let(:run_key) { SidekiqUniqueJobs::Key.new("#{digest}:RUN") } let(:run_redlock) { SidekiqUniqueJobs::Lock.new(run_key) } - let(:run_queued) { redlock.queued } - let(:run_primed) { redlock.primed } - let(:run_locked) { redlock.locked } + let(:run_queued) { run_redlock.queued } + let(:run_primed) { run_redlock.primed } + let(:run_locked) { run_redlock.locked } let(:lock_ttl) { nil } let(:lock_type) { :until_executed } let(:lock_limit) { 1 }