Skip to content

Commit

Permalink
Adds legacy support (#281)
Browse files Browse the repository at this point in the history
* Allow old until_timeout jobs to keep working

* Unlock and delete legacy keys

* Invert the logic a little

Allow the NEW lock to be created if the old job would have been the one that
created the lock.

* Fix the old acquire_lock (OMG!!) so broken...

* Add coverage for automatic lock migration

* Rubo👮
  • Loading branch information
mhenrixon authored Jun 26, 2018
1 parent 82f11a0 commit 792745f
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 73 deletions.
17 changes: 8 additions & 9 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ class Locksmith # rubocop:disable ClassLength
include SidekiqUniqueJobs::Connection

def initialize(item, redis_pool = nil)
@concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6
@unique_digest = item[UNIQUE_DIGEST_KEY]
@expiration = item[LOCK_EXPIRATION_KEY]
@jid = item[JID_KEY]
@redis_pool = redis_pool
@concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6
@expiration = item[LOCK_EXPIRATION_KEY]
@jid = item[JID_KEY]
@unique_digest = item[UNIQUE_DIGEST_KEY]
@redis_pool = redis_pool
end

def create
Scripts.call(
:create,
redis_pool,
keys: [exists_key, grabbed_key, available_key, version_key],
keys: [exists_key, grabbed_key, available_key, version_key, unique_digest],
argv: [jid, expiration, API_VERSION, concurrency],
)
end
Expand All @@ -43,7 +43,7 @@ def delete!
Scripts.call(
:delete,
redis_pool,
keys: [exists_key, grabbed_key, available_key, version_key],
keys: [exists_key, grabbed_key, available_key, version_key, unique_digest],
)
end

Expand Down Expand Up @@ -73,7 +73,7 @@ def signal(token = nil)
Scripts.call(
:signal,
redis_pool,
keys: [exists_key, grabbed_key, available_key, version_key],
keys: [exists_key, grabbed_key, available_key, version_key, unique_digest],
argv: [token, expiration],
)
end
Expand All @@ -90,7 +90,6 @@ def grab_token(timeout = nil)
else
token = conn.lpop(available_key)
end

yield token if token == jid
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/options_with_fallback.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module OptionsWithFallback
until_executed: SidekiqUniqueJobs::Lock::UntilExecuted,
until_executing: SidekiqUniqueJobs::Lock::UntilExecuting,
until_expired: SidekiqUniqueJobs::Lock::UntilExpired,
until_timeout: SidekiqUniqueJobs::Lock::UntilExpired,
while_executing: SidekiqUniqueJobs::Lock::WhileExecuting,
while_executing_reject: SidekiqUniqueJobs::Lock::WhileExecutingReject,
}.freeze
Expand Down
7 changes: 5 additions & 2 deletions redis/acquire_lock.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
local unique_key = KEYS[1]
local job_id = ARGV[1]
local expires = ARGV[2]
local expires = tonumber(ARGV[2])
local stored_jid = redis.pcall('get', unique_key)

if stored_jid then
Expand All @@ -11,7 +11,10 @@ if stored_jid then
end
end

if redis.pcall('set', unique_key, job_id, 'nx', 'ex', expires) then
if redis.call('SET', unique_key, job_id, 'nx') then
if expires then
redis.call('EXPIRE', unique_key, expires)
end
return 1
else
return 0
Expand Down
62 changes: 40 additions & 22 deletions redis/create.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,55 @@ local exists_key = KEYS[1]
local grabbed_key = KEYS[2]
local available_key = KEYS[3]
local version_key = KEYS[4]
local unique_digest = KEYS[5]

local exists_token = ARGV[1]
local job_id = ARGV[1]
local expiration = tonumber(ARGV[2])
local api_version = ARGV[3]
local concurrency = tonumber(ARGV[4])
local stored_token = redis.call('GETSET', exists_key, exists_token)

local stored_token = redis.call('GETSET', exists_key, job_id)

redis.log(redis.LOG_DEBUG, "create.lua - starting...")

