Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert v5 locks when needed #375

Merged
merged 2 commits into from
Feb 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Locksmith
# @option item [String] :unique_digest the unique digest (See: {UniqueArgs#unique_digest})
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, redis_pool = nil)
@concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6
# @concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6
@ttl = item[LOCK_EXPIRATION_KEY]
@jid = item[JID_KEY]
@unique_digest = item[UNIQUE_DIGEST_KEY]
Expand All @@ -23,20 +23,6 @@ def initialize(item, redis_pool = nil)
@redis_pool = redis_pool
end

# Checks if the exists key is created in redis
# @return [true, false]
def exists?
redis(redis_pool) { |conn| conn.exists(exists_key) }
end

# The number of available resourced for this lock
# @return [Integer] the number of available resources
def available_count
return concurrency unless exists?

redis(redis_pool) { |conn| conn.llen(available_key) }
end

# Deletes the lock unless it has a ttl set
def delete
return if ttl
Expand Down Expand Up @@ -100,12 +86,19 @@ def unlock!(token = nil)
# @return [true, false]
def locked?(token = nil)
token ||= jid
Scripts.call(
:convert_legacy_lock,
redis_pool,
keys: [grabbed_key, unique_digest],
argv: [token],
)

redis(redis_pool) { |conn| conn.hexists(grabbed_key, token) }
end

private

attr_reader :concurrency, :unique_digest, :ttl, :jid, :redis_pool, :lock_type
attr_reader :unique_digest, :ttl, :jid, :redis_pool, :lock_type

def grab_token(timeout = nil)
redis(redis_pool) do |conn|
Expand Down
21 changes: 21 additions & 0 deletions redis/convert_legacy_lock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
local grabbed_key = KEYS[1]
local unique_digest = KEYS[2]

local job_id = ARGV[1]

local function current_time()
local time = redis.call('time')
local s = time[1]
local ms = time[2]
local number = tonumber((s .. '.' .. ms))

return number
end

local old_token = redis.call('GET', unique_digest)
if old_token then
if old_token == job_id or old_token == '2' then
redis.call('DEL', unique_digest)
redis.call('HSET', grabbed_key, job_id, current_time())
end
end
5 changes: 1 addition & 4 deletions redis/lock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ local function current_time()
return number
end

-- redis.log(redis.LOG_DEBUG, "create.lua - investigate possibility of locking jid: " .. job_id)

local stored_token = redis.call('GET', exists_key)
local stored_token = redis.call('GET', exists_key)
if stored_token and stored_token ~= job_id then
-- redis.log(redis.LOG_DEBUG, "create.lua - jid: " .. job_id .. " - returning existing jid: " .. stored_token)
return stored_token
end

Expand Down
12 changes: 6 additions & 6 deletions spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@
)
end

it "creates grabbed keys when needed" do
expect { locksmith_one.locked? }.to change { unique_keys }
.to include("uniquejobs:test_mutex_key:GRABBED")
end

specify { expect(locksmith_one.locked?).to eq(true) }
specify { expect(old_lock).to eq(1) }
specify { expect(unique_keys).to include(unique_digest) }

Expand All @@ -81,12 +87,6 @@
context "when lock_expiration is set" do
let(:lock_expiration) { 10 }

it "can signal to expire the lock after 10" do
locksmith_one.unlock(jid_one)

expect(ttl(unique_digest)).to be_within(1).of(10)
end

it "cannot soft delete the lock" do
expect(locksmith_one.delete).to eq(nil)
expect(unique_keys).to include(unique_digest)
Expand Down
12 changes: 0 additions & 12 deletions spec/integration/sidekiq_unique_jobs/locksmith_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@
let(:item_two) { item_one.merge("jid" => jid_two) }

shared_examples_for "a lock" do
it "does not exist from the start" do
expect(locksmith_one.exists?).to eq(false)
locksmith_one.lock
expect(locksmith_one.exists?).to eq(true)
end

it "is unlocked from the start" do
expect(locksmith_one.locked?).to eq(false)
end
Expand Down Expand Up @@ -103,12 +97,6 @@
end
expect(locksmith_one.locked?).to eq false
end

it "does something" do
expect(locksmith_one.available_count).to eq(1)
locksmith_one.lock(0)
expect(locksmith_one.available_count).to eq(0)
end
end

describe "lock with expiration" do
Expand Down