Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move until_expired digests to separate zset #721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ detectors:
FeatureEnvy:
exclude:
- SidekiqUniqueJobs::BatchDelete#batch_delete
- SidekiqUniqueJobs::Cli#list_entries
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::InvalidUniqueArguments#initialize
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
require "sidekiq_unique_jobs/on_conflict"
require "sidekiq_unique_jobs/changelog"
require "sidekiq_unique_jobs/digests"
require "sidekiq_unique_jobs/expiring_digests"

require "sidekiq_unique_jobs/config"
require "sidekiq_unique_jobs/sidekiq_unique_jobs"
Expand Down
41 changes: 33 additions & 8 deletions lib/sidekiq_unique_jobs/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ def self.banner(command, _namespace = nil, _subcommand = false) # rubocop:disabl
option :count, aliases: :c, type: :numeric, default: 1000, desc: "The max number of digests to return"
# :nodoc:
def list(pattern = "*")
entries = digests.entries(pattern: pattern, count: options[:count])
say "Found #{entries.size} digests matching '#{pattern}':"
print_in_columns(entries.sort) if entries.any?
max_count = options[:count]
say "Searching for regular digests"
list_entries(digests.entries(pattern: pattern, count: max_count), pattern)
say "Searching for expiring digests"
list_entries(expiring_digests.entries(pattern: pattern, count: max_count), pattern)
end

desc "del PATTERN", "deletes unique digests from redis by pattern"
Expand All @@ -32,11 +34,9 @@ def list(pattern = "*")
def del(pattern)
max_count = options[:count]
if options[:dry_run]
result = digests.entries(pattern: pattern, count: max_count)
say "Would delete #{result.size} digests matching '#{pattern}'"
count_entries_for_del(max_count, pattern)
else
deleted_count = digests.delete_by_pattern(pattern, count: max_count)
say "Deleted #{deleted_count} digests matching '#{pattern}'"
del_entries(max_count, pattern)
end
end

Expand All @@ -51,12 +51,17 @@ def console
console_class.start
end

no_commands do
no_commands do # rubocop:disable Metrics/BlockLength
# :nodoc:
def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end

# :nodoc:
def expiring_digests
@expiring_digests ||= SidekiqUniqueJobs::ExpiringDigests.new
end

# :nodoc:
def console_class
require "pry"
Expand All @@ -65,6 +70,26 @@ def console_class
require "irb"
IRB
end

# :nodoc:
def list_entries(entries, pattern)
say "Found #{entries.size} digests matching '#{pattern}':"
print_in_columns(entries.sort) if entries.any?
end

# :nodoc:
def count_entries_for_del(max_count, pattern)
count = digests.entries(pattern: pattern, count: max_count).size +
expiring_digests.entries(pattern: pattern, count: max_count).size
say "Would delete #{count} digests matching '#{pattern}'"
end

# :nodoc:
def del_entries(max_count, pattern)
deleted_count = digests.delete_by_pattern(pattern, count: max_count).to_i +
expiring_digests.delete_by_pattern(pattern, count: max_count).to_i
say "Deleted #{deleted_count} digests matching '#{pattern}'"
end
end
end
end
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module SidekiqUniqueJobs
CREATED_AT = "created_at"
DEAD_VERSION = "uniquejobs:dead"
DIGESTS = "uniquejobs:digests"
EXPIRING_DIGESTS = "uniquejobs:expiring_digests"
ERRORS = "errors"
JID = "jid"
LIMIT = "limit"
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/digests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class Digests < Redis::SortedSet
# @return [String] the default pattern to use for matching
SCAN_PATTERN = "*"

def initialize
super(DIGESTS)
def initialize(digests_key = DIGESTS)
super(digests_key)
end

#
Expand Down
14 changes: 14 additions & 0 deletions lib/sidekiq_unique_jobs/expiring_digests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
#
# Class ExpiringDigests provides access to the expiring digests used by until_expired locks
#
# @author Mikael Henriksson <[email protected]>
#
class ExpiringDigests < Digests
def initialize
super(EXPIRING_DIGESTS)
end
end
end
21 changes: 13 additions & 8 deletions lib/sidekiq_unique_jobs/key.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,25 @@ class Key
# @!attribute [r] digests
# @return [String] the zset with locked digests
attr_reader :digests
#
# @!attribute [r] expiring_digests
# @return [String] the zset with locked expiring_digests
attr_reader :expiring_digests

