diff --git a/.reek.yml b/.reek.yml index 6bbe867db..0e1cc3340 100644 --- a/.reek.yml +++ b/.reek.yml @@ -57,6 +57,7 @@ detectors: - SidekiqUniqueJobs::Redis::Entity#exist? - SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize - SidekiqUniqueJobs::Web::Helpers#cparams + - SidekiqUniqueJobs::Web::Helpers#display_lock_args IrresponsibleModule: enabled: true LongParameterList: @@ -75,6 +76,7 @@ detectors: - Hash#slice - Hash#slice! - SidekiqUniqueJobs::Logging#logger_respond_to_with_context? + - SidekiqUniqueJobs::Server#self.configure - SidekiqUniqueJobs::OnConflict::Reject#deadset_kill? - SidekiqUniqueJobs::SidekiqWorkerMethods#worker_method_defined? - SidekiqUniqueJobs::Web::Helpers#redirect_to @@ -127,7 +129,7 @@ detectors: - SidekiqUniqueJobs::LockTTL#calculate - SidekiqUniqueJobs::Logging::Middleware#logging_context - SidekiqUniqueJobs::Middleware#call - - SidekiqUniqueJobs::Middleware#self.configure_server + - SidekiqUniqueJobs::Server#self.configure - SidekiqUniqueJobs::Orphans::Manager#start - SidekiqUniqueJobs::Orphans::Manager#stop - SidekiqUniqueJobs::Orphans::RubyReaper#active? diff --git a/README.md b/README.md index 7add8eea1..0b7640d26 100644 --- a/README.md +++ b/README.md @@ -98,21 +98,23 @@ bundle Before v7, the middleware was configured automatically. Since some people reported issues with other gems (see [Other Sidekiq Gems](#other-sidekiq-gems)) it was decided to give full control over to the user. +*NOTE* if you want to use the reaper you also need to configure the server middleware. + [A full and hopefully working example](https://github.com/mhenrixon/sidekiq-unique-jobs/blob/master/myapp/config/sidekiq.rb#L12) ```ruby Sidekiq.configure_server do |config| config.redis = { url: ENV["REDIS_URL"], driver: :hiredis } + config.client_middleware do |chain| + chain.add SidekiqUniqueJobs::Middleware::Client + end + config.server_middleware do |chain| chain.add SidekiqUniqueJobs::Middleware::Server end - config.error_handlers << ->(ex, ctx_hash) { p ex, ctx_hash } - config.death_handlers << lambda do |job, _ex| - digest = job["lock_digest"] - SidekiqUniqueJobs::Digests.new.delete_by_digest(digest) if digest - end + SidekiqUniqueJobs::Server.configure(config) end Sidekiq.configure_client do |config| diff --git a/lib/sidekiq_unique_jobs.rb b/lib/sidekiq_unique_jobs.rb index ad4f731f5..9d1b0a6a6 100644 --- a/lib/sidekiq_unique_jobs.rb +++ b/lib/sidekiq_unique_jobs.rb @@ -71,3 +71,4 @@ require "sidekiq_unique_jobs/sidekiq_unique_jobs" require "sidekiq_unique_jobs/update_version" require "sidekiq_unique_jobs/upgrade_locks" +require "sidekiq_unique_jobs/server" diff --git a/lib/sidekiq_unique_jobs/orphans/manager.rb b/lib/sidekiq_unique_jobs/orphans/manager.rb index 51a38ff4b..d98424f91 100644 --- a/lib/sidekiq_unique_jobs/orphans/manager.rb +++ b/lib/sidekiq_unique_jobs/orphans/manager.rb @@ -59,12 +59,7 @@ def stop # @return [] # def task - @task ||= Concurrent::TimerTask.new(timer_task_options, &task_body) - end - - # @private - def task_body - @task_body ||= lambda do + @task ||= Concurrent::TimerTask.new(timer_task_options) do with_logging_context do redis do |conn| refresh_reaper_mutex diff --git a/lib/sidekiq_unique_jobs/server.rb b/lib/sidekiq_unique_jobs/server.rb new file mode 100644 index 000000000..287f3e553 --- /dev/null +++ b/lib/sidekiq_unique_jobs/server.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + # The unique sidekiq middleware for the server processor + # + # @author Mikael Henriksson + class Server + DEATH_HANDLER ||= (lambda do |job, _ex| + return unless (digest = job["lock_digest"]) + + SidekiqUniqueJobs::Digests.new.delete_by_digest(digest) + end).freeze + # + # Configure the server middleware + # + # + # @return [Sidekiq] the sidekiq configuration + # + def self.configure(config) + config.on(:startup) { start } + config.on(:shutdown) { stop } + + return unless config.respond_to?(:death_handlers) + + config.death_handlers << death_handler + end + + def self.start + SidekiqUniqueJobs::UpdateVersion.call + SidekiqUniqueJobs::UpgradeLocks.call + SidekiqUniqueJobs::Orphans::Manager.start + end + + def self.stop + SidekiqUniqueJobs::Orphans::Manager.stop + end + + # + # A death handler for dead jobs + # + # + # @return [lambda] + # + def self.death_handler + DEATH_HANDLER + end + end +end diff --git a/lib/sidekiq_unique_jobs/web/helpers.rb b/lib/sidekiq_unique_jobs/web/helpers.rb index ffbcfc579..7f8c81b68 100644 --- a/lib/sidekiq_unique_jobs/web/helpers.rb +++ b/lib/sidekiq_unique_jobs/web/helpers.rb @@ -15,7 +15,7 @@ module Helpers # @return [Array] safe params SAFE_CPARAMS = %w[cursor prev_cursor].freeze - module_function + extend self # # Opens a template file contained within this gem diff --git a/lib/sidekiq_unique_jobs/web/views/changelogs.erb b/lib/sidekiq_unique_jobs/web/views/changelogs.erb index 2e296b7c1..28677e231 100644 --- a/lib/sidekiq_unique_jobs/web/views/changelogs.erb +++ b/lib/sidekiq_unique_jobs/web/views/changelogs.erb @@ -40,6 +40,7 @@ <%= safe_relative_time(changelog["time"]) %> <%= changelog["digest"] %> <%= changelog["script"] %> + <%= changelog["job_id"] %> <%= changelog["prev_jid"] %> <%= changelog["message"] %> diff --git a/myapp/config/initializers/sidekiq.rb b/myapp/config/initializers/sidekiq.rb index 2ccb2d5b7..caaf46f6f 100644 --- a/myapp/config/initializers/sidekiq.rb +++ b/myapp/config/initializers/sidekiq.rb @@ -37,14 +37,9 @@ chain.add SidekiqUniqueJobs::Middleware::Client end + SidekiqUniqueJobs::Server.configure(config) + config.error_handlers << ->(ex, ctx_hash) { p ex, ctx_hash } - config.death_handlers << lambda do |job, ex| - digest = job["lock_digest"] - p ex - p digest - p job - SidekiqUniqueJobs::Digests.new.delete_by_digest(digest) if digest - end end Sidekiq.logger = Sidekiq::Logger.new($stdout) @@ -52,12 +47,11 @@ Sidekiq.log_format = :json if Sidekiq.respond_to?(:log_format) SidekiqUniqueJobs.configure do |config| config.debug_lua = true - config.enabled = true config.lock_info = true config.logger = Sidekiq.logger config.max_history = 10_000 config.reaper = :lua - config.reaper_count = 1_000 + config.reaper_count = 100 config.reaper_interval = 10 config.reaper_timeout = 5 end diff --git a/spec/sidekiq_unique_jobs/orphans/manager_spec.rb b/spec/sidekiq_unique_jobs/orphans/manager_spec.rb index 2265ea0f5..12cdd92bb 100644 --- a/spec/sidekiq_unique_jobs/orphans/manager_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/manager_spec.rb @@ -302,31 +302,16 @@ before do allow(Concurrent::TimerTask).to receive(:new).and_call_original - end - - it "initializes a new timer task with the correct arguments" do - expect(task).to be_a(Concurrent::TimerTask) - - expect(Concurrent::TimerTask).to have_received(:new) - .with(described_class.timer_task_options, &described_class.task_body) - end - end - - describe "#task_body" do - subject(:task_body) { described_class.task_body } - - before do allow(described_class).to receive(:with_logging_context).and_yield allow(described_class).to receive(:refresh_reaper_mutex).and_return(true) allow(SidekiqUniqueJobs::Orphans::Reaper).to receive(:call).and_return(true) end - it "is wired up correctly" do - task_body.call + it "initializes a new timer task with the correct arguments" do + expect(task).to be_a(Concurrent::TimerTask) - expect(described_class).to have_received(:with_logging_context) - expect(described_class).to have_received(:refresh_reaper_mutex) - expect(SidekiqUniqueJobs::Orphans::Reaper).to have_received(:call) + expect(Concurrent::TimerTask).to have_received(:new) + .with(described_class.timer_task_options) end end diff --git a/spec/sidekiq_unique_jobs/server_spec.rb b/spec/sidekiq_unique_jobs/server_spec.rb new file mode 100644 index 000000000..dc4dc4c0c --- /dev/null +++ b/spec/sidekiq_unique_jobs/server_spec.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +RSpec.describe SidekiqUniqueJobs::Server do + describe ".configure" do + subject(:configure) { described_class.configure(config) } + + let(:config) { Sidekiq } + + before do + allow(config).to receive(:on).with(:startup).and_call_original + allow(config).to receive(:on).with(:shutdown).and_call_original + allow(config.death_handlers).to receive(:<<).and_call_original if Sidekiq.respond_to?(:death_handlers) + end + + it "configures startup" do + configure + + expect(config).to have_received(:on).with(:startup) + expect(config).to have_received(:on).with(:shutdown) + + if Sidekiq.respond_to?(:death_handlers) + expect(config.death_handlers).to have_received(:<<) + .with(described_class.death_handler) + end + end + end + + describe ".start" do + subject(:start) { described_class.start } + + before do + allow(SidekiqUniqueJobs::UpdateVersion).to receive(:call).and_return(true) + allow(SidekiqUniqueJobs::UpgradeLocks).to receive(:call).and_return(true) + allow(SidekiqUniqueJobs::Orphans::Manager).to receive(:start).and_return(true) + end + + it "starts processes in the background" do + start + + expect(SidekiqUniqueJobs::UpdateVersion).to have_received(:call) + expect(SidekiqUniqueJobs::UpgradeLocks).to have_received(:call) + expect(SidekiqUniqueJobs::Orphans::Manager).to have_received(:start) + end + end + + describe ".stop" do + subject(:stop) { described_class.stop } + + before do + allow(SidekiqUniqueJobs::Orphans::Manager).to receive(:stop).and_return(true) + end + + it "starts processes in the background" do + stop + + expect(SidekiqUniqueJobs::Orphans::Manager).to have_received(:stop) + end + end + + describe ".death_handler" do + subject(:death_handler) { described_class.death_handler } + + let(:item) { { "lock_digest" => digest } } + let(:digest) { "uniquejobs:abcdefab" } + let(:digests) { SidekiqUniqueJobs::Digests.new } + + before do + allow(digests).to receive(:delete_by_digest).and_return(true) + allow(SidekiqUniqueJobs::Digests).to receive(:new).and_return(digests) + end + + it "deletes digests for dead jobs" do + death_handler.call(item, nil) + + expect(digests).to have_received(:delete_by_digest).with(digest) + end + end +end diff --git a/spec/sidekiq_unique_jobs/web/helpers_spec.rb b/spec/sidekiq_unique_jobs/web/helpers_spec.rb index 8b1c5319b..84bc17429 100644 --- a/spec/sidekiq_unique_jobs/web/helpers_spec.rb +++ b/spec/sidekiq_unique_jobs/web/helpers_spec.rb @@ -3,8 +3,24 @@ require "sidekiq_unique_jobs/web/helpers" RSpec.describe SidekiqUniqueJobs::Web::Helpers do + before do + stub_const( + "SidekiqUniqueJobs::WebHelpers", + Class.new do + include Sidekiq::WebHelpers + include SidekiqUniqueJobs::Web::Helpers + + def params + {} + end + end, + ) + end + + let(:helper) { SidekiqUniqueJobs::WebHelpers.new } + describe "#safe_relative_time" do - subject(:safe_relative_time) { described_class.safe_relative_time(time) } + subject(:safe_relative_time) { helper.safe_relative_time(time) } let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) } let(:time) { Time.now.to_f } @@ -21,8 +37,47 @@ end end + describe "#cparams" do + subject(:cparams) { helper.cparams(options) } + + before do + allow(helper).to receive(:params).and_return({}) + end + + let(:options) do + { + "cursor" => "0", + "prev_cursor" => "1", + "bogus" => "hokuspokus", + } + end + + it { is_expected.to eq("cursor=0&prev_cursor=1") } + end + + describe "#display_lock_args" do + subject(:display_lock_args) { helper.display_lock_args(args, num) } + + let(:args) { ["abc", 1, "cde"] } + let(:num) { 2000 } + + it { is_expected.to eq(""abc", 1, "cde"") } + + context "when args is nil" do + let(:args) { nil } + + it { is_expected.to eq("Invalid job payload, args is nil") } + end + + context "when args is not an array" do + let(:args) { 123 } + + it { is_expected.to eq("Invalid job payload, args must be an Array, not #{args.class.name}") } + end + end + describe "#unique_filename" do - subject(:unique_filename) { described_class.unique_filename(name) } + subject(:unique_filename) { helper.unique_filename(name) } context "when name is changelogs" do let(:name) { :changelogs } @@ -50,7 +105,7 @@ end describe "#parse_time" do - subject(:parse_time) { described_class.parse_time(time) } + subject(:parse_time) { helper.parse_time(time) } let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) }