diff --git a/.reek.yml b/.reek.yml index c3df9fbcd..b164fd4f4 100644 --- a/.reek.yml +++ b/.reek.yml @@ -51,6 +51,7 @@ detectors: - SidekiqUniqueJobs::Logging#debug_item - SidekiqUniqueJobs::NotUniqueWorker#initialize - SidekiqUniqueJobs::OnConflict::Reject#push_to_deadset + - SidekiqUniqueJobs::Orphans::Reaper#active? - SidekiqUniqueJobs::Orphans::Reaper#entries - SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize - SidekiqUniqueJobs::Web::Helpers#cparams @@ -90,6 +91,7 @@ detectors: - SidekiqUniqueJobs::Locksmith#create_lock - SidekiqUniqueJobs::Middleware#self.configure_client - SidekiqUniqueJobs::Middleware#self.configure_server + - SidekiqUniqueJobs::Orphans::Reaper#active? - SidekiqUniqueJobs::Orphans::Reaper#enqueued? - SidekiqUniqueJobs::UpgradeLocks#keys_for_digest RepeatedConditional: @@ -124,6 +126,7 @@ detectors: - SidekiqUniqueJobs::Middleware#call - SidekiqUniqueJobs::Middleware#self.configure_server - SidekiqUniqueJobs::Orphans::Manager#start + - SidekiqUniqueJobs::Orphans::Reaper#active? - SidekiqUniqueJobs::Orphans::Reaper#enqueued? - SidekiqUniqueJobs::Orphans::Reaper#entries - SidekiqUniqueJobs::Profiler#self.stop @@ -144,6 +147,7 @@ detectors: - SidekiqUniqueJobs::Lock::BaseLock - SidekiqUniqueJobs::Locksmith - SidekiqUniqueJobs::Lock + - SidekiqUniqueJobs::Orphans::Reaper UncommunicativeVariableName: exclude: - Hash#slice diff --git a/lib/sidekiq_unique_jobs/lock_digest.rb b/lib/sidekiq_unique_jobs/lock_digest.rb index 8940fa8ac..b587c6adb 100644 --- a/lib/sidekiq_unique_jobs/lock_digest.rb +++ b/lib/sidekiq_unique_jobs/lock_digest.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'openssl' +require "openssl" module SidekiqUniqueJobs # Handles uniqueness of sidekiq arguments diff --git a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua index 38cbf9011..db1cbb095 100644 --- a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua +++ b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua @@ -23,6 +23,7 @@ local redisversion = ARGV[6] <%= include_partial "shared/_common.lua" %> <%= include_partial "shared/_find_digest_in_queues.lua" %> <%= include_partial "shared/_find_digest_in_sorted_set.lua" %> +<%= include_partial "shared/_find_digest_in_process_set.lua" %> ---------- END local functions ---------- @@ -61,6 +62,16 @@ repeat end end + -- TODO: Add check for jobs checked out by process + if found ~= true then + log_debug("Searching for digest:", digest, "in process sets") + local queue = find_digest_in_process_set(digest) + if queue then + log_debug("found digest:", digest, "in queue:", queue) + found = true + end + end + if found ~= true then local queued = digest .. ":QUEUED" local primed = digest .. ":PRIMED" diff --git a/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua b/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua new file mode 100644 index 000000000..3c79b7c1a --- /dev/null +++ b/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua @@ -0,0 +1,35 @@ +local function find_digest_in_process_set(digest) + local process_cursor = 0 + local job_cursor = 0 + local pattern = "*" .. digest .. "*" + local found = false + + log_debug("searching in list processes:", + "for digest:", digest, + "cursor:", process_cursor) + + repeat + local process_paginator = redis.call("SSCAN", "processes", process_cursor, "MATCH", "*") + local next_process_cursor = process_paginator[1] + local processes = process_paginator[2] + log_debug("Found number of processes:", #processes, "next cursor:", next_process_cursor) + + for _, process in ipairs(processes) do + log_debug("searching in process set:", process, + "for digest:", digest, + "cursor:", process_cursor) + + local job = redis.call("HGET", process, "info") + + if string.find(job, digest) then + log_debug("Found digest", digest, "in process:", process) + found = true + break + end + end + + process_cursor = next_process_cursor + until found == true or process_cursor == "0" + + return found +end diff --git a/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_queues.lua b/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_queues.lua index 2a07bfe24..6486e3bf3 100644 --- a/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_queues.lua +++ b/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_queues.lua @@ -6,7 +6,7 @@ local function find_digest_in_queues(digest) repeat log_debug("searching all queues for a matching digest:", digest) - local pagination = redis.call("SCAN", cursor, "MATCH", "*queue:*", "COUNT", count) + local pagination = redis.call("SCAN", cursor, "MATCH", "queue:*", "COUNT", count) local next_cursor = pagination[1] local queues = pagination[2] diff --git a/lib/sidekiq_unique_jobs/orphans/reaper.rb b/lib/sidekiq_unique_jobs/orphans/reaper.rb index 6a7cdd944..25578981b 100644 --- a/lib/sidekiq_unique_jobs/orphans/reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/reaper.rb @@ -9,10 +9,12 @@ module Orphans # # @author Mikael Henriksson # + # rubocop:disable Metrics/ClassLength class Reaper include SidekiqUniqueJobs::Connection include SidekiqUniqueJobs::Script::Caller include SidekiqUniqueJobs::Logging + include SidekiqUniqueJobs::JSON # # Execute deletion of orphaned digests @@ -141,7 +143,7 @@ def orphans # @return [false] when no job was found for this digest # def belongs_to_job?(digest) - scheduled?(digest) || retried?(digest) || enqueued?(digest) + scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest) end # @@ -185,6 +187,20 @@ def enqueued?(digest) end end + def active?(digest) + Sidekiq.redis do |conn| + procs = conn.sscan_each("processes").to_a.sort + + result = conn.pipelined do + procs.map do |key| + conn.hget(key, "info") + end + end + + result.flatten.compact.any? { |job| load_json(job)[LOCK_DIGEST] == digest } + end + end + # # Loops through all the redis queues and yields them one by one # @@ -233,5 +249,6 @@ def in_sorted_set?(key, digest) conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any? end end + # rubocop:enable Metrics/ClassLength end end diff --git a/lib/sidekiq_unique_jobs/web.rb b/lib/sidekiq_unique_jobs/web.rb index 08d75868d..16e7377c6 100644 --- a/lib/sidekiq_unique_jobs/web.rb +++ b/lib/sidekiq_unique_jobs/web.rb @@ -2,7 +2,7 @@ begin require "sidekiq/web" -rescue LoadError # rubocop:disable Lint/SuppressedException +rescue LoadError # client-only usage end diff --git a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb index e3aa763a7..e233fe337 100644 --- a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb @@ -101,4 +101,33 @@ end end end + + context "when digest exists in a a process set" do + context "without job" do + it "keeps the digest" do + expect { reap_orphans }.to change { digests.count }.by(-1) + expect(unique_keys).to match_array([]) + end + end + + context "with job" do + let(:process_key) { "worker-id" } + + before do + SidekiqUniqueJobs.redis do |conn| + conn.multi do + conn.sadd("processes", process_key) + conn.exists(process_key) + conn.hmset(process_key, "info", Sidekiq.dump_json(item), "busy", 1, "beat", Time.now.to_f, "quiet", false) + conn.expire(process_key, 60) + end + end + end + + it "keeps the digest" do + expect { reap_orphans }.not_to change { digests.count }.from(1) + expect(unique_keys).not_to match_array([]) + end + end + end end diff --git a/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb b/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb index 87d751062..298ff5ddb 100644 --- a/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb @@ -140,6 +140,35 @@ end end end + + context "when processing" do + context "without job in process" do + it "deletes the digest" do + expect { call }.to change { digests.count }.by(-1) + expect(unique_keys).to match_array([]) + end + end + + context "with job in process" do + let(:process_key) { "worker-id" } + + before do + SidekiqUniqueJobs.redis do |conn| + conn.multi do + conn.sadd("processes", process_key) + conn.exists(process_key) + conn.hmset(process_key, "info", Sidekiq.dump_json(item), "busy", 1, "beat", Time.now.to_f, "quiet", false) + conn.expire(process_key, 60) + end + end + end + + it "keeps the digest" do + expect { call }.not_to change { digests.count }.from(1) + expect(unique_keys).not_to match_array([]) + end + end + end end context "when config.reaper = :ruby" do diff --git a/spec/support/rspec_benchmark.rb b/spec/support/rspec_benchmark.rb index f4dbed2b6..61a0c7b6d 100644 --- a/spec/support/rspec_benchmark.rb +++ b/spec/support/rspec_benchmark.rb @@ -11,6 +11,6 @@ RSpec::Benchmark.configure do |config| config.samples = 10 end -rescue LoadError # rubocop:disable Lint/SuppressedException +rescue LoadError # Do nothing, we don't have test_prof end diff --git a/spec/support/test_prof.rb b/spec/support/test_prof.rb index 66eceb6f9..21800af93 100644 --- a/spec/support/test_prof.rb +++ b/spec/support/test_prof.rb @@ -13,6 +13,6 @@ # color output config.color = true end -rescue LoadError # rubocop:disable Lint/SuppressedException +rescue LoadError # Do nothing, we don't have test_prof end