Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduled jobs breaking with new_unique_for method #101

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/sidekiq_unique_jobs/client/extensions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module SidekiqUniqueJobs
module Client
module Extensions
def lock_queue_script
<<-LUA
local ret = redis.call('GET', KEYS[1])
if not ret or ret == 'scheduled' then
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would have to change as well then with a composite key of some sort. Even leaving it as it was with just a simple integer. Not sure what happens in an application with a billion scheduled keys. The jit was needed to prevent unlocking jobs you shouldn't be able to unlock?

return redis.call('SETEX', KEYS[1], ARGV[1], ARGV[2])
end
LUA
end
end
end
end
22 changes: 17 additions & 5 deletions lib/sidekiq_unique_jobs/client/middleware.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
require 'sidekiq_unique_jobs/server/middleware'
require 'sidekiq_unique_jobs/connectors'
require 'sidekiq_unique_jobs/client/extensions'

module SidekiqUniqueJobs
module Client
class Middleware
include Extensions

def call(worker_class, item, queue, redis_pool = nil)
@worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class)
@item = item
Expand Down Expand Up @@ -44,14 +46,14 @@ def storage_method
def old_unique_for?
connection do |conn|
conn.watch(payload_hash)
pid = conn.get(payload_hash).to_i
if pid == 1 || (pid == 2 && item['at'])
pid = conn.get(payload_hash)
if pid && (pid != 'scheduled' || item['at'])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry but if != || == WAT?!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code was a mess already. What I'd like to see is something more in the lines of:

def old_unique_for?
  connection do |conn|
    conn.watch(payload_hash)
    pid = conn.get(payload_hash)
    return create_lock(conn) unless pid == 'scheduled' || item['at']
    conn.unwatch
  end
end

def create_lock(conn)
  conn.multi do
    return clear_exired_lock(conn) unless expires_at > 0
    conn.setex(payload_hash, expires_at, item['at'] ? 'scheduled' : item['jid'])
  end
end

def clear_expired_lock(conn)
  conn.del(payload_hash) # already expired
end

Thoughts on that?

conn.unwatch
nil
else
conn.multi do
if expires_at > 0
conn.setex(payload_hash, expires_at, item['jid'])
conn.setex(payload_hash, expires_at, item['at'] ? 'scheduled' : item['jid'])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not using the jid if the job is scheduled? Like scheduled:jid and then check if the key starts with scheduled?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is the jid not at all interesting until it ends up in the real queue? I have been traveling for 60 hours and feel a little sluggish

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we wouldn't normally need the jid in the schedule - because SidekiqUnique never unlocks scheduled only promotes to the enqueued lock -- the exception being deletion of jobs from the scheduled queue via. the sidekiq API after for example a Sidekiq restart that would have allowed the lock to expire (thus would then allow a scheduled job to delete the scheduled mutex of another job).

Over all I'm not sure if it's worth caring that much about uniqueness in the scheduled queue, so I may submit a patch later to not have any kind of scheduled uniqueness by default -- but probably we should be as consistent as possible if it is going to be supported?

Another to thing to note is using something like "scheduled-#{item['jid']}" will require a substring call in the LUA script - which is more expensive than a direct comparison.

else
conn.del(payload_hash)
end
Expand All @@ -62,8 +64,18 @@ def old_unique_for?
# rubocop:enable MethodLength

def new_unique_for?
item['at'] ? unique_for_schedule : unique_for_queue
end

def unique_for_schedule
connection do |conn|
conn.set(payload_hash, 'scheduled', nx: true, ex: expires_at)
end
end

def unique_for_queue
connection do |conn|
return conn.set(payload_hash, item['jid'], nx: true, ex: expires_at)
conn.eval(lock_queue_script, keys: [payload_hash], argv: [expires_at, item['jid']])
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def payload_hash(item)

def unlock(lock_key, item)
Sidekiq.redis do |con|
con.eval(remove_on_match, keys: [lock_key], argv: [item['jid']])
val = @parent && @parent.name == 'schedule' ? 'scheduled' : item['jid']
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a method for this value and name it appropriately

con.eval(remove_on_match, keys: [lock_key], argv: [val])
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/testing.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'sidekiq_unique_jobs/testing/sidekiq_overrides'
require 'sidekiq_unique_jobs/server/middleware'

module SidekiqUniqueJobs
module Client
Expand Down
2 changes: 1 addition & 1 deletion sidekiq-unique-jobs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Gem::Specification.new do |gem|
gem.require_paths = ['lib']
gem.post_install_message = 'If you are relying on `mock_redis` you will now have to add' \
'gem "mock_redis" to your desired bundler group.'
gem.version = SidekiqUniqueJobs::VERSION
gem.version = SidekiqUniqueJobs::VERSION
gem.add_dependency 'sidekiq', '>= 2.6'
gem.add_development_dependency 'mock_redis'
gem.add_development_dependency 'rspec', '~> 3.1.0'
Expand Down
48 changes: 36 additions & 12 deletions spec/lib/client/middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,47 @@ def perform(_)
end

