Skip to content

Commit

Permalink
Move until_expired digests to separate zset
Browse files Browse the repository at this point in the history
  • Loading branch information
francesmcmullin committed Jun 30, 2022
1 parent efb3d89 commit b490edd
Show file tree
Hide file tree
Showing 19 changed files with 163 additions and 38 deletions.
1 change: 1 addition & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ detectors:
FeatureEnvy:
exclude:
- SidekiqUniqueJobs::BatchDelete#batch_delete
- SidekiqUniqueJobs::Cli#list_entries
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::InvalidUniqueArguments#initialize
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
require "sidekiq_unique_jobs/on_conflict"
require "sidekiq_unique_jobs/changelog"
require "sidekiq_unique_jobs/digests"
require "sidekiq_unique_jobs/expiring_digests"

require "sidekiq_unique_jobs/config"
require "sidekiq_unique_jobs/sidekiq_unique_jobs"
Expand Down
41 changes: 33 additions & 8 deletions lib/sidekiq_unique_jobs/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ def self.banner(command, _namespace = nil, _subcommand = false) # rubocop:disabl
option :count, aliases: :c, type: :numeric, default: 1000, desc: "The max number of digests to return"
# :nodoc:
def list(pattern = "*")
entries = digests.entries(pattern: pattern, count: options[:count])
say "Found #{entries.size} digests matching '#{pattern}':"
print_in_columns(entries.sort) if entries.any?
max_count = options[:count]
say "Searching for regular digests"
list_entries(digests.entries(pattern: pattern, count: max_count), pattern)
say "Searching for expiring digests"
list_entries(expiring_digests.entries(pattern: pattern, count: max_count), pattern)
end

desc "del PATTERN", "deletes unique digests from redis by pattern"
Expand All @@ -32,11 +34,9 @@ def list(pattern = "*")
def del(pattern)
max_count = options[:count]
if options[:dry_run]
result = digests.entries(pattern: pattern, count: max_count)
say "Would delete #{result.size} digests matching '#{pattern}'"
count_entries_for_del(max_count, pattern)
else
deleted_count = digests.delete_by_pattern(pattern, count: max_count)
say "Deleted #{deleted_count} digests matching '#{pattern}'"
del_entries(max_count, pattern)
end
end

Expand All @@ -51,12 +51,17 @@ def console
console_class.start
end

no_commands do
no_commands do # rubocop:disable Metrics/BlockLength
# :nodoc:
def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end

# :nodoc:
def expiring_digests
@expiring_digests ||= SidekiqUniqueJobs::ExpiringDigests.new
end

# :nodoc:
def console_class
require "pry"
Expand All @@ -65,6 +70,26 @@ def console_class
require "irb"
IRB
end

# :nodoc:
def list_entries(entries, pattern)
say "Found #{entries.size} digests matching '#{pattern}':"
print_in_columns(entries.sort) if entries.any?
end

# :nodoc:
def count_entries_for_del(max_count, pattern)
count = digests.entries(pattern: pattern, count: max_count).size +
expiring_digests.entries(pattern: pattern, count: max_count).size
say "Would delete #{count} digests matching '#{pattern}'"
end

# :nodoc:
def del_entries(max_count, pattern)
deleted_count = digests.delete_by_pattern(pattern, count: max_count).to_i +
expiring_digests.delete_by_pattern(pattern, count: max_count).to_i
say "Deleted #{deleted_count} digests matching '#{pattern}'"
end
end
end
end
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module SidekiqUniqueJobs
CREATED_AT = "created_at"
DEAD_VERSION = "uniquejobs:dead"
DIGESTS = "uniquejobs:digests"
EXPIRING_DIGESTS = "uniquejobs:expiring_digests"
ERRORS = "errors"
JID = "jid"
LIMIT = "limit"
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/digests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class Digests < Redis::SortedSet
# @return [String] the default pattern to use for matching
SCAN_PATTERN = "*"

def initialize
super(DIGESTS)
def initialize(digests_key = DIGESTS)
super(digests_key)
end

#
Expand Down
14 changes: 14 additions & 0 deletions lib/sidekiq_unique_jobs/expiring_digests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
#
# Class ExpiringDigests provides access to the expiring digests used by until_expired locks
#
# @author Mikael Henriksson <[email protected]>
#
class ExpiringDigests < Digests
def initialize
super(EXPIRING_DIGESTS)
end
end
end
21 changes: 13 additions & 8 deletions lib/sidekiq_unique_jobs/key.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,25 @@ class Key
# @!attribute [r] digests
# @return [String] the zset with locked digests
attr_reader :digests
#
# @!attribute [r] expiring_digests
# @return [String] the zset with locked expiring_digests
attr_reader :expiring_digests

