From 792745f51b1fbd50ebe515389807d9ddc108ea05 Mon Sep 17 00:00:00 2001 From: Mika Hel Date: Tue, 26 Jun 2018 22:11:00 +0200 Subject: [PATCH] Adds legacy support (#281) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Allow old until_timeout jobs to keep working * Unlock and delete legacy keys * Invert the logic a little Allow the NEW lock to be created if the old job would have been the one that created the lock. * Fix the old acquire_lock (OMG!!) so broken... * Add coverage for automatic lock migration * Rubo👮 --- lib/sidekiq_unique_jobs/locksmith.rb | 17 ++- .../options_with_fallback.rb | 1 + redis/acquire_lock.lua | 7 +- redis/create.lua | 62 ++++++---- redis/delete.lua | 3 +- redis/signal.lua | 2 + .../sidekiq_unique_jobs/legacy_lock_spec.rb | 106 ++++++++++++++++++ .../sidekiq_unique_jobs/locksmith_spec.rb | 40 +------ 8 files changed, 165 insertions(+), 73 deletions(-) create mode 100644 spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 5a5b54061..5e301dde6 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -8,18 +8,18 @@ class Locksmith # rubocop:disable ClassLength include SidekiqUniqueJobs::Connection def initialize(item, redis_pool = nil) - @concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6 - @unique_digest = item[UNIQUE_DIGEST_KEY] - @expiration = item[LOCK_EXPIRATION_KEY] - @jid = item[JID_KEY] - @redis_pool = redis_pool + @concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6 + @expiration = item[LOCK_EXPIRATION_KEY] + @jid = item[JID_KEY] + @unique_digest = item[UNIQUE_DIGEST_KEY] + @redis_pool = redis_pool end def create Scripts.call( :create, redis_pool, - keys: [exists_key, grabbed_key, available_key, version_key], + keys: [exists_key, grabbed_key, available_key, version_key, unique_digest], argv: [jid, expiration, API_VERSION, concurrency], ) end @@ -43,7 +43,7 @@ def delete! Scripts.call( :delete, redis_pool, - keys: [exists_key, grabbed_key, available_key, version_key], + keys: [exists_key, grabbed_key, available_key, version_key, unique_digest], ) end @@ -73,7 +73,7 @@ def signal(token = nil) Scripts.call( :signal, redis_pool, - keys: [exists_key, grabbed_key, available_key, version_key], + keys: [exists_key, grabbed_key, available_key, version_key, unique_digest], argv: [token, expiration], ) end @@ -90,7 +90,6 @@ def grab_token(timeout = nil) else token = conn.lpop(available_key) end - yield token if token == jid end end diff --git a/lib/sidekiq_unique_jobs/options_with_fallback.rb b/lib/sidekiq_unique_jobs/options_with_fallback.rb index 384c80cb7..b764f1388 100644 --- a/lib/sidekiq_unique_jobs/options_with_fallback.rb +++ b/lib/sidekiq_unique_jobs/options_with_fallback.rb @@ -12,6 +12,7 @@ module OptionsWithFallback until_executed: SidekiqUniqueJobs::Lock::UntilExecuted, until_executing: SidekiqUniqueJobs::Lock::UntilExecuting, until_expired: SidekiqUniqueJobs::Lock::UntilExpired, + until_timeout: SidekiqUniqueJobs::Lock::UntilExpired, while_executing: SidekiqUniqueJobs::Lock::WhileExecuting, while_executing_reject: SidekiqUniqueJobs::Lock::WhileExecutingReject, }.freeze diff --git a/redis/acquire_lock.lua b/redis/acquire_lock.lua index dfb7c9f3f..1f057c724 100644 --- a/redis/acquire_lock.lua +++ b/redis/acquire_lock.lua @@ -1,6 +1,6 @@ local unique_key = KEYS[1] local job_id = ARGV[1] -local expires = ARGV[2] +local expires = tonumber(ARGV[2]) local stored_jid = redis.pcall('get', unique_key) if stored_jid then @@ -11,7 +11,10 @@ if stored_jid then end end -if redis.pcall('set', unique_key, job_id, 'nx', 'ex', expires) then +if redis.call('SET', unique_key, job_id, 'nx') then + if expires then + redis.call('EXPIRE', unique_key, expires) + end return 1 else return 0 diff --git a/redis/create.lua b/redis/create.lua index ea6270c61..e264d89ee 100644 --- a/redis/create.lua +++ b/redis/create.lua @@ -4,37 +4,55 @@ local exists_key = KEYS[1] local grabbed_key = KEYS[2] local available_key = KEYS[3] local version_key = KEYS[4] +local unique_digest = KEYS[5] -local exists_token = ARGV[1] +local job_id = ARGV[1] local expiration = tonumber(ARGV[2]) local api_version = ARGV[3] local concurrency = tonumber(ARGV[4]) -local stored_token = redis.call('GETSET', exists_key, exists_token) + +local stored_token = redis.call('GETSET', exists_key, job_id) + +redis.log(redis.LOG_DEBUG, "create.lua - starting...") if stored_token then - redis.log(redis.LOG_DEBUG, "create_locks.lua - returning stored_token : " .. stored_token) + redis.log(redis.LOG_DEBUG, "create.lua - stored_token: " .. stored_token) return stored_token -else - redis.call('EXPIRE', exists_key, 10) - redis.call('DEL', grabbed_key) - redis.call('DEL', available_key) - - if concurrency and concurrency > 1 then - for index = 1, concurrency do - redis.call('RPUSH', available_key, index) - end +end + +---------------------------------------------------------------- +-- TODO: Legacy support (Remove in v6.1) +local old_token = redis.call('GET', unique_digest) +if old_token then + -- redis.log(redis.LOG_DEBUG, "create.lua - " .. unique_digest .. " with " .. old_token) + if old_token == job_id or old_token == '2' then + -- redis.log(redis.LOG_DEBUG, "create.lua - " .. unique_digest .. " with " .. old_token .. " MATCH with " .. job_id) + redis.call('DEL', unique_digest) else - redis.call('RPUSH', available_key, exists_token) + -- redis.log(redis.LOG_DEBUG, "create.lua - " .. unique_digest .. " with " .. old_token .. " MISMATCH with " .. job_id) + return old_token end - redis.call('GETSET', version_key, api_version) - redis.call('PERSIST', exists_key) - - if expiration then - redis.log(redis.LOG_DEBUG, "create_locks.lua - expiring locks in : " .. expiration) - redis.call('EXPIRE', available_key, expiration) - redis.call('EXPIRE', exists_key, expiration) - redis.call('EXPIRE', version_key, expiration) +end +---------------------------------------------------------------- + +redis.call('EXPIRE', exists_key, 10) +redis.call('DEL', grabbed_key) +redis.call('DEL', available_key) + +if concurrency and concurrency > 1 then + for index = 1, concurrency do + redis.call('RPUSH', available_key, index) end +else + redis.call('RPUSH', available_key, job_id) +end +redis.call('GETSET', version_key, api_version) +redis.call('PERSIST', exists_key) - return 1 +if expiration then + redis.call('EXPIRE', available_key, expiration) + redis.call('EXPIRE', exists_key, expiration) + redis.call('EXPIRE', version_key, expiration) end + +return job_id diff --git a/redis/delete.lua b/redis/delete.lua index 12fa57b54..b28ff9c45 100644 --- a/redis/delete.lua +++ b/redis/delete.lua @@ -2,9 +2,10 @@ local exists_key = KEYS[1] local grabbed_key = KEYS[2] local available_key = KEYS[3] local version_key = KEYS[4] +local unique_digest = KEYS[5] -- TODO: Legacy support (Remove in v6.1) -redis.log(redis.LOG_DEBUG, "delete_locks.lua - forcefully deleting locks") redis.call('DEL', exists_key) redis.call('DEL', grabbed_key) redis.call('DEL', available_key) redis.call('DEL', version_key) +redis.call('DEL', unique_digest) -- TODO: Legacy support (Remove in v6.1) diff --git a/redis/signal.lua b/redis/signal.lua index 78a3ba183..86b53e64f 100644 --- a/redis/signal.lua +++ b/redis/signal.lua @@ -2,6 +2,7 @@ local exists_key = KEYS[1] local grabbed_key = KEYS[2] local available_key = KEYS[3] local version_key = KEYS[4] +local unique_digest = KEYS[5] -- TODO: Legacy support (Remove in v6.1) local token = ARGV[1] local expiration = tonumber(ARGV[2]) @@ -14,6 +15,7 @@ if expiration then redis.call('EXPIRE', exists_key, expiration) redis.call('EXPIRE', available_key, expiration) redis.call('EXPIRE', version_key, expiration) + redis.call('EXPIRE', unique_digest, expiration) -- TODO: Legacy support (Remove in v6.1) end return available_count diff --git a/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb b/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb new file mode 100644 index 000000000..1b8e993ef --- /dev/null +++ b/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'spec_helper' + +# rubocop:disable RSpec/FilePath +RSpec.describe SidekiqUniqueJobs::Locksmith, redis: :redis do + let(:locksmith_one) { described_class.new(lock_item) } + let(:lock_expiration) { nil } + let(:redis_pool) { nil } + let(:jid_one) { 'maaaahjid' } + let(:jid_two) { 'anotherjid' } + let(:unique_digest) { 'uniquejobs:test_mutex_key' } + let(:queue) { 'dupsallowed' } + let(:unique) { :until_executed } + let(:worker_class) { UntilExecutedJob } + let(:lock_item) do + { + 'args' => [1], + 'class' => UntilExecutedJob, + 'jid' => jid_one, + 'lock_expiration' => lock_expiration, + 'queue' => queue, + 'unique' => unique, + 'unique_digest' => unique_digest, + } + end + + let(:locksmith_two) { described_class.new(lock_item_two) } + let(:lock_item_two) { lock_item.merge('jid' => jid_two) } + + context 'with a legacy lock' do + before do + result = SidekiqUniqueJobs::Scripts.call( + :acquire_lock, + redis_pool, + keys: [unique_digest], + argv: [lock_value, lock_expiration], + ) + + expect(result).to eq(1) + expect(unique_keys).to include(unique_digest) + end + + context 'when lock_expiration is unset' do + let(:lock_value) { jid_one } + + it 'can signal to expire the lock after 10' do + locksmith_one.signal(jid_one) + + expect(ttl(unique_digest)).to eq(-1) # key exists but has been expired + end + + it 'can soft delete the lock' do + expect(locksmith_one.delete).to eq(nil) + expect(unique_keys).not_to include(unique_digest) + end + + it 'can force delete the lock' do + expect(locksmith_one.delete!).to eq(nil) + expect(unique_keys).not_to include(unique_digest) + end + end + + context 'when lock_expiration is set' do + let(:lock_value) { jid_one } + let(:lock_expiration) { 10 } + + it 'can signal to expire the lock after 10' do + locksmith_one.signal(jid_one) + + expect(ttl(unique_digest)).to be_within(1).of(10) + end + + it 'cannot soft delete the lock' do + expect(locksmith_one.delete).to eq(nil) + expect(unique_keys).to include(unique_digest) + end + + it 'can force delete the lock' do + expect(locksmith_one.delete!).to eq(nil) + expect(unique_keys).not_to include(unique_digest) + end + end + + context 'when the value of unique_digest is 2' do + let(:lock_value) { '2' } + + it 'returns the stored jid' do + expect(locksmith_one.lock(0)).to eq(jid_one) + end + end + + context 'when the value of unique_digest is jid' do + let(:lock_value) { jid_one } + + it 'returns the stored jid' do + expect(locksmith_one.lock(0)).to eq(jid_one) + end + + it 'can not be locked by another jid' do + expect(locksmith_two.lock(0)).to eq(nil) + end + end + end +end +# rubocop:enable RSpec/FilePath diff --git a/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb b/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb index cdf32c8a6..84df1634f 100644 --- a/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb @@ -181,45 +181,7 @@ # end end - # describe 'lock with staleness checking' do - # let(:lock_stale_client_timeout) { 5 } - - # context 'when redis_version is old' do - # before do - # allow(SidekiqUniqueJobs).to receive(:redis_version).and_return('3.0') - # end - - # it_behaves_like 'a lock' - - # it 'restores resources of stale clients', redis: :redis do - # another_lock_item = lock_item.merge('jid' => 'abcdefab', 'stale_client_timeout' => 1) - # hyper_aggressive_locksmith = described_class.new(another_lock_item) - - # expect(hyper_aggressive_locksmith.lock(1)).to be_truthy - # expect(hyper_aggressive_locksmith.lock(1)).to eq(nil) - # expect(hyper_aggressive_locksmith.lock(1)).to be_truthy - # end - # end - - # context 'when redis_version is new', redis: :redis do - # before do - # allow(SidekiqUniqueJobs).to receive(:redis_version).and_return('3.2') - # end - - # it_behaves_like 'a lock' - - # it 'restores resources of stale clients' do - # another_lock_item = lock_item.merge('jid' => 'abcdefab', 'stale_client_timeout' => 1) - # hyper_aggressive_locksmith = described_class.new(another_lock_item) - - # expect(hyper_aggressive_locksmith.lock(1)).to be_truthy - # expect(hyper_aggressive_locksmith.lock(1)).to eq(nil) - # expect(hyper_aggressive_locksmith.lock(1)).to be_truthy - # end - # end - # end - - describe 'redis time' do + describe 'current_time' do let(:lock_stale_client_timeout) { 5 } before do