diff --git a/README.md b/README.md index c5525077d..0e0f65366 100644 --- a/README.md +++ b/README.md @@ -33,27 +33,15 @@ - [While Executing](#while-executing) - [Custom Locks](#custom-locks) - [Conflict Strategy](#conflict-strategy) - - [log](#log) - - [raise](#raise) - - [reject](#reject) - - [replace](#replace) - - [Reschedule](#reschedule) - - [Custom Strategies](#custom-strategies) -- [Usage](#usage) - - [Finer Control over Uniqueness](#finer-control-over-uniqueness) - - [After Unlock Callback](#after-unlock-callback) - - [Logging](#logging) - - [Cleanup Dead Locks](#cleanup-dead-locks) - - [Other Sidekiq gems](#other-sidekiq-gems) - - [sidekiq-global_id](#sidekiq-global_id) -- [Debugging](#debugging) - - [Sidekiq Web](#sidekiq-web) - - [Show Locks](#show-locks) - - [Show Lock](#show-lock) -- [Communication](#communication) -- [Testing](#testing) - - [Unique Sidekiq Configuration](#unique-sidekiq-configuration) - - [Uniqueness](#uniqueness) +- [lib/strategies/my_custom_strategy.rb](#libstrategiesmycustomstrategyrb) +- [For rails application](#for-rails-application) +- [config/initializers/sidekiq_unique_jobs.rb](#configinitializerssidekiquniquejobsrb) +- [For other projects, whenever you prefer](#for-other-projects-whenever-you-prefer) +- [this goes in your initializer](#this-goes-in-your-initializer) +- [app/config/routes.rb](#appconfigroutesrb) +- [app/workers/bad_worker.rb](#appworkersbad_workerrb) +- [spec/workers/bad_worker_spec.rb](#specworkersbadworkerspecrb) +- [OR](#or) - [Contributing](#contributing) - [Contributors](#contributors) @@ -415,7 +403,12 @@ Please not that if you try to override a default lock, an `ArgumentError` will b Decides how we handle conflict. We can either reject the job to the dead queue or reschedule it. Both are useful for jobs that absolutely need to run and have been configured to use the lock `WhileExecuting` that is used only by the sidekiq server process. -The last one is log which can be be used with the lock `UntilExecuted` and `UntilExpired`. Now we write a log entry saying the job could not be pushed because it is a duplicate of another job with the same arguments +The last one is log which can be be used with the lock `UntilExecuted` and `UntilExpired`. Now we write a log entry saying the job could not be pushed because it is a duplicate of another job with the same arguments. + +It is possible for locks to have different conflict strategy for the client and server. This is useful for `:until_and_while_executing`. + +```ruby +sidekiq_options lock: :until_and_while_executing, on_conflict: { client: :log, server: :reject } ### log diff --git a/lib/sidekiq_unique_jobs/lock/base_lock.rb b/lib/sidekiq_unique_jobs/lock/base_lock.rb index 407462acf..61d8f30d4 100644 --- a/lib/sidekiq_unique_jobs/lock/base_lock.rb +++ b/lib/sidekiq_unique_jobs/lock/base_lock.rb @@ -29,6 +29,7 @@ def initialize(item, callback, redis_pool = nil) @redis_pool = redis_pool @attempt = 0 add_uniqueness_when_missing # Used to ease testing + @lock_config = LockConfig.new(item) end # @@ -41,7 +42,6 @@ def initialize(item, callback, redis_pool = nil) # @yield to the caller when given a block # def lock(&block) - # TODO: only use replace strategy when server is executing the lock return call_strategy unless (locked_token = locksmith.lock(&block)) locked_token @@ -100,16 +100,19 @@ def add_uniqueness_when_missing def call_strategy @attempt += 1 - strategy.call { lock if replace? } + client_strategy.call { lock if replace? } end def replace? - strategy.replace? && attempt < 2 + client_strategy.replace? && attempt < 2 end # @!attribute [r] item # @return [Hash] the Sidekiq job hash attr_reader :item + # @!attribute [r] lock_config + # @return [LockConfig] a lock configuration + attr_reader :lock_config # @!attribute [r] redis_pool # @return [Sidekiq::RedisConnection, ConnectionPool, NilClass] the redis connection attr_reader :redis_pool @@ -135,8 +138,14 @@ def callback_safely raise end - def strategy - @strategy ||= OnConflict.find_strategy(item[ON_CONFLICT]).new(item, redis_pool) + def client_strategy + @client_strategy ||= + OnConflict.find_strategy(lock_config.on_client_conflict).new(item, redis_pool) + end + + def server_strategy + @server_strategy ||= + OnConflict.find_strategy(lock_config.on_server_conflict).new(item, redis_pool) end end end diff --git a/lib/sidekiq_unique_jobs/lock/while_executing.rb b/lib/sidekiq_unique_jobs/lock/while_executing.rb index a2ffe926f..cccfd03de 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing.rb @@ -37,7 +37,7 @@ def lock # @yield to the worker class perform method def execute with_logging_context do - strategy.call unless locksmith.lock do + server_strategy&.call unless locksmith.lock do yield callback_safely end diff --git a/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb b/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb index 5cf7422c7..8739ff99e 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb @@ -13,8 +13,8 @@ class Lock class WhileExecutingReject < WhileExecuting # Overridden with a forced {OnConflict::Reject} strategy # @return [OnConflict::Reject] a reject strategy - def strategy - @strategy ||= OnConflict.find_strategy(:reject).new(item, redis_pool) + def server_strategy + @server_strategy ||= OnConflict.find_strategy(:reject).new(item, redis_pool) end end end diff --git a/spec/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb b/spec/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb index c23758492..cba222039 100644 --- a/spec/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb +++ b/spec/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb @@ -65,7 +65,7 @@ before do allow(strategy).to receive(:deadset_kill?).and_return(false) - allow(process_two).to receive(:strategy).and_return(strategy) + allow(process_two).to receive(:server_strategy).and_return(strategy) end it_behaves_like "rejects job to deadset" diff --git a/spec/sidekiq_unique_jobs/lock/while_executing_spec.rb b/spec/sidekiq_unique_jobs/lock/while_executing_spec.rb index bcc56ae6b..3b5cb5210 100644 --- a/spec/sidekiq_unique_jobs/lock/while_executing_spec.rb +++ b/spec/sidekiq_unique_jobs/lock/while_executing_spec.rb @@ -72,8 +72,8 @@ let(:callback_one) { -> { true } } let(:callback_two) { nil } - let(:strategy_one) { process_one.send(:strategy) } - let(:strategy_two) { process_two.send(:strategy) } + let(:strategy_one) { process_one.send(:server_strategy) } + let(:strategy_two) { process_two.send(:server_strategy) } before do allow(strategy_one).to receive(:call).and_call_original