Skip to content

Commit

Permalink
Separate client and server on_conflict
Browse files Browse the repository at this point in the history
Close #402
  • Loading branch information
mhenrixon committed Nov 28, 2019
1 parent 233112a commit ad43437
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 33 deletions.
37 changes: 15 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,15 @@
- [While Executing](#while-executing)
- [Custom Locks](#custom-locks)
- [Conflict Strategy](#conflict-strategy)
- [log](#log)
- [raise](#raise)
- [reject](#reject)
- [replace](#replace)
- [Reschedule](#reschedule)
- [Custom Strategies](#custom-strategies)
- [Usage](#usage)
- [Finer Control over Uniqueness](#finer-control-over-uniqueness)
- [After Unlock Callback](#after-unlock-callback)
- [Logging](#logging)
- [Cleanup Dead Locks](#cleanup-dead-locks)
- [Other Sidekiq gems](#other-sidekiq-gems)
- [sidekiq-global_id](#sidekiq-global_id)
- [Debugging](#debugging)
- [Sidekiq Web](#sidekiq-web)
- [Show Locks](#show-locks)
- [Show Lock](#show-lock)
- [Communication](#communication)
- [Testing](#testing)
- [Unique Sidekiq Configuration](#unique-sidekiq-configuration)
- [Uniqueness](#uniqueness)
- [lib/strategies/my_custom_strategy.rb](#libstrategiesmycustomstrategyrb)
- [For rails application](#for-rails-application)
- [config/initializers/sidekiq_unique_jobs.rb](#configinitializerssidekiquniquejobsrb)
- [For other projects, whenever you prefer](#for-other-projects-whenever-you-prefer)
- [this goes in your initializer](#this-goes-in-your-initializer)
- [app/config/routes.rb](#appconfigroutesrb)
- [app/workers/bad_worker.rb](#appworkersbad_workerrb)
- [spec/workers/bad_worker_spec.rb](#specworkersbadworkerspecrb)
- [OR](#or)
- [Contributing](#contributing)
- [Contributors](#contributors)

Expand Down Expand Up @@ -415,7 +403,12 @@ Please not that if you try to override a default lock, an `ArgumentError` will b

Decides how we handle conflict. We can either reject the job to the dead queue or reschedule it. Both are useful for jobs that absolutely need to run and have been configured to use the lock `WhileExecuting` that is used only by the sidekiq server process.

The last one is log which can be be used with the lock `UntilExecuted` and `UntilExpired`. Now we write a log entry saying the job could not be pushed because it is a duplicate of another job with the same arguments
The last one is log which can be be used with the lock `UntilExecuted` and `UntilExpired`. Now we write a log entry saying the job could not be pushed because it is a duplicate of another job with the same arguments.

It is possible for locks to have different conflict strategy for the client and server. This is useful for `:until_and_while_executing`.

```ruby
sidekiq_options lock: :until_and_while_executing, on_conflict: { client: :log, server: :reject }

### log

Expand Down
19 changes: 14 additions & 5 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def initialize(item, callback, redis_pool = nil)
@redis_pool = redis_pool
@attempt = 0
add_uniqueness_when_missing # Used to ease testing
@lock_config = LockConfig.new(item)
end

#
Expand All @@ -41,7 +42,6 @@ def initialize(item, callback, redis_pool = nil)
# @yield to the caller when given a block
#
def lock(&block)
# TODO: only use replace strategy when server is executing the lock
return call_strategy unless (locked_token = locksmith.lock(&block))

locked_token
Expand Down Expand Up @@ -100,16 +100,19 @@ def add_uniqueness_when_missing

def call_strategy
@attempt += 1
strategy.call { lock if replace? }
client_strategy.call { lock if replace? }
end

def replace?
strategy.replace? && attempt < 2
client_strategy.replace? && attempt < 2
end

# @!attribute [r] item
# @return [Hash<String, Object>] the Sidekiq job hash
attr_reader :item
# @!attribute [r] lock_config
# @return [LockConfig] a lock configuration
attr_reader :lock_config
# @!attribute [r] redis_pool
# @return [Sidekiq::RedisConnection, ConnectionPool, NilClass] the redis connection
attr_reader :redis_pool
Expand All @@ -135,8 +138,14 @@ def callback_safely
raise
end

def strategy
@strategy ||= OnConflict.find_strategy(item[ON_CONFLICT]).new(item, redis_pool)
def client_strategy
@client_strategy ||=
OnConflict.find_strategy(lock_config.on_client_conflict).new(item, redis_pool)
end

def server_strategy
@server_strategy ||=
OnConflict.find_strategy(lock_config.on_server_conflict).new(item, redis_pool)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def lock
# @yield to the worker class perform method
def execute
with_logging_context do
strategy.call unless locksmith.lock do
server_strategy&.call unless locksmith.lock do
yield
callback_safely
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/lock/while_executing_reject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ class Lock
class WhileExecutingReject < WhileExecuting
# Overridden with a forced {OnConflict::Reject} strategy
# @return [OnConflict::Reject] a reject strategy
def strategy
@strategy ||= OnConflict.find_strategy(:reject).new(item, redis_pool)
def server_strategy
@server_strategy ||= OnConflict.find_strategy(:reject).new(item, redis_pool)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@

before do
allow(strategy).to receive(:deadset_kill?).and_return(false)
allow(process_two).to receive(:strategy).and_return(strategy)
allow(process_two).to receive(:server_strategy).and_return(strategy)
end

it_behaves_like "rejects job to deadset"
Expand Down
4 changes: 2 additions & 2 deletions spec/sidekiq_unique_jobs/lock/while_executing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
let(:callback_one) { -> { true } }
let(:callback_two) { nil }

let(:strategy_one) { process_one.send(:strategy) }
let(:strategy_two) { process_two.send(:strategy) }
let(:strategy_one) { process_one.send(:server_strategy) }
let(:strategy_two) { process_two.send(:server_strategy) }

before do
allow(strategy_one).to receive(:call).and_call_original
Expand Down

0 comments on commit ad43437

Please sign in to comment.