#
# Initialize a new Key
#
# @param [String] digest the digest to use as key
#
def initialize(digest)
@digest = digest
@queued = suffixed_key("QUEUED")
@primed = suffixed_key("PRIMED")
@locked = suffixed_key("LOCKED")
@info = suffixed_key("INFO")
@changelog = CHANGELOGS
@digests = DIGESTS
@digest = digest
@queued = suffixed_key("QUEUED")
@primed = suffixed_key("PRIMED")
@locked = suffixed_key("LOCKED")
@info = suffixed_key("INFO")
@changelog = CHANGELOGS
@digests = DIGESTS
@expiring_digests = EXPIRING_DIGESTS
end

#
Expand Down Expand Up @@ -81,7 +86,7 @@ def ==(other)
# @return [Array] an ordered array with all keys
#
def to_a
[digest, queued, primed, locked, info, changelog, digests]
[digest, queued, primed, locked, info, changelog, digests, expiring_digests]
end

private
Expand Down
24 changes: 15 additions & 9 deletions lib/sidekiq_unique_jobs/lua/lock.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local expiring_digests = KEYS[8]
-------- END keys ---------


Expand Down Expand Up @@ -57,8 +58,13 @@ if limit_exceeded then
return nil
end

log_debug("ZADD", digests, current_time, digest)
redis.call("ZADD", digests, current_time, digest)
if lock_type == "until_expired" and pttl and pttl > 0 then
log_debug("ZADD", expiring_digests, current_time + pttl, digest)
redis.call("ZADD", expiring_digests, current_time + pttl, digest)
else
log_debug("ZADD", digests, current_time, digest)
redis.call("ZADD", digests, current_time, digest)
end

log_debug("HSET", locked, job_id, current_time)
redis.call("HSET", locked, job_id, current_time)
Expand Down
34 changes: 31 additions & 3 deletions lib/sidekiq_unique_jobs/lua/reap_orphans.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
redis.replicate_commands()

-------- BEGIN keys ---------
local digests_set = KEYS[1]
local schedule_set = KEYS[2]
local retry_set = KEYS[3]
local digests_set = KEYS[1]
local expiring_digests_set = KEYS[2]
local schedule_set = KEYS[3]
local retry_set = KEYS[4]
-------- END keys ---------

-------- BEGIN argv ---------
Expand Down Expand Up @@ -90,5 +91,32 @@ repeat
index = index + per
until index >= total or del_count >= reaper_count

if del_count < reaper_count then
index = 0
total = redis.call("ZCOUNT", expiring_digests_set, 0, current_time)
repeat
local digests = redis.call("ZRANGEBYSCORE", expiring_digests_set, 0, current_time, "LIMIT", index, index + per -1)

for _, digest in pairs(digests) do
local queued = digest .. ":QUEUED"
local primed = digest .. ":PRIMED"
local locked = digest .. ":LOCKED"
local info = digest .. ":INFO"
local run_digest = digest .. ":RUN"
local run_queued = digest .. ":RUN:QUEUED"
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
local run_info = digest .. ":RUN:INFO"

redis.call(del_cmd, digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)

redis.call("ZREM", expiring_digests_set, digest)
del_count = del_count + 1
end

index = index + per
until index >= total or del_count >= reaper_count
end

log_debug("END")
return del_count
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/orphans/lua_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def call
call_script(
:reap_orphans,
conn,
keys: [DIGESTS, SCHEDULE, RETRY, PROCESSES],
keys: [DIGESTS, EXPIRING_DIGESTS, SCHEDULE, RETRY, PROCESSES],
argv: [reaper_count, (Time.now - reaper_timeout).to_f],
)
end
Expand Down
5 changes: 5 additions & 0 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def call
return if queues_very_full?

BatchDelete.call(orphans, conn)
BatchDelete.call(expired_digests, conn)
end

def expired_digests
conn.zrangebyscore(EXPIRING_DIGESTS, 0, @start_time)
end

