Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make deletion compatible with redis-namespace #452

Merged
merged 1 commit into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ For sidekiq versions before 5.1 a `sidekiq_retries_exhausted` block is required
```ruby
class MyWorker
sidekiq_retries_exhausted do |msg, _ex|
SidekiqUniqueJobs::Digests.del(digest: msg['unique_digest']) if msg['unique_digest']
digest = msg['unique_digest']
SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
end
end
```
Expand All @@ -612,7 +613,8 @@ Starting in v5.1, Sidekiq can also fire a global callback when a job dies:
# this goes in your initializer
Sidekiq.configure_server do |config|
config.death_handlers << ->(job, _ex) do
SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest']
digest = msg['unique_digest']
SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
end
end
```
Expand Down
16 changes: 10 additions & 6 deletions lib/sidekiq_unique_jobs/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ def self.banner(command, _namespace = nil, _subcommand = false)
desc "list PATTERN", "list all unique digests and their expiry time"
option :count, aliases: :c, type: :numeric, default: 1000, desc: "The max number of digests to return"
def list(pattern = "*")
digests = SidekiqUniqueJobs::Digests.new.entries(pattern: pattern, count: options[:count])
say "Found #{digests.size} digests matching '#{pattern}':"
print_in_columns(digests.sort) if digests.any?
entries = digests.entries(pattern: pattern, count: options[:count])
say "Found #{entries.size} digests matching '#{pattern}':"
print_in_columns(entries.sort) if entries.any?
end

desc "del PATTERN", "deletes unique digests from redis by pattern"
Expand All @@ -27,10 +27,10 @@ def list(pattern = "*")
def del(pattern)
max_count = options[:count]
if options[:dry_run]
digests = SidekiqUniqueJobs::Digests.new.entries(pattern: pattern, count: max_count)
say "Would delete #{digests.size} digests matching '#{pattern}'"
result = digests.entries(pattern: pattern, count: max_count)
say "Would delete #{result.size} digests matching '#{pattern}'"
else
deleted_count = SidekiqUniqueJobs::Digests.new.del(pattern: pattern, count: max_count)
deleted_count = digests.delete_by_pattern(pattern, count: max_count)
say "Deleted #{deleted_count} digests matching '#{pattern}'"
end
end
Expand All @@ -46,6 +46,10 @@ def console
end

no_commands do
def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end

def console_class
require "pry"
Pry
Expand Down
82 changes: 34 additions & 48 deletions lib/sidekiq_unique_jobs/digests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,44 @@ def add(digest)
redis { |conn| conn.zadd(key, now_f, digest) }
end

# Deletes unique digests by pattern
#
# Deletes unique digest either by a digest or pattern
#
# @overload call_script(digest: "abcdefab")
# Call script with digest
# @param [String] digest: a digest to delete
# @overload call_script(pattern: "*", count: 1_000)
# Call script with pattern
# @param [String] pattern: "*" a pattern to match
# @param [String] count: DEFAULT_COUNT the number of keys to delete
#
# @raise [ArgumentError] when given neither pattern nor digest
#
# @param [String] pattern a key pattern to match with
# @param [Integer] count the maximum number
# @return [Array<String>] with unique digests
def delete_by_pattern(pattern, count: DEFAULT_COUNT)
result, elapsed = timed do
digests = entries(pattern: pattern, count: count).keys
redis { |conn| BatchDelete.call(digests, conn) }
end

log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms")

result
end

# Delete unique digests by digest
# Also deletes the :AVAILABLE, :EXPIRED etc keys
#
def del(digest: nil, pattern: nil, count: DEFAULT_COUNT)
return delete_by_pattern(pattern, count: count) if pattern
return delete_by_digest(digest) if digest
# @param [String] digest a unique digest to delete
def delete_by_digest(digest) # rubocop:disable Metrics/MethodLength
result, elapsed = timed do
call_script(:delete_by_digest, [
digest,
"#{digest}:QUEUED",
"#{digest}:PRIMED",
"#{digest}:LOCKED",
"#{digest}:RUN",
"#{digest}:RUN:QUEUED",
"#{digest}:RUN:PRIMED",
"#{digest}:RUN:LOCKED",
key,
])
end

log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")

raise ArgumentError, "##{__method__} requires either a :digest or a :pattern"
result
end

#
Expand Down Expand Up @@ -92,37 +110,5 @@ def page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100)
]
end
end

private

# Deletes unique digests by pattern
#
# @param [String] pattern a key pattern to match with
# @param [Integer] count the maximum number
# @return [Array<String>] with unique digests
def delete_by_pattern(pattern, count: DEFAULT_COUNT)
result, elapsed = timed do
digests = entries(pattern: pattern, count: count).keys
redis { |conn| BatchDelete.call(digests, conn) }
end

log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms")

result
end

# Delete unique digests by digest
# Also deletes the :AVAILABLE, :EXPIRED etc keys
#
# @param [String] digest a unique digest to delete
def delete_by_digest(digest)
result, elapsed = timed do
call_script(:delete_by_digest, [digest, key])
end

log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")

result
end
end
end
22 changes: 9 additions & 13 deletions lib/sidekiq_unique_jobs/lua/delete_by_digest.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local digests = KEYS[2]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local run_digest = KEYS[5]
local run_queued = KEYS[6]
local run_primed = KEYS[7]
local run_locked = KEYS[8]
local digests = KEYS[9]
-------- END keys ---------

-------- BEGIN injected arguments --------
Expand All @@ -15,17 +22,6 @@ local redisversion = tostring(ARGV[5])
<%= include_partial "shared/_common.lua" %>
---------- END local functions ----------

-------- BEGIN Variables --------
local queued = digest .. ":QUEUED"
local primed = digest .. ":PRIMED"
local locked = digest .. ":LOCKED"
local run_digest = digest .. ":RUN"
local run_queued = digest .. ":RUN:QUEUED"
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
-------- END Variables --------


-------- BEGIN delete_by_digest.lua --------
local counter = 0
local redis_version = toversion(redisversion)
Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq_unique_jobs/on_conflict/replace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def delete_job_by_digest
# @return [Integer] the number of keys deleted
#
def delete_lock
call_script(:delete_by_digest, keys: [unique_digest, DIGESTS])
digests.delete_by_digest(unique_digest)
end

def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
end

app.get "/locks/delete_all" do
digests.del(pattern: "*", count: digests.count)
digests.delete_by_pattern("*", count: digests.count)
redirect_to :locks
end

Expand All @@ -44,7 +44,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
end

app.get "/locks/:digest/delete" do
digests.del(digest: params[:digest])
digests.delete_by_digest(params[:digest])
redirect_to :locks
end

Expand Down
1 change: 0 additions & 1 deletion myapp/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ inherit_from: ../.rubocop.yml

require:
- rubocop-performance
- rubocop-rails
- rubocop-rspec

inherit_mode:
Expand Down
3 changes: 2 additions & 1 deletion myapp/config/initializers/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
config.error_handlers << Proc.new {|ex,ctx_hash| p ex, ctx_hash }

config.death_handlers << ->(job, _ex) do
SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest']
digest = job['unique_digest']
SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
end

# # accepts :expiration (optional)
Expand Down
69 changes: 32 additions & 37 deletions spec/sidekiq_unique_jobs/digests_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,57 +30,52 @@
it { is_expected.to match_array(expected_keys) }
end

describe "#del" do
subject(:del) { digests.del(digest: digest, pattern: pattern, count: count) }
describe "#delete_by_digest" do
subject(:delete_by_digest) { digests.delete_by_digest(digest) }

let(:digest) { nil }
let(:pattern) { nil }
let(:count) { 1000 }
let(:digest) { "uniquejobs:62c11d32fd69c691802579682409a483" }

before do
allow(digests).to receive(:log_info)
end

context "when given nothing" do
let(:digest) { nil }
let(:pattern) { nil }
it "deletes just the specific digest" do
expect { delete_by_digest }.to change { digests.entries.size }.by(-1)
end

it "logs performance info" do
delete_by_digest

it { expect { del }.to raise_error(ArgumentError, "#del requires either a :digest or a :pattern") }
expect(digests).to have_received(:log_info)
.with(
a_string_starting_with("delete_by_digest(#{digest})")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
end

context "when given a pattern" do
let(:pattern) { "*" }
describe "#delete_by_pattern" do
subject(:delete_by_pattern) { digests.delete_by_pattern(pattern, count: count) }

it "deletes all matching digests" do
expect(del).to be_a(Integer)
expect(digests.entries).to match_array([])
end
let(:pattern) { "*" }
let(:count) { 1000 }

it "logs performance info" do
del
expect(digests)
.to have_received(:log_info).with(
a_string_starting_with("delete_by_pattern(*, count: 1000)")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
before do
allow(digests).to receive(:log_info)
end

context "when given a digest" do
let(:digest) { "uniquejobs:62c11d32fd69c691802579682409a483" }

it "deletes just the specific digest" do
expect { del }.to change { digests.entries.size }.by(-1)
end
it "deletes all matching digests" do
expect(delete_by_pattern).to be_a(Integer)
expect(digests.entries).to match_array([])
end

it "logs performance info" do
del
expect(digests).to have_received(:log_info)
.with(
a_string_starting_with("delete_by_digest(#{digest})")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
it "logs performance info" do
delete_by_pattern
expect(digests)
.to have_received(:log_info).with(
a_string_starting_with("delete_by_pattern(*, count: 1000)")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
end
end
21 changes: 17 additions & 4 deletions spec/sidekiq_unique_jobs/lua/delete_by_digest_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@
require "spec_helper"

RSpec.describe "delete_by_digest.lua" do
subject(:delete_by_digest) { call_script(:delete_by_digest, [digest, SidekiqUniqueJobs::DIGESTS]) }
subject(:delete_by_digest) { call_script(:delete_by_digest, keys) }

let(:keys) do
[
key.digest,
key.queued,
key.primed,
key.locked,
run_key.digest,
run_key.queued,
run_key.primed,
run_key.locked,
SidekiqUniqueJobs::DIGESTS,
]
end
let(:job_id) { "jobid" }
let(:digest) { "uniquejobs:digest" }
let(:key) { SidekiqUniqueJobs::Key.new(digest) }
Expand All @@ -14,9 +27,9 @@
let(:locked) { redlock.locked }
let(:run_key) { SidekiqUniqueJobs::Key.new("#{digest}:RUN") }
let(:run_redlock) { SidekiqUniqueJobs::Lock.new(run_key) }
let(:run_queued) { redlock.queued }
let(:run_primed) { redlock.primed }
let(:run_locked) { redlock.locked }
let(:run_queued) { run_redlock.queued }
let(:run_primed) { run_redlock.primed }
let(:run_locked) { run_redlock.locked }
let(:lock_ttl) { nil }
let(:lock_type) { :until_executed }
let(:lock_limit) { 1 }
Expand Down