Skip to content

Commit

Permalink
use scheduled-jid for locking schedule queue
Browse files Browse the repository at this point in the history
 * split old_unique_for? into unique_schedule_old and unique_enqueue_old
  • Loading branch information
pik committed Aug 19, 2015
1 parent 1830400 commit c062cac
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 27 deletions.
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/client/extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Extensions
def lock_queue_script
<<-LUA
local ret = redis.call('GET', KEYS[1])
if not ret or ret == 'scheduled' then
if not ret or string.sub(ret, 1, 9) == 'scheduled' then
return redis.call('SETEX', KEYS[1], ARGV[1], ARGV[2])
end
LUA
Expand Down
42 changes: 26 additions & 16 deletions lib/sidekiq_unique_jobs/client/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,38 +42,48 @@ def storage_method
SidekiqUniqueJobs.config.unique_storage_method
end

# rubocop:disable MethodLength
def old_unique_for?
item['at'] ? unique_schedule_old : unique_enqueue_old
end

def unique_enqueue_old
connection do |conn|
conn.watch(payload_hash)
pid = conn.get(payload_hash)
if pid && (pid != 'scheduled' || item['at'])
conn.unwatch
nil
else
lock_val = conn.get(payload_hash)
if !lock_val || lock_val[0..8] == 'scheduled'.freeze
conn.multi do
if expires_at > 0
conn.setex(payload_hash, expires_at, item['at'] ? 'scheduled' : item['jid'])
else
conn.del(payload_hash)
end
conn.setex(payload_hash, expires_at, item['jid'])
end
else
conn.unwatch
false
end
end
end

def unique_schedule_old
connection do |conn|
conn.watch(payload_hash)
if expires_at < 0 || conn.get(payload_hash)
conn.unwatch
false
else
conn.setex(payload_hash, expires_at, "scheduled-#{item['jid']}")
end
end
end
# rubocop:enable MethodLength

def new_unique_for?
item['at'] ? unique_for_schedule : unique_for_queue
item['at'] ? unique_schedule : unique_enqueue
end

def unique_for_schedule
def unique_schedule
connection do |conn|
conn.set(payload_hash, 'scheduled', nx: true, ex: expires_at)
conn.set(payload_hash, "scheduled-#{item['jid']}", nx: true, ex: expires_at)
end
end

def unique_for_queue
def unique_enqueue
connection do |conn|
conn.eval(lock_queue_script, keys: [payload_hash], argv: [expires_at, item['jid']])
end
Expand Down
9 changes: 9 additions & 0 deletions lib/sidekiq_unique_jobs/server/extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ def remove_on_match
end
LUA
end

def remove_scheduled_on_match
<<-LUA
local ret = redis.call('GET', KEYS[1])
if ret and string.sub(ret, 11, -1) == ARGV[1] then
redis.call('DEL', KEYS[1])
end
LUA
end
end
end
end
7 changes: 5 additions & 2 deletions lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ def payload_hash(item)

def unlock(lock_key, item)
Sidekiq.redis do |con|
val = @parent && @parent.name == 'schedule' ? 'scheduled' : item['jid']
con.eval(remove_on_match, keys: [lock_key], argv: [val])
if @parent && @parent.name == 'schedule'.freeze
con.eval(remove_scheduled_on_match, keys: [lock_key], argv: [item['jid']])
else
con.eval(remove_on_match, keys: [lock_key], argv: [item['jid']])
end
end
end
end
Expand Down
18 changes: 10 additions & 8 deletions spec/lib/sidekiq_unique_ext_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ def perform
Sidekiq.redis(&:flushdb)
end

it 'deletes uniqueness locks on clear' do
params = { foo: 'bar' }
payload_hash = SidekiqUniqueJobs.get_payload('JustAWorker', 'testqueue', [params])
JustAWorker.perform_in(60 * 60 * 3, foo: 'bar')
set = Sidekiq::JobSet.new('schedule')
set.clear
Sidekiq.redis do |c|
expect(c.exists(payload_hash)).to be_falsy
context 'scheduled jobs' do
it 'deletes uniqueness locks on clear' do
params = { foo: 'bar' }
payload_hash = SidekiqUniqueJobs.get_payload('JustAWorker', 'testqueue', [params])
JustAWorker.perform_in(60 * 60 * 3, foo: 'bar')
set = Sidekiq::JobSet.new('schedule')
set.clear
Sidekiq.redis do |c|
expect(c.exists(payload_hash)).to be_falsy
end
end
end
end

0 comments on commit c062cac

Please sign in to comment.