Skip to content

Commit

Permalink
OnConflict::Replace: yield when lock was achieved (#640)
Browse files Browse the repository at this point in the history
* OnConflict, return nil

Close #629

The problem originally was that in some situations, a string was returned (for the lock). This should never happen, any conflict strategy must return nil to avoid pretending to be successful.

* Refactor (thanks reek for pointing it out)

* Ensure the replace strategy yields

This should be considered a success

* Adds documentation
  • Loading branch information
mhenrixon authored Sep 27, 2021
1 parent 3c1efa6 commit d4914fb
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ detectors:
exclude:
- Sidekiq::JobSet::UniqueExtension#delete_by_value
- Sidekiq::ScheduledSet::UniqueExtension#delete
- SidekiqUniqueJobs::Lock::BaseLock#call_strategy
- SidekiqUniqueJobs::Lock::BaseLock#strategy_for
- SidekiqUniqueJobs::Orphans::Manager#start
- SidekiqUniqueJobs::Orphans::RubyReaper#active?
- SidekiqUniqueJobs::Redis::Hash#entries
Expand Down
53 changes: 30 additions & 23 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def locksmith
# @return [Integer] the current locking attempt
attr_reader :attempt

#
# Eases testing by allowing the lock implementation to add the missing
# keys to the job hash.
#
#
# @return [void] the return value should be irrelevant
#
def prepare_item
return if item.key?(LOCK_DIGEST)

Expand All @@ -100,34 +107,22 @@ def prepare_item
end

#
# Handle when lock failed
# Call whatever strategry that has been configured
#
# @param [Symbol] origin either `:client` or `:server`
# @param [Symbol] origin: the origin `:client` or `:server`
#
# @return [void]
# @return [void] the return value is irrelevant
#
def lock_failed(origin: :client)
reflect(:lock_failed, item)
call_strategy(origin: origin)
nil
end

# @yieldparam [void] if a new job id was set and a block is given
# @yieldreturn [void] the yield is irrelevant, it only provides a mechanism in
# one specific situation to yield back to the middleware.
def call_strategy(origin:)
@attempt += 1
new_job_id = nil
strategy = strategy_for(origin)
@attempt += 1

case origin
when :client
client_strategy.call { lock if replace? }
when :server
server_strategy.call { lock if replace? }
else
raise SidekiqUniqueJobs::InvalidArgument,
"either `for: :server` or `for: :client` needs to be specified"
end
end

def replace?
client_strategy.replace? && attempt < 2
strategy.call { new_job_id = lock if strategy.replace? && @attempt < 2 }
yield if new_job_id && block_given?
end

def unlock_and_callback
Expand All @@ -144,6 +139,18 @@ def callback_safely
raise
end

def strategy_for(origin)
case origin
when :client
client_strategy
when :server
server_strategy
else
raise SidekiqUniqueJobs::InvalidArgument,
"#origin needs to be either `:server` or `:client`"
end
end

def client_strategy
@client_strategy ||=
OnConflict.find_strategy(lock_config.on_client_conflict).new(item, redis_pool)
Expand Down
12 changes: 9 additions & 3 deletions lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ class UntilAndWhileExecuting < BaseLock
#
# @yield to the caller when given a block
#
def lock(origin: :client)
return lock_failed(origin: origin) unless (token = locksmith.lock)
return yield token if block_given?
def lock(origin: :client, &block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: origin, &block)

return
end

yield if block

token
end
Expand Down
12 changes: 9 additions & 3 deletions lib/sidekiq_unique_jobs/lock/until_executed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ class UntilExecuted < BaseLock
#
# @yield to the caller when given a block
#
def lock
return lock_failed(origin: :client) unless (token = locksmith.lock)
return yield token if block_given?
def lock(&block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: :client, &block)

return
end

yield if block

token
end
Expand Down
14 changes: 10 additions & 4 deletions lib/sidekiq_unique_jobs/lock/until_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ class UntilExecuting < BaseLock
#
# @return [String, nil] the locked jid when properly locked, else nil.
#
def lock
return lock_failed unless (job_id = locksmith.lock)
return yield job_id if block_given?
def lock(&block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: :client, &block)

job_id
return
end

yield if block

token
end

# Executes in the Sidekiq server process
Expand Down
14 changes: 10 additions & 4 deletions lib/sidekiq_unique_jobs/lock/until_expired.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ class UntilExpired < UntilExecuted
#
# @yield to the caller when given a block
#
def lock
return lock_failed unless (job_id = locksmith.lock)
return yield job_id if block_given?
def lock(&block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: :client, &block)

job_id
return
end

yield if block

token
end

# Executes in the Sidekiq server process
Expand Down
10 changes: 6 additions & 4 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Lock
#
# @author Mikael Henriksson <[email protected]>
class WhileExecuting < BaseLock
RUN_SUFFIX ||= ":RUN"
RUN_SUFFIX = ":RUN"

include SidekiqUniqueJobs::OptionsWithFallback
include SidekiqUniqueJobs::Logging::Middleware
Expand All @@ -30,22 +30,24 @@ def initialize(item, callback, redis_pool = nil)
# @return [true] always returns true
def lock
job_id = item[JID]
yield job_id if block_given?
yield if block_given?

job_id
end

# Executes in the Sidekiq server process.
# These jobs are locked in the server process not from the client
# @yield to the worker class perform method
def execute
def execute(&block)
with_logging_context do
call_strategy(origin: :server) unless locksmith.execute do
executed = locksmith.execute do
yield
callback_safely if locksmith.unlock
ensure
locksmith.unlock
end

call_strategy(origin: :server, &block) unless executed
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/middleware/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def call(*, &block)
private

def lock
lock_instance.lock do |_locked_jid|
lock_instance.lock do
reflect(:locked, item)
return yield
end
Expand Down
20 changes: 0 additions & 20 deletions spec/sidekiq_unique_jobs/lock/base_lock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,6 @@
end
end

describe "#replace?" do
subject(:replace?) { lock.send(:replace?) }

context "when strategy is not :replace" do
let(:strategy) { :log }

it { is_expected.to eq(false) }
end

context "when attempt is less than 2" do
it { is_expected.to eq(true) }
end

context "when attempt is equal to 2" do
before { lock.instance_variable_set(:@attempt, 2) }

it { is_expected.to eq(false) }
end
end

describe "#callback_safely" do
subject(:callback_safely) { lock.send(:callback_safely) }

Expand Down
9 changes: 5 additions & 4 deletions spec/sidekiq_unique_jobs/on_conflict/replace_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
end

it "logs important information" do
call
expect(call).to eq(nil)

expect(strategy).to have_received(:log_info).with("Deleted job: #{jid}")
expect(strategy).to have_received(:log_info).with("Deleted `9` keys for #{lock_digest}")
Expand All @@ -56,7 +56,7 @@
end

it "logs important information" do
call
expect(call).to eq(nil)

expect(strategy).to have_received(:log_info).with("Deleted job: #{jid}")
expect(strategy).not_to have_received(:log_info).with("Deleted `` keys for #{lock_digest}")
Expand All @@ -74,7 +74,7 @@
end

it "does not call block" do
call
expect(call).to eq(nil)
expect(block).not_to have_received(:call)
end
end
Expand Down Expand Up @@ -107,6 +107,7 @@

it "removes the job from the scheduled set" do
expect { call }.to change { schedule_count }.from(1).to(0)
expect(call).to eq(nil)
expect(block).to have_received(:call)
end
end
Expand All @@ -116,7 +117,7 @@

it "removes the job from the queue" do
expect { call }.to change { queue_count(:customqueue) }.from(1).to(0)

expect(call).to eq(nil)
expect(block).to have_received(:call)
end
end
Expand Down
20 changes: 20 additions & 0 deletions spec/support/workers/until_and_while_executing_reject_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

# :nocov:

class UntilAndWhileExecutingRejectJob
include Sidekiq::Worker

sidekiq_options lock: :until_and_while_executing,
queue: :working,
on_conflict: {
client: :reject,
server: :reject,
}

def self.lock_args(args)
[args[0]]
end

def perform(key); end
end
20 changes: 20 additions & 0 deletions spec/support/workers/until_and_while_executing_replace_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

# :nocov:

class UntilAndWhileExecutingReplaceJob
include Sidekiq::Worker

sidekiq_options lock: :until_and_while_executing,
queue: :working,
on_conflict: {
client: :replace,
server: :reschedule,
}

def self.lock_args(args)
[args[0]]
end

def perform(key); end
end
40 changes: 40 additions & 0 deletions spec/workers/until_and_while_executing_reject_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

RSpec.describe UntilAndWhileExecutingRejectJob do
it_behaves_like "sidekiq with options" do
let(:options) do
{
"queue" => :working,
"retry" => true,
"lock" => :until_and_while_executing,
"on_conflict" => { client: :reject, server: :reject },
}
end
end

it "rejects the job successfully" do
Sidekiq::Testing.disable! do
set = Sidekiq::ScheduledSet.new

described_class.perform_at(Time.now + 30, 1)
expect(set.size).to eq(1)

expect(described_class.perform_at(Time.now + 30, 1)).to be_nil

set.each(&:delete)
end
end

it "rejects job successfully when using perform_in" do
Sidekiq::Testing.disable! do
set = Sidekiq::ScheduledSet.new

described_class.perform_in(30, 1)
expect(set.size).to eq(1)

expect(described_class.perform_in(30, 1)).to be_nil

set.each(&:delete)
end
end
end
Loading

0 comments on commit d4914fb

Please sign in to comment.