From c062cacd47a69dd90238a3be22a7a34a49a15635 Mon Sep 17 00:00:00 2001 From: pik Date: Wed, 19 Aug 2015 18:26:46 +0800 Subject: [PATCH] use scheduled-jid for locking schedule queue * split old_unique_for? into unique_schedule_old and unique_enqueue_old --- lib/sidekiq_unique_jobs/client/extensions.rb | 2 +- lib/sidekiq_unique_jobs/client/middleware.rb | 42 ++++++++++++------- lib/sidekiq_unique_jobs/server/extensions.rb | 9 ++++ lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb | 7 +++- spec/lib/sidekiq_unique_ext_spec.rb | 18 ++++---- 5 files changed, 51 insertions(+), 27 deletions(-) diff --git a/lib/sidekiq_unique_jobs/client/extensions.rb b/lib/sidekiq_unique_jobs/client/extensions.rb index 8899d0c14..654254c60 100644 --- a/lib/sidekiq_unique_jobs/client/extensions.rb +++ b/lib/sidekiq_unique_jobs/client/extensions.rb @@ -4,7 +4,7 @@ module Extensions def lock_queue_script <<-LUA local ret = redis.call('GET', KEYS[1]) - if not ret or ret == 'scheduled' then + if not ret or string.sub(ret, 1, 9) == 'scheduled' then return redis.call('SETEX', KEYS[1], ARGV[1], ARGV[2]) end LUA diff --git a/lib/sidekiq_unique_jobs/client/middleware.rb b/lib/sidekiq_unique_jobs/client/middleware.rb index 136916452..738f5042b 100644 --- a/lib/sidekiq_unique_jobs/client/middleware.rb +++ b/lib/sidekiq_unique_jobs/client/middleware.rb @@ -42,38 +42,48 @@ def storage_method SidekiqUniqueJobs.config.unique_storage_method end - # rubocop:disable MethodLength def old_unique_for? + item['at'] ? unique_schedule_old : unique_enqueue_old + end + + def unique_enqueue_old connection do |conn| conn.watch(payload_hash) - pid = conn.get(payload_hash) - if pid && (pid != 'scheduled' || item['at']) - conn.unwatch - nil - else + lock_val = conn.get(payload_hash) + if !lock_val || lock_val[0..8] == 'scheduled'.freeze conn.multi do - if expires_at > 0 - conn.setex(payload_hash, expires_at, item['at'] ? 'scheduled' : item['jid']) - else - conn.del(payload_hash) - end + conn.setex(payload_hash, expires_at, item['jid']) end + else + conn.unwatch + false + end + end + end + + def unique_schedule_old + connection do |conn| + conn.watch(payload_hash) + if expires_at < 0 || conn.get(payload_hash) + conn.unwatch + false + else + conn.setex(payload_hash, expires_at, "scheduled-#{item['jid']}") end end end - # rubocop:enable MethodLength def new_unique_for? - item['at'] ? unique_for_schedule : unique_for_queue + item['at'] ? unique_schedule : unique_enqueue end - def unique_for_schedule + def unique_schedule connection do |conn| - conn.set(payload_hash, 'scheduled', nx: true, ex: expires_at) + conn.set(payload_hash, "scheduled-#{item['jid']}", nx: true, ex: expires_at) end end - def unique_for_queue + def unique_enqueue connection do |conn| conn.eval(lock_queue_script, keys: [payload_hash], argv: [expires_at, item['jid']]) end diff --git a/lib/sidekiq_unique_jobs/server/extensions.rb b/lib/sidekiq_unique_jobs/server/extensions.rb index 43a8c742f..abf68e051 100644 --- a/lib/sidekiq_unique_jobs/server/extensions.rb +++ b/lib/sidekiq_unique_jobs/server/extensions.rb @@ -8,6 +8,15 @@ def remove_on_match end LUA end + + def remove_scheduled_on_match + <<-LUA + local ret = redis.call('GET', KEYS[1]) + if ret and string.sub(ret, 11, -1) == ARGV[1] then + redis.call('DEL', KEYS[1]) + end + LUA + end end end end diff --git a/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb b/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb index 1dbb3d5c2..4f6fdb00d 100644 --- a/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb +++ b/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb @@ -25,8 +25,11 @@ def payload_hash(item) def unlock(lock_key, item) Sidekiq.redis do |con| - val = @parent && @parent.name == 'schedule' ? 'scheduled' : item['jid'] - con.eval(remove_on_match, keys: [lock_key], argv: [val]) + if @parent && @parent.name == 'schedule'.freeze + con.eval(remove_scheduled_on_match, keys: [lock_key], argv: [item['jid']]) + else + con.eval(remove_on_match, keys: [lock_key], argv: [item['jid']]) + end end end end diff --git a/spec/lib/sidekiq_unique_ext_spec.rb b/spec/lib/sidekiq_unique_ext_spec.rb index 63a8e9361..b0a4b179e 100644 --- a/spec/lib/sidekiq_unique_ext_spec.rb +++ b/spec/lib/sidekiq_unique_ext_spec.rb @@ -57,14 +57,16 @@ def perform Sidekiq.redis(&:flushdb) end - it 'deletes uniqueness locks on clear' do - params = { foo: 'bar' } - payload_hash = SidekiqUniqueJobs.get_payload('JustAWorker', 'testqueue', [params]) - JustAWorker.perform_in(60 * 60 * 3, foo: 'bar') - set = Sidekiq::JobSet.new('schedule') - set.clear - Sidekiq.redis do |c| - expect(c.exists(payload_hash)).to be_falsy + context 'scheduled jobs' do + it 'deletes uniqueness locks on clear' do + params = { foo: 'bar' } + payload_hash = SidekiqUniqueJobs.get_payload('JustAWorker', 'testqueue', [params]) + JustAWorker.perform_in(60 * 60 * 3, foo: 'bar') + set = Sidekiq::JobSet.new('schedule') + set.clear + Sidekiq.redis do |c| + expect(c.exists(payload_hash)).to be_falsy + end end end end