if stored_token then
redis.log(redis.LOG_DEBUG, "create_locks.lua - returning stored_token : " .. stored_token)
redis.log(redis.LOG_DEBUG, "create.lua - stored_token: " .. stored_token)
return stored_token
else
redis.call('EXPIRE', exists_key, 10)
redis.call('DEL', grabbed_key)
redis.call('DEL', available_key)

if concurrency and concurrency > 1 then
for index = 1, concurrency do
redis.call('RPUSH', available_key, index)
end
end

----------------------------------------------------------------
-- TODO: Legacy support (Remove in v6.1)
local old_token = redis.call('GET', unique_digest)
if old_token then
-- redis.log(redis.LOG_DEBUG, "create.lua - " .. unique_digest .. " with " .. old_token)
if old_token == job_id or old_token == '2' then
-- redis.log(redis.LOG_DEBUG, "create.lua - " .. unique_digest .. " with " .. old_token .. " MATCH with " .. job_id)
redis.call('DEL', unique_digest)
else
redis.call('RPUSH', available_key, exists_token)
-- redis.log(redis.LOG_DEBUG, "create.lua - " .. unique_digest .. " with " .. old_token .. " MISMATCH with " .. job_id)
return old_token
end
redis.call('GETSET', version_key, api_version)
redis.call('PERSIST', exists_key)

if expiration then
redis.log(redis.LOG_DEBUG, "create_locks.lua - expiring locks in : " .. expiration)
redis.call('EXPIRE', available_key, expiration)
redis.call('EXPIRE', exists_key, expiration)
redis.call('EXPIRE', version_key, expiration)
end
----------------------------------------------------------------

redis.call('EXPIRE', exists_key, 10)
redis.call('DEL', grabbed_key)
redis.call('DEL', available_key)

if concurrency and concurrency > 1 then
for index = 1, concurrency do
redis.call('RPUSH', available_key, index)
end
else
redis.call('RPUSH', available_key, job_id)
end
redis.call('GETSET', version_key, api_version)
redis.call('PERSIST', exists_key)

return 1
if expiration then
redis.call('EXPIRE', available_key, expiration)
redis.call('EXPIRE', exists_key, expiration)
redis.call('EXPIRE', version_key, expiration)
end

return job_id
3 changes: 2 additions & 1 deletion redis/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ local exists_key = KEYS[1]
local grabbed_key = KEYS[2]
local available_key = KEYS[3]
local version_key = KEYS[4]
local unique_digest = KEYS[5] -- TODO: Legacy support (Remove in v6.1)

redis.log(redis.LOG_DEBUG, "delete_locks.lua - forcefully deleting locks")
redis.call('DEL', exists_key)
redis.call('DEL', grabbed_key)
redis.call('DEL', available_key)
redis.call('DEL', version_key)
redis.call('DEL', unique_digest) -- TODO: Legacy support (Remove in v6.1)
2 changes: 2 additions & 0 deletions redis/signal.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local exists_key = KEYS[1]
local grabbed_key = KEYS[2]
local available_key = KEYS[3]
local version_key = KEYS[4]
local unique_digest = KEYS[5] -- TODO: Legacy support (Remove in v6.1)

local token = ARGV[1]
local expiration = tonumber(ARGV[2])
Expand All @@ -14,6 +15,7 @@ if expiration then
redis.call('EXPIRE', exists_key, expiration)
redis.call('EXPIRE', available_key, expiration)
redis.call('EXPIRE', version_key, expiration)
redis.call('EXPIRE', unique_digest, expiration) -- TODO: Legacy support (Remove in v6.1)
end

return available_count
106 changes: 106 additions & 0 deletions spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# frozen_string_literal: true

require 'spec_helper'

# rubocop:disable RSpec/FilePath
RSpec.describe SidekiqUniqueJobs::Locksmith, redis: :redis do
let(:locksmith_one) { described_class.new(lock_item) }
let(:lock_expiration) { nil }
let(:redis_pool) { nil }
let(:jid_one) { 'maaaahjid' }
let(:jid_two) { 'anotherjid' }
let(:unique_digest) { 'uniquejobs:test_mutex_key' }
let(:queue) { 'dupsallowed' }
let(:unique) { :until_executed }
let(:worker_class) { UntilExecutedJob }
let(:lock_item) do
{
'args' => [1],
'class' => UntilExecutedJob,
'jid' => jid_one,
'lock_expiration' => lock_expiration,
'queue' => queue,
'unique' => unique,
'unique_digest' => unique_digest,
}
end

