Skip to content

Commit

Permalink
Expire reaper when not checking in (#508)
Browse files Browse the repository at this point in the history
Close #490
  • Loading branch information
mhenrixon authored May 21, 2020
1 parent 40f59a5 commit 9fb005b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 10 deletions.
27 changes: 25 additions & 2 deletions lib/sidekiq_unique_jobs/orphans/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Orphans
module Manager
module_function

DRIFT_FACTOR = 0.02

include SidekiqUniqueJobs::Connection
include SidekiqUniqueJobs::Logging

Expand Down Expand Up @@ -56,6 +58,7 @@ def task
@task ||= Concurrent::TimerTask.new(timer_task_options) do
with_logging_context do
redis do |conn|
refresh_reaper_mutex
Orphans::Reaper.call(conn)
end
end
Expand Down Expand Up @@ -117,7 +120,9 @@ def logging_context
# @return [true, false]
#
def registered?
redis { |conn| conn.get(UNIQUE_REAPER) }.to_i == 1
redis do |conn|
conn.get(UNIQUE_REAPER).to_i + drift_reaper_interval > current_timestamp
end
end

def disabled?
Expand All @@ -131,7 +136,17 @@ def disabled?
# @return [void]
#
def register_reaper_process
redis { |conn| conn.set(UNIQUE_REAPER, 1) }
redis { |conn| conn.set(UNIQUE_REAPER, current_timestamp, nx: true, ex: drift_reaper_interval) }
end

#
# Updates mutex key
#
#
# @return [void]
#
def refresh_reaper_mutex
redis { |conn| conn.set(UNIQUE_REAPER, current_timestamp, ex: drift_reaper_interval) }
end

#
Expand All @@ -143,6 +158,14 @@ def register_reaper_process
def unregister_reaper_process
redis { |conn| conn.del(UNIQUE_REAPER) }
end

def drift_reaper_interval
reaper_interval + (reaper_interval * DRIFT_FACTOR).to_i
end

def current_timestamp
Time.now.to_i
end
end
end
end
27 changes: 25 additions & 2 deletions spec/sidekiq_unique_jobs/orphans/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
describe ".start" do
subject(:start) { described_class.start }

let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) }

around do |example|
Timecop.freeze(frozen_time, &example)
end

before do
allow(SidekiqUniqueJobs::Orphans::Observer).to receive(:new).and_return(observer)

Expand All @@ -18,7 +24,7 @@
end

context "when registered?" do
before { redis { |conn| conn.set(SidekiqUniqueJobs::UNIQUE_REAPER, 1) } }
before { described_class.register_reaper_process }

it { is_expected.to eq(nil) }
end
Expand All @@ -29,7 +35,7 @@
it "sets a mutex" do
start

expect(get(SidekiqUniqueJobs::UNIQUE_REAPER)).to eq("1")
expect(get(SidekiqUniqueJobs::UNIQUE_REAPER)).to eq(frozen_time.to_i.to_s)
end

it "logs a start message" do
Expand Down Expand Up @@ -111,6 +117,23 @@
it { is_expected.to eq(SidekiqUniqueJobs.config.reaper_timeout) }
end

describe ".register_reaper_process" do
subject(:register_reaper_process) { described_class.register_reaper_process }

let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) }

around do |example|
Timecop.freeze(frozen_time, &example)
end

it "writes a redis key with timestamp" do
expect { register_reaper_process }.to change { get(SidekiqUniqueJobs::UNIQUE_REAPER) }
.from(nil).to(frozen_time.to_i.to_s)

expect(ttl(SidekiqUniqueJobs::UNIQUE_REAPER)).to be_within(20).of(SidekiqUniqueJobs.config.reaper_interval)
end
end

describe ".logging_context" do
subject(:logging_context) { described_class.logging_context }

Expand Down
12 changes: 6 additions & 6 deletions spec/sidekiq_unique_jobs/web/helpers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
let(:time) { Time.now.to_f }
let(:stamp) { Time.now.getutc.iso8601 }

before { Timecop.freeze(frozen_time) }

after { Timecop.return }
around do |example|
Timecop.freeze(frozen_time, &example)
end

it "returns relative time html" do
expect(safe_relative_time).to eq(<<~HTML.chop)
Expand All @@ -26,9 +26,9 @@

let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) }

before { Timecop.freeze(frozen_time) }

after { Timecop.return }
around do |example|
Timecop.freeze(frozen_time, &example)
end

context "when time is an Integer" do
let(:time) { Time.now.to_i }
Expand Down

0 comments on commit 9fb005b

Please sign in to comment.