Skip to content

Commit

Permalink
Prevent reaping of active jobs (#493)
Browse files Browse the repository at this point in the history
* Begin prevention of reaping active jobs

* Prevent reaping of active jobs

Close #488

* Don't reek

* Mandatory rubocop commit
  • Loading branch information
mhenrixon authored Apr 8, 2020
1 parent 21aa2d2 commit 16764fe
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 6 deletions.
4 changes: 4 additions & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ detectors:
- SidekiqUniqueJobs::Logging#debug_item
- SidekiqUniqueJobs::NotUniqueWorker#initialize
- SidekiqUniqueJobs::OnConflict::Reject#push_to_deadset
- SidekiqUniqueJobs::Orphans::Reaper#active?
- SidekiqUniqueJobs::Orphans::Reaper#entries
- SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize
- SidekiqUniqueJobs::Web::Helpers#cparams
Expand Down Expand Up @@ -90,6 +91,7 @@ detectors:
- SidekiqUniqueJobs::Locksmith#create_lock
- SidekiqUniqueJobs::Middleware#self.configure_client
- SidekiqUniqueJobs::Middleware#self.configure_server
- SidekiqUniqueJobs::Orphans::Reaper#active?
- SidekiqUniqueJobs::Orphans::Reaper#enqueued?
- SidekiqUniqueJobs::UpgradeLocks#keys_for_digest
RepeatedConditional:
Expand Down Expand Up @@ -124,6 +126,7 @@ detectors:
- SidekiqUniqueJobs::Middleware#call
- SidekiqUniqueJobs::Middleware#self.configure_server
- SidekiqUniqueJobs::Orphans::Manager#start
- SidekiqUniqueJobs::Orphans::Reaper#active?
- SidekiqUniqueJobs::Orphans::Reaper#enqueued?
- SidekiqUniqueJobs::Orphans::Reaper#entries
- SidekiqUniqueJobs::Profiler#self.stop
Expand All @@ -144,6 +147,7 @@ detectors:
- SidekiqUniqueJobs::Lock::BaseLock
- SidekiqUniqueJobs::Locksmith
- SidekiqUniqueJobs::Lock
- SidekiqUniqueJobs::Orphans::Reaper
UncommunicativeVariableName:
exclude:
- Hash#slice
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock_digest.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

require 'openssl'
require "openssl"

module SidekiqUniqueJobs
# Handles uniqueness of sidekiq arguments
Expand Down
11 changes: 11 additions & 0 deletions lib/sidekiq_unique_jobs/lua/reap_orphans.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ local redisversion = ARGV[6]
<%= include_partial "shared/_common.lua" %>
<%= include_partial "shared/_find_digest_in_queues.lua" %>
<%= include_partial "shared/_find_digest_in_sorted_set.lua" %>
<%= include_partial "shared/_find_digest_in_process_set.lua" %>
---------- END local functions ----------


Expand Down Expand Up @@ -61,6 +62,16 @@ repeat
end
end

-- TODO: Add check for jobs checked out by process
if found ~= true then
log_debug("Searching for digest:", digest, "in process sets")
local queue = find_digest_in_process_set(digest)
if queue then
log_debug("found digest:", digest, "in queue:", queue)
found = true
end
end

if found ~= true then
local queued = digest .. ":QUEUED"
local primed = digest .. ":PRIMED"
Expand Down
35 changes: 35 additions & 0 deletions lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
local function find_digest_in_process_set(digest)
local process_cursor = 0
local job_cursor = 0
local pattern = "*" .. digest .. "*"
local found = false

log_debug("searching in list processes:",
"for digest:", digest,
"cursor:", process_cursor)

repeat
local process_paginator = redis.call("SSCAN", "processes", process_cursor, "MATCH", "*")
local next_process_cursor = process_paginator[1]
local processes = process_paginator[2]
log_debug("Found number of processes:", #processes, "next cursor:", next_process_cursor)

for _, process in ipairs(processes) do
log_debug("searching in process set:", process,
"for digest:", digest,
"cursor:", process_cursor)

local job = redis.call("HGET", process, "info")

if string.find(job, digest) then
log_debug("Found digest", digest, "in process:", process)
found = true
break
end
end

process_cursor = next_process_cursor
until found == true or process_cursor == "0"

return found
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ local function find_digest_in_queues(digest)

repeat
log_debug("searching all queues for a matching digest:", digest)
local pagination = redis.call("SCAN", cursor, "MATCH", "*queue:*", "COUNT", count)
local pagination = redis.call("SCAN", cursor, "MATCH", "queue:*", "COUNT", count)
local next_cursor = pagination[1]
local queues = pagination[2]

Expand Down
19 changes: 18 additions & 1 deletion lib/sidekiq_unique_jobs/orphans/reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ module Orphans
#
# @author Mikael Henriksson <[email protected]>
#
# rubocop:disable Metrics/ClassLength
class Reaper
include SidekiqUniqueJobs::Connection
include SidekiqUniqueJobs::Script::Caller
include SidekiqUniqueJobs::Logging
include SidekiqUniqueJobs::JSON

#
# Execute deletion of orphaned digests
Expand Down Expand Up @@ -141,7 +143,7 @@ def orphans
# @return [false] when no job was found for this digest
#
def belongs_to_job?(digest)
scheduled?(digest) || retried?(digest) || enqueued?(digest)
scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
end

#
Expand Down Expand Up @@ -185,6 +187,20 @@ def enqueued?(digest)
end
end

def active?(digest)
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a.sort

result = conn.pipelined do
procs.map do |key|
conn.hget(key, "info")
end
end

result.flatten.compact.any? { |job| load_json(job)[LOCK_DIGEST] == digest }
end
end

#
# Loops through all the redis queues and yields them one by one
#
Expand Down Expand Up @@ -233,5 +249,6 @@ def in_sorted_set?(key, digest)
conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end
end
# rubocop:enable Metrics/ClassLength
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

begin
require "sidekiq/web"
rescue LoadError # rubocop:disable Lint/SuppressedException
rescue LoadError
# client-only usage
end

Expand Down
29 changes: 29 additions & 0 deletions spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,33 @@
end
end
end

context "when digest exists in a a process set" do
context "without job" do
it "keeps the digest" do
expect { reap_orphans }.to change { digests.count }.by(-1)
expect(unique_keys).to match_array([])
end
end

context "with job" do
let(:process_key) { "worker-id" }

before do
SidekiqUniqueJobs.redis do |conn|
conn.multi do
conn.sadd("processes", process_key)
conn.exists(process_key)
conn.hmset(process_key, "info", Sidekiq.dump_json(item), "busy", 1, "beat", Time.now.to_f, "quiet", false)
conn.expire(process_key, 60)
end
end
end

it "keeps the digest" do
expect { reap_orphans }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end
end
end
29 changes: 29 additions & 0 deletions spec/sidekiq_unique_jobs/orphans/reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,35 @@
end
end
end

context "when processing" do
context "without job in process" do
it "deletes the digest" do
expect { call }.to change { digests.count }.by(-1)
expect(unique_keys).to match_array([])
end
end

context "with job in process" do
let(:process_key) { "worker-id" }

before do
SidekiqUniqueJobs.redis do |conn|
conn.multi do
conn.sadd("processes", process_key)
conn.exists(process_key)
conn.hmset(process_key, "info", Sidekiq.dump_json(item), "busy", 1, "beat", Time.now.to_f, "quiet", false)
conn.expire(process_key, 60)
end
end
end

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end
end
end

context "when config.reaper = :ruby" do
Expand Down
2 changes: 1 addition & 1 deletion spec/support/rspec_benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
RSpec::Benchmark.configure do |config|
config.samples = 10
end
rescue LoadError # rubocop:disable Lint/SuppressedException
rescue LoadError
# Do nothing, we don't have test_prof
end
2 changes: 1 addition & 1 deletion spec/support/test_prof.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
# color output
config.color = true
end
rescue LoadError # rubocop:disable Lint/SuppressedException
rescue LoadError
# Do nothing, we don't have test_prof
end

0 comments on commit 16764fe

Please sign in to comment.