#
# Initialize a new Key
#
# @param [String] digest the digest to use as key
#
def initialize(digest)
@digest = digest
@queued = suffixed_key("QUEUED")
@primed = suffixed_key("PRIMED")
@locked = suffixed_key("LOCKED")
@info = suffixed_key("INFO")
@changelog = CHANGELOGS
@digests = DIGESTS
@digest = digest
@queued = suffixed_key("QUEUED")
@primed = suffixed_key("PRIMED")
@locked = suffixed_key("LOCKED")
@info = suffixed_key("INFO")
@changelog = CHANGELOGS
@digests = DIGESTS
@expiring_digests = EXPIRING_DIGESTS
end

#
Expand Down Expand Up @@ -81,7 +86,7 @@ def ==(other)
# @return [Array] an ordered array with all keys
#
def to_a
[digest, queued, primed, locked, info, changelog, digests]
[digest, queued, primed, locked, info, changelog, digests, expiring_digests]
end

private
Expand Down
24 changes: 15 additions & 9 deletions lib/sidekiq_unique_jobs/lua/lock.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local expiring_digests = KEYS[8]
-------- END keys ---------


Expand Down Expand Up @@ -57,8 +58,13 @@ if limit_exceeded then
return nil
end

log_debug("ZADD", digests, current_time, digest)
redis.call("ZADD", digests, current_time, digest)
if lock_type == "until_expired" and pttl and pttl > 0 then
log_debug("ZADD", expiring_digests, current_time + pttl, digest)
redis.call("ZADD", expiring_digests, current_time + pttl, digest)
else
log_debug("ZADD", digests, current_time, digest)
redis.call("ZADD", digests, current_time, digest)
end

log_debug("HSET", locked, job_id, current_time)
redis.call("HSET", locked, job_id, current_time)
Expand Down
34 changes: 31 additions & 3 deletions lib/sidekiq_unique_jobs/lua/reap_orphans.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
redis.replicate_commands()

-------- BEGIN keys ---------
local digests_set = KEYS[1]
local schedule_set = KEYS[2]
local retry_set = KEYS[3]
local digests_set = KEYS[1]
local expiring_digests_set = KEYS[2]
local schedule_set = KEYS[3]
local retry_set = KEYS[4]
-------- END keys ---------

-------- BEGIN argv ---------
Expand Down Expand Up @@ -90,5 +91,32 @@ repeat
index = index + per
until index >= total or del_count >= reaper_count

if del_count < reaper_count then
index = 0
total = redis.call("ZCOUNT", expiring_digests_set, 0, current_time)
repeat
local digests = redis.call("ZRANGEBYSCORE", expiring_digests_set, 0, current_time, "LIMIT", index, index + per -1)

for _, digest in pairs(digests) do
local queued = digest .. ":QUEUED"
local primed = digest .. ":PRIMED"
local locked = digest .. ":LOCKED"
local info = digest .. ":INFO"
local run_digest = digest .. ":RUN"
local run_queued = digest .. ":RUN:QUEUED"
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
local run_info = digest .. ":RUN:INFO"

redis.call(del_cmd, digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)

redis.call("ZREM", expiring_digests_set, digest)
del_count = del_count + 1
end

index = index + per
until index >= total or del_count >= reaper_count
end

log_debug("END")
return del_count
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/orphans/lua_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def call
call_script(
:reap_orphans,
conn,
keys: [DIGESTS, SCHEDULE, RETRY, PROCESSES],
keys: [DIGESTS, EXPIRING_DIGESTS, SCHEDULE, RETRY, PROCESSES],
argv: [reaper_count, (Time.now - reaper_timeout).to_f],
)
end
Expand Down
5 changes: 5 additions & 0 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def call
return if queues_very_full?

BatchDelete.call(orphans, conn)
BatchDelete.call(expired_digests, conn)
end

def expired_digests
conn.zrangebyscore(EXPIRING_DIGESTS, 0, @start_time)
end

