Skip to content

Commit

Permalink
Allow some threshold before being able to reap
Browse files Browse the repository at this point in the history
Close #559

This takes care of an edge case where jobs were just marked as active.
  • Loading branch information
mhenrixon committed Jan 20, 2021
1 parent ae42983 commit 0f98129
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 31 deletions.
15 changes: 8 additions & 7 deletions lib/sidekiq_unique_jobs/lua/reap_orphans.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ local retry_set = KEYS[3]
-------- END keys ---------

-------- BEGIN argv ---------
local reaper_count = tonumber(ARGV[1])
local reaper_count = tonumber(ARGV[1])
local threshold = tonumber(ARGV[2])
-------- END argv ---------

-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[2])
local debug_lua = ARGV[3] == "true"
local max_history = tonumber(ARGV[4])
local script_name = ARGV[5] .. ".lua"
local redisversion = ARGV[6]
local current_time = tonumber(ARGV[3])
local debug_lua = ARGV[4] == "true"
local max_history = tonumber(ARGV[5])
local script_name = ARGV[6] .. ".lua"
local redisversion = ARGV[7]
--------- END injected arguments ---------


Expand Down Expand Up @@ -65,7 +66,7 @@ repeat
-- TODO: Add check for jobs checked out by process
if found ~= true then
log_debug("Searching for digest:", digest, "in process sets")
found = find_digest_in_process_set(digest)
found = find_digest_in_process_set(digest, threshold)
end

if found ~= true then
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
local function find_digest_in_process_set(digest)
local function find_digest_in_process_set(digest, threshold)
local process_cursor = 0
local job_cursor = 0
local pattern = "*" .. digest .. "*"
Expand Down Expand Up @@ -26,11 +26,18 @@ local function find_digest_in_process_set(digest)
log_debug("No entries in:", workers_key)
else
for i = 1, #jobs, 2 do
if string.find(jobs[i +1], digest) then
local jobstr = jobs[i +1]
if string.find(jobstr, digest) then
log_debug("Found digest", digest, "in:", workers_key)
found = true
break
end

local job = cjson.decode(jobstr)
if job.created_at > threshold then
found = true
break
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/orphans/lua_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def call
:reap_orphans,
conn,
keys: [DIGESTS, SCHEDULE, RETRY, PROCESSES],
argv: [reaper_count],
argv: [reaper_count, (Time.now - reaper_timeout).to_f],
)
end
end
Expand Down
10 changes: 10 additions & 0 deletions lib/sidekiq_unique_jobs/orphans/reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ def reaper
config.reaper
end

#
# The configured timeout for the reaper
#
#
# @return [Integer] timeout in seconds
#
def reaper_timeout
config.reaper_timeout
end

#
# The number of locks to reap at a time
#
Expand Down
11 changes: 9 additions & 2 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def enqueued?(digest)
end
end

def active?(digest) # rubocop:disable Metrics/MethodLength
def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
return false if procs.empty?
Expand All @@ -132,14 +132,21 @@ def active?(digest) # rubocop:disable Metrics/MethodLength
next unless workers.any?

workers.each_pair do |_tid, job|
return true if load_json(job).dig(PAYLOAD, LOCK_DIGEST) == digest
item = load_json(job)

return true if item.dig(PAYLOAD, LOCK_DIGEST) == digest
return true if considered_active?(item[CREATED_AT])
end
end

false
end
end

def considered_active?(time_f)
(Time.now - reaper_timeout).to_f < time_f
end

#
# Loops through all the redis queues and yields them one by one
#
Expand Down
8 changes: 7 additions & 1 deletion spec/sidekiq_unique_jobs/lua/delete_job_by_digest_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@
]
end

context "when job doesn't exist" do
let(:argv) { ["abcdefab"] }

it { is_expected.to eq(nil) }
end

context "when job is retried" do
let(:job_id) { "abcdefab" }
let(:job) { dump_json(item) }
let(:job) { dump_json(item) }
let(:item) do
{
"class" => "MyUniqueJob",
Expand Down
22 changes: 16 additions & 6 deletions spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@
SidekiqUniqueJobs::RETRY,
]
end
let(:argv) { [100] }
let(:digest) { "uniquejobs:digest" }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:raw_item) { { "class" => MyUniqueJob, "args" => [1, 2], "jid" => job_id, "lock_digest" => digest } }
let(:argv) { [100, threshold] }
let(:digest) { "uniquejobs:digest" }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:created_at) { (Time.now - 1000).to_f }
let(:threshold) { [Time.now - SidekiqUniqueJobs.config.reaper_timeout] }
let(:raw_item) do
{
"class" => MyUniqueJob,
"args" => [1, 2],
"jid" => job_id,
"lock_digest" => digest,
"created_at" => created_at,
}
end
let(:lock_info) do
{
"job_id" => job_id,
Expand Down
71 changes: 59 additions & 12 deletions spec/sidekiq_unique_jobs/orphans/reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,34 @@
end

describe ".call" do
subject(:call) { described_class.call }
subject(:call) { described_class.call(conn) }

let(:conn) { nil }

around do |example|
SidekiqUniqueJobs.use_config(reaper: reaper) do
example.run
end
end

context "when given a connection" do
let(:conn) { instance_spy(ConnectionPool) }
let(:reaper) { :ruby }
let(:reaper_spy) { instance_spy("SidekiqUniqueJobs::Orphans::Reaper") }

before do
allow(reaper_spy).to receive(:call)
allow(described_class).to receive(:new).and_return(reaper_spy)
end

it "calls the reaper with the given connection" do
call

expect(reaper_spy).to have_received(:call)
expect(described_class).to have_received(:new).with(conn)
end
end

shared_examples "deletes orphans" do
context "when scheduled" do
let(:item) { raw_item.merge("at" => Time.now.to_f + 3_600) }
Expand Down Expand Up @@ -109,36 +129,63 @@
end

context "with job in process" do
let(:process_key) { "process-id" }
let(:thread_id) { "thread-id" }
let(:worker_key) { "#{process_key}:workers" }
let(:process_key) { "process-id" }
let(:thread_id) { "thread-id" }
let(:worker_key) { "#{process_key}:workers" }
let(:created_at) { (Time.now - reaper_timeout).to_f }
let(:reaper_timeout) { SidekiqUniqueJobs.config.reaper_timeout }

before do
SidekiqUniqueJobs.redis do |conn|
conn.multi do
conn.sadd("processes", process_key)
conn.set(process_key, "bogus")
conn.hset(worker_key, thread_id, dump_json(payload: item))
conn.hset(worker_key, thread_id, dump_json(created_at: created_at, payload: item))
conn.expire(process_key, 60)
conn.expire(worker_key, 60)
end
end
end

# TODO: Adjust this spec for earlier sidekiq versions
context "that matches current digest", sidekiq_ver: ">= 5.0" do
it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
context "and created_at is old" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f }

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end

context "and created_at is recent" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { Time.now.to_f }

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end
end

context "that does not match current digest" do
let(:item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => "uniquejobs:d2" } }

it "deletes the digest" do
expect { call }.to change { digests.count }.by(-1)
expect(unique_keys).to match_array([])
context "and created_at is old" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f }

it "deletes the digest" do
expect { call }.to change { digests.count }.by(-1)
expect(unique_keys).to match_array([])
end
end

context "and created_at is recent" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { Time.now.to_f }

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end
end
end
Expand Down

0 comments on commit 0f98129

Please sign in to comment.