let(:locksmith_two) { described_class.new(lock_item_two) }
let(:lock_item_two) { lock_item.merge('jid' => jid_two) }

context 'with a legacy lock' do
before do
result = SidekiqUniqueJobs::Scripts.call(
:acquire_lock,
redis_pool,
keys: [unique_digest],
argv: [lock_value, lock_expiration],
)

expect(result).to eq(1)
expect(unique_keys).to include(unique_digest)
end

context 'when lock_expiration is unset' do
let(:lock_value) { jid_one }

it 'can signal to expire the lock after 10' do
locksmith_one.signal(jid_one)

expect(ttl(unique_digest)).to eq(-1) # key exists but has been expired
end

it 'can soft delete the lock' do
expect(locksmith_one.delete).to eq(nil)
expect(unique_keys).not_to include(unique_digest)
end

it 'can force delete the lock' do
expect(locksmith_one.delete!).to eq(nil)
expect(unique_keys).not_to include(unique_digest)
end
end

context 'when lock_expiration is set' do
let(:lock_value) { jid_one }
let(:lock_expiration) { 10 }

it 'can signal to expire the lock after 10' do
locksmith_one.signal(jid_one)

expect(ttl(unique_digest)).to be_within(1).of(10)
end

it 'cannot soft delete the lock' do
expect(locksmith_one.delete).to eq(nil)
expect(unique_keys).to include(unique_digest)
end

it 'can force delete the lock' do
expect(locksmith_one.delete!).to eq(nil)
expect(unique_keys).not_to include(unique_digest)
end
end

context 'when the value of unique_digest is 2' do
let(:lock_value) { '2' }

it 'returns the stored jid' do
expect(locksmith_one.lock(0)).to eq(jid_one)
end
end

context 'when the value of unique_digest is jid' do
let(:lock_value) { jid_one }

it 'returns the stored jid' do
expect(locksmith_one.lock(0)).to eq(jid_one)
end

it 'can not be locked by another jid' do
expect(locksmith_two.lock(0)).to eq(nil)
end
end
end
end
# rubocop:enable RSpec/FilePath
40 changes: 1 addition & 39 deletions spec/integration/sidekiq_unique_jobs/locksmith_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,45 +181,7 @@
# end
end

# describe 'lock with staleness checking' do
# let(:lock_stale_client_timeout) { 5 }

# context 'when redis_version is old' do
# before do
# allow(SidekiqUniqueJobs).to receive(:redis_version).and_return('3.0')
# end

# it_behaves_like 'a lock'

# it 'restores resources of stale clients', redis: :redis do
# another_lock_item = lock_item.merge('jid' => 'abcdefab', 'stale_client_timeout' => 1)
# hyper_aggressive_locksmith = described_class.new(another_lock_item)

# expect(hyper_aggressive_locksmith.lock(1)).to be_truthy
# expect(hyper_aggressive_locksmith.lock(1)).to eq(nil)
# expect(hyper_aggressive_locksmith.lock(1)).to be_truthy
# end
# end

# context 'when redis_version is new', redis: :redis do
# before do
# allow(SidekiqUniqueJobs).to receive(:redis_version).and_return('3.2')
# end

# it_behaves_like 'a lock'

# it 'restores resources of stale clients' do
# another_lock_item = lock_item.merge('jid' => 'abcdefab', 'stale_client_timeout' => 1)
# hyper_aggressive_locksmith = described_class.new(another_lock_item)

# expect(hyper_aggressive_locksmith.lock(1)).to be_truthy
# expect(hyper_aggressive_locksmith.lock(1)).to eq(nil)
# expect(hyper_aggressive_locksmith.lock(1)).to be_truthy
# end
# end
# end

describe 'redis time' do
describe 'current_time' do
let(:lock_stale_client_timeout) { 5 }

before do
Expand Down

0 comments on commit 792745f

Please sign in to comment.