#
Expand Down
25 changes: 22 additions & 3 deletions lib/sidekiq_unique_jobs/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module SidekiqUniqueJobs
#
# @author Mikael Henriksson <[email protected]>
module Web
def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
app.helpers do
include Web::Helpers
end
Expand Down Expand Up @@ -49,8 +49,25 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
erb(unique_template(:locks))
end

app.get "/expiring_locks" do
@filter = params[:filter] || "*"
@filter = "*" if @filter == ""
@count = (params[:count] || 100).to_i
@current_cursor = params[:cursor]
@prev_cursor = params[:prev_cursor]

@total_size, @next_cursor, @locks = expiring_digests.page(
cursor: @current_cursor,
pattern: @filter,
page_size: @count,
)

erb(unique_template(:locks))
end

app.get "/locks/delete_all" do
digests.delete_by_pattern("*", count: digests.count)
expiring_digests.delete_by_pattern("*", count: digests.count)
redirect_to :locks
end

Expand All @@ -63,6 +80,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize

app.get "/locks/:digest/delete" do
digests.delete_by_digest(params[:digest])
expiring_digests.delete_by_digest(params[:digest])
redirect_to :locks
end

Expand All @@ -82,8 +100,9 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
require "sidekiq/web" unless defined?(Sidekiq::Web)

Sidekiq::Web.register(SidekiqUniqueJobs::Web)
Sidekiq::Web.tabs["Locks"] = "locks"
Sidekiq::Web.tabs["Changelogs"] = "changelogs"
Sidekiq::Web.tabs["Locks"] = "locks"
Sidekiq::Web.tabs["Expiring Locks"] = "expiring_locks"
Sidekiq::Web.tabs["Changelogs"] = "changelogs"
Sidekiq::Web.settings.locales << File.join(File.dirname(__FILE__), "locales")
rescue NameError, LoadError => ex
SidekiqUniqueJobs.logger.error(ex)
Expand Down
10 changes: 10 additions & 0 deletions lib/sidekiq_unique_jobs/web/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end

#
# The collection of digests
#
#
# @return [SidekiqUniqueJobs::ExpiringDigests] the sorted set with expiring digests
#
def expiring_digests
@expiring_digests ||= SidekiqUniqueJobs::ExpiringDigests.new
end

#
# The collection of changelog entries
#
Expand Down
9 changes: 8 additions & 1 deletion spec/sidekiq_unique_jobs/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@
subject(:list) { capture(:stdout) { described_class.start(%w[list * --count 1000]) } }

context "when no digests exist" do
it { is_expected.to eq("Found 0 digests matching '#{pattern}':\n") }
it do
expect(list).to eq <<~HEADER
Searching for regular digests
Found 0 digests matching '#{pattern}':
Searching for expiring digests
Found 0 digests matching '#{pattern}':
HEADER
end
end

context "when a key exists" do
Expand Down
1 change: 1 addition & 0 deletions spec/sidekiq_unique_jobs/key_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#{digest_one}:INFO
uniquejobs:changelog
uniquejobs:digests
uniquejobs:expiring_digests
],
)
end
Expand Down
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/lua/lock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
it { expect { lock }.to change { hget(key.locked, job_id_one) }.from(nil) }
it { expect { lock }.to change { llen(key.queued) }.by(0) }
it { expect { lock }.to change { llen(key.primed) }.by(-1) }
it { expect { lock }.to change { zcard("uniquejobs:digests") }.by(1) }
it { expect { lock }.to change { zcard("uniquejobs:expiring_digests") }.by(1) }
end

context "when given lock_ttl nil" do
Expand Down
1 change: 1 addition & 0 deletions spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
let(:redis_keys) do
[
SidekiqUniqueJobs::DIGESTS,
SidekiqUniqueJobs::EXPIRING_DIGESTS,
SidekiqUniqueJobs::SCHEDULE,
SidekiqUniqueJobs::RETRY,
]
Expand Down
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/lua/unlock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
it { expect { unlock }.to change { llen(key.primed) }.by(0) }
it { expect { unlock }.not_to change { ttl(key.locked) } }
it { expect { unlock }.not_to change { hget(key.locked, job_id_one) } }
it { expect { unlock }.to change { zcard(key.digests) }.by(-1) }
it { expect { unlock }.not_to change { zcard(key.expiring_digests) } }
end

context "when given lock_ttl nil" do
Expand Down
Loading

0 comments on commit b490edd

Please sign in to comment.