diff --git a/.reek.yml b/.reek.yml index 4a5a941c..a4384dc3 100644 --- a/.reek.yml +++ b/.reek.yml @@ -18,7 +18,7 @@ detectors: exclude: - Sidekiq::JobSet::UniqueExtension#delete_by_value - Sidekiq::ScheduledSet::UniqueExtension#delete - - SidekiqUniqueJobs::Lock::BaseLock#call_strategy + - SidekiqUniqueJobs::Lock::BaseLock#strategy_for - SidekiqUniqueJobs::Orphans::Manager#start - SidekiqUniqueJobs::Orphans::RubyReaper#active? - SidekiqUniqueJobs::Redis::Hash#entries diff --git a/lib/sidekiq_unique_jobs/lock/base_lock.rb b/lib/sidekiq_unique_jobs/lock/base_lock.rb index c78f6a5e..d0420a6f 100644 --- a/lib/sidekiq_unique_jobs/lock/base_lock.rb +++ b/lib/sidekiq_unique_jobs/lock/base_lock.rb @@ -91,6 +91,13 @@ def locksmith # @return [Integer] the current locking attempt attr_reader :attempt + # + # Eases testing by allowing the lock implementation to add the missing + # keys to the job hash. + # + # + # @return [void] the return value should be irrelevant + # def prepare_item return if item.key?(LOCK_DIGEST) @@ -100,34 +107,22 @@ def prepare_item end # - # Handle when lock failed + # Call whatever strategry that has been configured # - # @param [Symbol] origin either `:client` or `:server` + # @param [Symbol] origin: the origin `:client` or `:server` # - # @return [void] + # @return [void] the return value is irrelevant # - def lock_failed(origin: :client) - reflect(:lock_failed, item) - call_strategy(origin: origin) - nil - end - + # @yieldparam [void] if a new job id was set and a block is given + # @yieldreturn [void] the yield is irrelevant, it only provides a mechanism in + # one specific situation to yield back to the middleware. def call_strategy(origin:) - @attempt += 1 + new_job_id = nil + strategy = strategy_for(origin) + @attempt += 1 - case origin - when :client - client_strategy.call { lock if replace? } - when :server - server_strategy.call { lock if replace? } - else - raise SidekiqUniqueJobs::InvalidArgument, - "either `for: :server` or `for: :client` needs to be specified" - end - end - - def replace? - client_strategy.replace? && attempt < 2 + strategy.call { new_job_id = lock if strategy.replace? && @attempt < 2 } + yield if new_job_id && block_given? end def unlock_and_callback @@ -144,6 +139,18 @@ def callback_safely raise end + def strategy_for(origin) + case origin + when :client + client_strategy + when :server + server_strategy + else + raise SidekiqUniqueJobs::InvalidArgument, + "#origin needs to be either `:server` or `:client`" + end + end + def client_strategy @client_strategy ||= OnConflict.find_strategy(lock_config.on_client_conflict).new(item, redis_pool) diff --git a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb index ced00745..cb167232 100644 --- a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb @@ -22,9 +22,15 @@ class UntilAndWhileExecuting < BaseLock # # @yield to the caller when given a block # - def lock(origin: :client) - return lock_failed(origin: origin) unless (token = locksmith.lock) - return yield token if block_given? + def lock(origin: :client, &block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: origin, &block) + + return + end + + yield if block token end diff --git a/lib/sidekiq_unique_jobs/lock/until_executed.rb b/lib/sidekiq_unique_jobs/lock/until_executed.rb index 01a8ec61..b90467a3 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executed.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executed.rb @@ -17,9 +17,15 @@ class UntilExecuted < BaseLock # # @yield to the caller when given a block # - def lock - return lock_failed(origin: :client) unless (token = locksmith.lock) - return yield token if block_given? + def lock(&block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: :client, &block) + + return + end + + yield if block token end diff --git a/lib/sidekiq_unique_jobs/lock/until_executing.rb b/lib/sidekiq_unique_jobs/lock/until_executing.rb index 745136ea..7903b356 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executing.rb @@ -15,11 +15,17 @@ class UntilExecuting < BaseLock # # @return [String, nil] the locked jid when properly locked, else nil. # - def lock - return lock_failed unless (job_id = locksmith.lock) - return yield job_id if block_given? + def lock(&block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: :client, &block) - job_id + return + end + + yield if block + + token end # Executes in the Sidekiq server process diff --git a/lib/sidekiq_unique_jobs/lock/until_expired.rb b/lib/sidekiq_unique_jobs/lock/until_expired.rb index f498dfab..1b39f26d 100644 --- a/lib/sidekiq_unique_jobs/lock/until_expired.rb +++ b/lib/sidekiq_unique_jobs/lock/until_expired.rb @@ -17,11 +17,17 @@ class UntilExpired < UntilExecuted # # @yield to the caller when given a block # - def lock - return lock_failed unless (job_id = locksmith.lock) - return yield job_id if block_given? + def lock(&block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: :client, &block) - job_id + return + end + + yield if block + + token end # Executes in the Sidekiq server process diff --git a/lib/sidekiq_unique_jobs/lock/while_executing.rb b/lib/sidekiq_unique_jobs/lock/while_executing.rb index df6a9188..713c73d3 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing.rb @@ -11,7 +11,7 @@ class Lock # # @author Mikael Henriksson class WhileExecuting < BaseLock - RUN_SUFFIX ||= ":RUN" + RUN_SUFFIX = ":RUN" include SidekiqUniqueJobs::OptionsWithFallback include SidekiqUniqueJobs::Logging::Middleware @@ -30,7 +30,7 @@ def initialize(item, callback, redis_pool = nil) # @return [true] always returns true def lock job_id = item[JID] - yield job_id if block_given? + yield if block_given? job_id end @@ -38,14 +38,16 @@ def lock # Executes in the Sidekiq server process. # These jobs are locked in the server process not from the client # @yield to the worker class perform method - def execute + def execute(&block) with_logging_context do - call_strategy(origin: :server) unless locksmith.execute do + executed = locksmith.execute do yield callback_safely if locksmith.unlock ensure locksmith.unlock end + + call_strategy(origin: :server, &block) unless executed end end diff --git a/lib/sidekiq_unique_jobs/middleware/client.rb b/lib/sidekiq_unique_jobs/middleware/client.rb index 5b73daf9..ea033903 100644 --- a/lib/sidekiq_unique_jobs/middleware/client.rb +++ b/lib/sidekiq_unique_jobs/middleware/client.rb @@ -30,7 +30,7 @@ def call(*, &block) private def lock - lock_instance.lock do |_locked_jid| + lock_instance.lock do reflect(:locked, item) return yield end diff --git a/spec/sidekiq_unique_jobs/lock/base_lock_spec.rb b/spec/sidekiq_unique_jobs/lock/base_lock_spec.rb index 7eb8335a..05cab9fa 100644 --- a/spec/sidekiq_unique_jobs/lock/base_lock_spec.rb +++ b/spec/sidekiq_unique_jobs/lock/base_lock_spec.rb @@ -24,26 +24,6 @@ end end - describe "#replace?" do - subject(:replace?) { lock.send(:replace?) } - - context "when strategy is not :replace" do - let(:strategy) { :log } - - it { is_expected.to eq(false) } - end - - context "when attempt is less than 2" do - it { is_expected.to eq(true) } - end - - context "when attempt is equal to 2" do - before { lock.instance_variable_set(:@attempt, 2) } - - it { is_expected.to eq(false) } - end - end - describe "#callback_safely" do subject(:callback_safely) { lock.send(:callback_safely) } diff --git a/spec/sidekiq_unique_jobs/on_conflict/replace_spec.rb b/spec/sidekiq_unique_jobs/on_conflict/replace_spec.rb index 7bb16229..de4850c6 100644 --- a/spec/sidekiq_unique_jobs/on_conflict/replace_spec.rb +++ b/spec/sidekiq_unique_jobs/on_conflict/replace_spec.rb @@ -39,7 +39,7 @@ end it "logs important information" do - call + expect(call).to eq(nil) expect(strategy).to have_received(:log_info).with("Deleted job: #{jid}") expect(strategy).to have_received(:log_info).with("Deleted `9` keys for #{lock_digest}") @@ -56,7 +56,7 @@ end it "logs important information" do - call + expect(call).to eq(nil) expect(strategy).to have_received(:log_info).with("Deleted job: #{jid}") expect(strategy).not_to have_received(:log_info).with("Deleted `` keys for #{lock_digest}") @@ -74,7 +74,7 @@ end it "does not call block" do - call + expect(call).to eq(nil) expect(block).not_to have_received(:call) end end @@ -107,6 +107,7 @@ it "removes the job from the scheduled set" do expect { call }.to change { schedule_count }.from(1).to(0) + expect(call).to eq(nil) expect(block).to have_received(:call) end end @@ -116,7 +117,7 @@ it "removes the job from the queue" do expect { call }.to change { queue_count(:customqueue) }.from(1).to(0) - + expect(call).to eq(nil) expect(block).to have_received(:call) end end diff --git a/spec/support/workers/until_and_while_executing_reject_job.rb b/spec/support/workers/until_and_while_executing_reject_job.rb new file mode 100644 index 00000000..4e9c2133 --- /dev/null +++ b/spec/support/workers/until_and_while_executing_reject_job.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +# :nocov: + +class UntilAndWhileExecutingRejectJob + include Sidekiq::Worker + + sidekiq_options lock: :until_and_while_executing, + queue: :working, + on_conflict: { + client: :reject, + server: :reject, + } + + def self.lock_args(args) + [args[0]] + end + + def perform(key); end +end diff --git a/spec/support/workers/until_and_while_executing_replace_job.rb b/spec/support/workers/until_and_while_executing_replace_job.rb new file mode 100644 index 00000000..c4d84c89 --- /dev/null +++ b/spec/support/workers/until_and_while_executing_replace_job.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +# :nocov: + +class UntilAndWhileExecutingReplaceJob + include Sidekiq::Worker + + sidekiq_options lock: :until_and_while_executing, + queue: :working, + on_conflict: { + client: :replace, + server: :reschedule, + } + + def self.lock_args(args) + [args[0]] + end + + def perform(key); end +end diff --git a/spec/workers/until_and_while_executing_reject_job_spec.rb b/spec/workers/until_and_while_executing_reject_job_spec.rb new file mode 100644 index 00000000..3f9f2b1c --- /dev/null +++ b/spec/workers/until_and_while_executing_reject_job_spec.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +RSpec.describe UntilAndWhileExecutingRejectJob do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "queue" => :working, + "retry" => true, + "lock" => :until_and_while_executing, + "on_conflict" => { client: :reject, server: :reject }, + } + end + end + + it "rejects the job successfully" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_at(Time.now + 30, 1) + expect(set.size).to eq(1) + + expect(described_class.perform_at(Time.now + 30, 1)).to be_nil + + set.each(&:delete) + end + end + + it "rejects job successfully when using perform_in" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_in(30, 1) + expect(set.size).to eq(1) + + expect(described_class.perform_in(30, 1)).to be_nil + + set.each(&:delete) + end + end +end diff --git a/spec/workers/until_and_while_executing_replace_job_spec.rb b/spec/workers/until_and_while_executing_replace_job_spec.rb new file mode 100644 index 00000000..13d889bc --- /dev/null +++ b/spec/workers/until_and_while_executing_replace_job_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +RSpec.describe UntilAndWhileExecutingReplaceJob do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "queue" => :working, + "retry" => true, + "lock" => :until_and_while_executing, + "on_conflict" => { client: :replace, server: :reschedule }, + } + end + end + + it "replaces the previous job successfully" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_at(Time.now + 30, "unique", "first argument") + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "first argument"]) + + job_id = described_class.perform_at(Time.now + 30, "unique", "new argument") + expect(job_id).not_to be_nil + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "new argument"]) + + set.each(&:delete) + end + end + + it "replaces the previous job successfully when using perform_in" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_in(30, "unique", "first argument") + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "first argument"]) + + job_id = described_class.perform_in(30, "unique", "new argument") + expect(job_id).not_to be_nil + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "new argument"]) + + set.each(&:delete) + end + end +end