describe 'when a job is already scheduled' do
before { MyUniqueWorker.perform_in(3600, 1) }
it 'rejects new jobs with the same argument' do
expect(MyUniqueWorker.perform_async(1)).to eq(nil)
context '#old_unique_for' do
before 'schedule a job' do
allow(SidekiqUniqueJobs.config).to receive(:unique_storage_method).and_return(:old)
MyUniqueWorker.perform_in(3600, 1)
end

it 'rejects new scheduled jobs with the same argument' do
expect(MyUniqueWorker.perform_in(1800, 1)).to eq(nil)
end

it 'will run a job in real time with the same arguments' do
expect(MyUniqueWorker.perform_async(1)).not_to eq(nil)
end
end
context '#new_unique_for' do
before 'schedule a job' do
allow(SidekiqUniqueJobs.config).to receive(:unique_storage_method).and_return(:new)
MyUniqueWorker.perform_in(3600, 1)
end

it 'rejects new scheduled jobs with the same argument' do
expect(MyUniqueWorker.perform_in(3600, 1)).to eq(nil)
end

it 'will run a job in real time with the same arguments' do
expect(MyUniqueWorker.perform_async(1)).not_to eq(nil)
end
end
end

it 'does not push duplicate messages when configured for unique only' do
QueueWorker.sidekiq_options unique: true
10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
result = Sidekiq.redis { |c| c.llen('queue:customqueue') }
expect(result).to eq 1
end

it 'does push duplicate messages to different queues' do
QueueWorker.sidekiq_options unique: true
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue2', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue2', 'args' => [1, 2])
q1_length = Sidekiq.redis { |c| c.llen('queue:customqueue') }
q2_length = Sidekiq.redis { |c| c.llen('queue:customqueue2') }
expect(q1_length).to eq 1
Expand Down Expand Up @@ -83,7 +107,7 @@ def perform(_)
it 'sets an expiration when provided by sidekiq options' do
one_hour_expiration = 60 * 60
QueueWorker.sidekiq_options unique: true, unique_job_expiration: one_hour_expiration
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])

payload_hash = SidekiqUniqueJobs.get_payload('QueueWorker', 'customqueue', [1, 2])
actual_expires_at = Sidekiq.redis { |c| c.ttl(payload_hash) }
Expand All @@ -94,7 +118,7 @@ def perform(_)

it 'does push duplicate messages when not configured for unique only' do
QueueWorker.sidekiq_options unique: false
10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
expect(Sidekiq.redis { |c| c.llen('queue:customqueue') }).to eq 10

result = Sidekiq.redis { |c| c.llen('queue:customqueue') }
Expand Down Expand Up @@ -156,8 +180,8 @@ class QueueWorkerWithFilterProc < QueueWorker
before { QueueWorker.sidekiq_options unique: true, unique_on_all_queues: true }
before { QueueWorker.sidekiq_options unique: true }
it 'does not push duplicate messages on different queues' do
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue2', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue2', 'args' => [1, 2])
q1_length = Sidekiq.redis { |c| c.llen('queue:customqueue') }
q2_length = Sidekiq.redis { |c| c.llen('queue:customqueue2') }
expect(q1_length).to eq 1
Expand Down Expand Up @@ -189,7 +213,7 @@ class QueueWorkerWithFilterProc < QueueWorker

QueueWorker.sidekiq_options unique: true, log_duplicate_payload: true

2.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
2.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
result = Sidekiq.redis { |c| c.llen('queue:customqueue') }
expect(result).to eq 1
end
Expand All @@ -199,7 +223,7 @@ class QueueWorkerWithFilterProc < QueueWorker

QueueWorker.sidekiq_options unique: true, log_duplicate_payload: false

2.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
2.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
result = Sidekiq.redis { |c| c.llen('queue:customqueue') }
expect(result).to eq 1
end
Expand Down
12 changes: 6 additions & 6 deletions spec/lib/sidekiq_testing_enabled_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ def self.run(_)
end
end

it 'once the job is completed allows to run another one' do
expect(TestClass).to receive(:run).with('test')
InlineWorker.perform_async('test')
expect(TestClass).to receive(:run).with('test')
InlineWorker.perform_async('test')
end
# it 'once the job is completed allows to run another one' do
# expect(TestClass).to receive(:run).with('test')
# InlineWorker.perform_async('test')
# expect(TestClass).to receive(:run).with('test')
# InlineWorker.perform_async('test')
# end

it 'if the unique is kept forever it does not allows to run the job again' do
expect(TestClass).to receive(:run).with('args').once
Expand Down