Bug fixes #310

Merged (7 commits) on Aug 1, 2018
4 changes: 2 additions & 2 deletions README.md
@@ -361,7 +361,7 @@ For sidekiq versions before 5.1 a `sidekiq_retries_exhausted` block is required
```ruby
class MyWorker
  sidekiq_retries_exhausted do |msg, _ex|
    SidekiqUniqueJobs::Digests.delete_by(digest: msg['unique_digest']) if msg['unique_digest']
    SidekiqUniqueJobs::Digests.del(digest: msg['unique_digest']) if msg['unique_digest']
  end
end
```
@@ -372,7 +372,7 @@ Starting in v5.1, Sidekiq can also fire a global callback when a job dies:
# this goes in your initializer
Sidekiq.configure_server do |config|
  config.death_handlers << ->(job, _ex) do
    SidekiqUniqueJobs::Digests.delete_by(digest: job['unique_digest']) if job['unique_digest']
    SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest']
  end
end
```
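For either callback, the `Digests.all` API that appears further down in this diff gives a quick way to confirm the digest was actually removed. A minimal sketch, assuming a console session; the digest string is made up:

```ruby
# Sketch only: the digest value below is hypothetical.
digest = 'uniquejobs:24f184b0dbb6c8ea4b81b24059bd61f6'

# Before the job dies the digest is present; once the handler has run it should be gone.
SidekiqUniqueJobs::Digests.all(pattern: digest) # => [] after cleanup
```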
17 changes: 17 additions & 0 deletions lib/sidekiq_unique_jobs/digests.rb
@@ -22,6 +22,23 @@ def all(pattern: SCAN_PATTERN, count: DEFAULT_COUNT)
  redis { |conn| conn.sscan_each(UNIQUE_SET, match: pattern, count: count).to_a }
end

# Paginate unique digests
#
# @param [String] pattern a pattern to match with
# @param [Integer] cursor the cursor position to resume scanning from
# @param [Integer] page_size the maximum number of digests to return per page
# @return [Array(Integer, String, Array<String>)] the total number of digests, the next cursor and the digests on the current page
def page(pattern: SCAN_PATTERN, cursor: 0, page_size: 100)
  redis do |conn|
    total_size, digests = conn.multi do
      conn.scard(UNIQUE_SET)
      conn.sscan(UNIQUE_SET, cursor, match: pattern, count: page_size)
    end

    [total_size, digests[0], digests[1]]
  end
end

# Get a total count of unique digests
#
# @return [Integer] number of digests
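For reference, a minimal sketch of paging through the whole set with the new `Digests.page` method; the pattern, page size and loop are illustrative, and the return shape (total size, next cursor, digests) follows the method above:

```ruby
# Walk the unique digests set one page at a time, following the cursor Redis returns.
total, cursor, digests = SidekiqUniqueJobs::Digests.page(pattern: '*', cursor: 0, page_size: 100)

until cursor.to_i.zero? # SSCAN signals completion with a "0" cursor
  _, cursor, page = SidekiqUniqueJobs::Digests.page(pattern: '*', cursor: cursor, page_size: 100)
  digests.concat(page)
end

puts "#{digests.size} of #{total} digests collected"
```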
12 changes: 11 additions & 1 deletion lib/sidekiq_unique_jobs/locksmith.rb
@@ -32,6 +32,16 @@ def create
    keys: [exists_key, grabbed_key, available_key, version_key, UNIQUE_SET, unique_digest],
    argv: [jid, expiration, API_VERSION, concurrency],
  )
  expire
end

def expire
  Scripts.call(
    :expire,
    redis_pool,
    keys: [exists_key, available_key, version_key],
    argv: [expiration],
  )
end

# Checks if the exists key is created in redis
@@ -86,7 +96,7 @@ def unlock
  signal(jid)
end

# Removes the lock keys from Redis
# Checks if this instance is considered locked
# @param [String] token the unique token to check for a lock.
# nil will default to the jid provided in the initializer
# @return [true, false]
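With `create` now delegating TTL handling to the new `expire` script, the lock life cycle from the caller's side looks roughly as follows. This is a sketch based on the specs later in this diff; the item hash keys and the one second expiration are illustrative:

```ruby
item = {
  'jid'             => 'maaaahjid',
  'unique_digest'   => 'uniquejobs:randomvalue',
  'lock_expiration' => 1 # seconds; becomes the TTL applied by expire.lua
}

locksmith = SidekiqUniqueJobs::Locksmith.new(item)
locksmith.lock    # create.lua sets up EXISTS/GRABBED/VERSION, expire.lua applies the TTL
locksmith.signal  # releases the lock; with an expiration set, EXISTS/VERSION keep their TTL
```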
6 changes: 4 additions & 2 deletions lib/sidekiq_unique_jobs/web.rb
@@ -20,11 +20,13 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength
    end

    app.get '/unique_digests' do
      @total_size = Digests.count
      @filter = params[:filter] || '*'
      @filter = '*' if @filter == ''
      @count = (params[:count] || 100).to_i
      @unique_digests = Digests.all(pattern: @filter, count: @count)
      @current_cursor = params[:cursor]
      @prev_cursor = params[:prev_cursor]
      @total_size, @next_cursor, @unique_digests =
        Digests.page(pattern: @filter, cursor: @current_cursor, page_size: @count)

      erb(unique_template(:unique_digests))
    end
14 changes: 14 additions & 0 deletions lib/sidekiq_unique_jobs/web/helpers.rb
@@ -9,6 +9,20 @@ def unique_template(name)
  File.open(File.join(VIEW_PATH, "#{name}.erb")).read
end

SAFE_CPARAMS = %w[cursor prev_cursor].freeze

def cparams(options)
  # stringify
  options.keys.each do |key|
    options[key.to_s] = options.delete(key)
  end

  params.merge(options).map do |key, value|
    next unless SAFE_CPARAMS.include?(key)
    "#{key}=#{CGI.escape(value.to_s)}"
  end.compact.join('&')
end

def redirect_to(subpath)
  if respond_to?(:to)
    # Sinatra-based web UI
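A minimal sketch of what `cparams` produces, assuming an otherwise empty `params` hash; keys outside `SAFE_CPARAMS` are dropped and values are CGI-escaped:

```ruby
cparams(cursor: 336, prev_cursor: 0)         # => "cursor=336&prev_cursor=0"
cparams(cursor: 'a b', filter: 'MyWorker:*') # => "cursor=a+b" (filter is not whitelisted)
```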
10 changes: 10 additions & 0 deletions lib/sidekiq_unique_jobs/web/views/_paging.erb
@@ -0,0 +1,10 @@
<ul class="pagination pull-right flip">
  <% if @prev_cursor %>
    <li>
      <a href="<%= url %>?<%= cparams(cursor: @prev_cursor, prev_cursor: @next_cursor) %>">Previous <%= @count %></a>
    </li>
  <% end %>
  <li>
    <a href="<%= url %>?<%= cparams(cursor: @next_cursor, prev_cursor: @current_cursor) %>">Next <%= @count %></a>
  </li>
</ul>
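Concretely, the partial builds hrefs like the following; a sketch evaluated in the view's context, with made-up cursor values:

```ruby
# First page: @current_cursor = "0", @next_cursor = "336", @prev_cursor = nil,
# so only the "Next" link renders; its query string comes from cparams:
"#{url}?#{cparams(cursor: '336', prev_cursor: '0')}"
# => ".../unique_digests?cursor=336&prev_cursor=0"
```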
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/web/views/unique_digests.erb
@@ -9,7 +9,7 @@
</form>
<% if @unique_digests.size > 0 && @total_size > @count.to_i %>
  <div class="col-sm-4">
    <%= erb :_paging, locals: { url: "#{root_path}unique_digests" } %>
    <%= erb unique_template(:_paging), locals: { url: "#{root_path}unique_digests" } %>
  </div>
<% end %>
</header>
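The switch from `erb :_paging` to `erb unique_template(:_paging)` matters because `_paging.erb` ships with this gem rather than with the host web UI's view directory, so a plain symbol lookup would miss it. Roughly, and reusing the `unique_template` helper shown above:

```ruby
# erb accepts raw template source as well as a view name, so this
erb unique_template(:_paging), locals: { url: "#{root_path}unique_digests" }
# behaves like
erb File.read(File.join(VIEW_PATH, '_paging.erb')), locals: { url: "#{root_path}unique_digests" }
```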
6 changes: 0 additions & 6 deletions redis/create.lua
@@ -45,10 +45,4 @@ end
redis.call('GETSET', version_key, api_version)
redis.call('PERSIST', exists_key)

if expiration then
  redis.call('EXPIRE', available_key, expiration)
  redis.call('EXPIRE', exists_key, expiration)
  redis.call('EXPIRE', version_key, expiration)
end

return job_id
14 changes: 14 additions & 0 deletions redis/expire.lua
@@ -0,0 +1,14 @@
-- redis.replicate_commands();

local exists_key = KEYS[1]
local available_key = KEYS[2]
local version_key = KEYS[3]

local expiration = tonumber(ARGV[1])

if expiration then
  redis.log(redis.LOG_DEBUG, "expire.lua - expiring locks because expiration: " .. tostring(expiration))
  redis.call('EXPIRE', available_key, expiration)
  redis.call('EXPIRE', exists_key, expiration)
  redis.call('EXPIRE', version_key, expiration)
end
3 changes: 2 additions & 1 deletion redis/signal.lua
@@ -9,7 +9,7 @@ local token = ARGV[1]
local expiration = tonumber(ARGV[2])

redis.call('HDEL', grabbed_key, token)
redis.call('SREM', unique_keys, token)
redis.call('SREM', unique_keys, unique_digest)
local available_count = redis.call('LPUSH', available_key, token)

if expiration then
@@ -21,3 +21,4 @@ if expiration then
end

return available_count

@@ -45,6 +45,8 @@
    process_one.lock
    process_one.execute {}
    expect(process_one.locked?).to eq(true)
    expect(ttl('uniquejobs:da6005926a8457526e998f0033901dfc:EXISTS')).to eq(1)
    expect(ttl('uniquejobs:da6005926a8457526e998f0033901dfc:VERSION')).to eq(1)
  end
end

40 changes: 39 additions & 1 deletion spec/integration/sidekiq_unique_jobs/locksmith_spec.rb
@@ -9,7 +9,7 @@
let(:jid_one) { 'maaaahjid' }
let(:jid_two) { 'jidmayhem' }
let(:lock_expiration) { nil }
let(:unique_digest) { 'test_mutex_key' }
let(:unique_digest) { 'uniquejobs:randomvalue' }
let(:item_one) do
  {
    'jid' => jid_one,
@@ -115,6 +115,44 @@

it_behaves_like 'a lock'

it 'creates the expected keys' do
  locksmith_one.lock
  expect(unique_digests).to match_array(['uniquejobs:randomvalue'])
  expect(unique_keys).to match_array(%w[
    uniquejobs:randomvalue:EXISTS
    uniquejobs:randomvalue:GRABBED
    uniquejobs:randomvalue:VERSION
  ])
end

it 'expires the expected keys' do
  locksmith_one.lock
  expect(unique_digests).to match_array(['uniquejobs:randomvalue'])
  expect(unique_keys).to match_array(%w[
    uniquejobs:randomvalue:EXISTS
    uniquejobs:randomvalue:GRABBED
    uniquejobs:randomvalue:VERSION
  ])
  locksmith_one.signal

  expect(unique_digests).to match_array([])
  expect(ttl('uniquejobs:randomvalue:EXISTS')).to eq(1)
  expect(ttl('uniquejobs:randomvalue:VERSION')).to eq(1)
end

it 'deletes the expected keys' do
  locksmith_one.lock
  expect(unique_digests).to match_array(['uniquejobs:randomvalue'])
  expect(unique_keys).to match_array(%w[
    uniquejobs:randomvalue:EXISTS
    uniquejobs:randomvalue:GRABBED
    uniquejobs:randomvalue:VERSION
  ])
  locksmith_one.delete!
  expect(unique_digests).to match_array([])
  expect(unique_keys).to match_array(%w[])
end

it 'expires keys' do
  Sidekiq.redis(&:flushdb)
  locksmith_one.create
9 changes: 9 additions & 0 deletions spec/integration/sidekiq_unique_jobs/web_spec.rb
@@ -30,6 +30,15 @@ def app
  expect(last_response.body).to match("/unique_digests/#{another_digest}")
end

it 'can paginate digests' do
  110.times do |idx|
    expect(MyUniqueJob.perform_async(1, idx)).not_to eq(nil)
  end

  get '/unique_digests'
  expect(last_response.status).to eq(200)
end

it 'can display digest' do
  expect(MyUniqueJob.perform_async(1, 2)).not_to eq(nil)

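The new spec only asserts that the first page renders. A follow-up request that exercises the cursor could look roughly like this; a sketch assuming Rack::Test, with an illustrative cursor value that would normally be read from the rendered "Next" link:

```ruby
it 'can follow the next page of digests' do
  110.times { |idx| expect(MyUniqueJob.perform_async(1, idx)).not_to eq(nil) }

  get '/unique_digests', count: 100
  expect(last_response.status).to eq(200)

  # The cursor would normally come from the "Next" link in the response body.
  get '/unique_digests', count: 100, cursor: '100'
  expect(last_response.status).to eq(200)
end
```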
8 changes: 8 additions & 0 deletions spec/support/sidekiq_helpers.rb
@@ -55,6 +55,14 @@ def ttl(key)
  redis { |conn| conn.ttl(key) }
end

def unique_digests
  smembers('unique:keys')
end

def smembers(key)
  redis { |conn| conn.smembers(key) }
end

def unique_keys
  keys('uniquejobs:*')
end