#
Expand Down
25 changes: 22 additions & 3 deletions lib/sidekiq_unique_jobs/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module SidekiqUniqueJobs
#
# @author Mikael Henriksson <[email protected]>
module Web
def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
app.helpers do
include Web::Helpers
end
Expand Down Expand Up @@ -49,8 +49,25 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
erb(unique_template(:locks))
end

app.get "/expiring_locks" do
@filter = params[:filter] || "*"
@filter = "*" if @filter == ""
@count = (params[:count] || 100).to_i
@current_cursor = params[:cursor]
@prev_cursor = params[:prev_cursor]

@total_size, @next_cursor, @locks = expiring_digests.page(
cursor: @current_cursor,
pattern: @filter,
page_size: @count,
)

erb(unique_template(:locks))
end

app.get "/locks/delete_all" do
digests.delete_by_pattern("*", count: digests.count)
expiring_digests.delete_by_pattern("*", count: digests.count)
redirect_to :locks
end

Expand All @@ -63,6 +80,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize

app.get "/locks/:digest/delete" do
digests.delete_by_digest(params[:digest])
expiring_digests.delete_by_digest(params[:digest])
redirect_to :locks
end

Expand All @@ -82,8 +100,9 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
require "sidekiq/web" unless defined?(Sidekiq::Web)

Sidekiq::Web.register(SidekiqUniqueJobs::Web)
Sidekiq::Web.tabs["Locks"] = "locks"
Sidekiq::Web.tabs["Changelogs"] = "changelogs"
Sidekiq::Web.tabs["Locks"] = "locks"
Sidekiq::Web.tabs["Expiring Locks"] = "expiring_locks"
Sidekiq::Web.tabs["Changelogs"] = "changelogs"
Sidekiq::Web.settings.locales << File.join(File.dirname(__FILE__), "locales")
rescue NameError, LoadError => ex
SidekiqUniqueJobs.logger.error(ex)
Expand Down
10 changes: 10 additions & 0 deletions lib/sidekiq_unique_jobs/web/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end

#
# The collection of digests
#
#
# @return [SidekiqUniqueJobs::ExpiringDigests] the sorted set with expiring digests
#
def expiring_digests
@expiring_digests ||= SidekiqUniqueJobs::ExpiringDigests.new
end

#
# The collection of changelog entries
#
Expand Down
9 changes: 8 additions & 1 deletion spec/sidekiq_unique_jobs/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@
subject(:list) { capture(:stdout) { described_class.start(%w[list * --count 1000]) } }

context "when no digests exist" do
it { is_expected.to eq("Found 0 digests matching '#{pattern}':\n") }
it do
expect(list).to eq <<~HEADER
Searching for regular digests
Found 0 digests matching '#{pattern}':
Searching for expiring digests
Found 0 digests matching '#{pattern}':
HEADER
end
end

context "when a key exists" do
Expand Down
1 change: 1 addition & 0 deletions spec/sidekiq_unique_jobs/key_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#{digest_one}:INFO
uniquejobs:changelog
uniquejobs:digests
uniquejobs:expiring_digests
],
)
end
Expand Down
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/lua/lock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
it { expect { lock }.to change { hget(key.locked, job_id_one) }.from(nil) }
it { expect { lock }.to change { llen(key.queued) }.by(0) }
it { expect { lock }.to change { llen(key.primed) }.by(-1) }
it { expect { lock }.to change { zcard("uniquejobs:digests") }.by(1) }
it { expect { lock }.to change { zcard("uniquejobs:expiring_digests") }.by(1) }
end

context "when given lock_ttl nil" do
Expand Down
1 change: 1 addition & 0 deletions spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
let(:redis_keys) do
[
SidekiqUniqueJobs::DIGESTS,
SidekiqUniqueJobs::EXPIRING_DIGESTS,
SidekiqUniqueJobs::SCHEDULE,
SidekiqUniqueJobs::RETRY,
]
Expand Down
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/lua/unlock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
it { expect { unlock }.to change { llen(key.primed) }.by(0) }
it { expect { unlock }.not_to change { ttl(key.locked) } }
it { expect { unlock }.not_to change { hget(key.locked, job_id_one) } }
it { expect { unlock }.to change { zcard(key.digests) }.by(-1) }
it { expect { unlock }.not_to change { zcard(key.expiring_digests) } }
end

context "when given lock_ttl nil" do
Expand Down
Loading