Skip to content

Commit

Permalink
Added feature to pause periodic jobs (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
dannnylo authored Oct 14, 2023
1 parent 1d8d30d commit 04c18d2
Showing 8 changed files with 255 additions and 25 deletions.
18 changes: 16 additions & 2 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
inherit_from: .rubocop_todo.yml

require:
- rubocop-rake
- rubocop-rspec
@@ -225,3 +223,19 @@ RSpec/ExampleLength:

RSpec/NestedGroups:
Enabled: false

Metrics/AbcSize:
Enabled: false

Naming/MethodParameterName:
Enabled: false

Naming/FileName:
Exclude:
- 'lib/sidekiq-belt.rb'

RSpec/VerifiedDoubles:
Enabled: false

RSpec/MultipleMemoizedHelpers:
Enabled: false
15 changes: 0 additions & 15 deletions .rubocop_todo.yml

This file was deleted.

6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## [Unreleased]

## [0.2.0] - 2023-10-07

- Feature to Pause/Unpause Periodic Jobs

## [0.1.0] - 2023-10-07

- Run manualy Periodic Jobs
- Feature to run manualy Periodic Jobs
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
sidekiq-belt (0.1.0)
sidekiq-belt (0.2.0)
sidekiq (> 7.0)

GEM
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -47,7 +47,18 @@ Sidekiq::Belt.use!([:periodic_run])

### Pause Periodic Jobs (sidekiq-enterprise)

This feature is not yet implemented.
This option adds a button to pause and unpause the cron of a periodic job.
When a periodic job is paused, the perform is skiped and on server this content is logged.

```
2023-10-12T19:24:00.001Z pid=127183 tid=2ian INFO: Job SomeHourlyWorkerClass is paused by Periodic Pause
```

To enable this feature, pass the `periodic_pause` option:

```ruby
Sidekiq::Belt.use!([:periodic_pause])
```

### Delete an Unfinished Batch (sidekiq-pro)

77 changes: 73 additions & 4 deletions lib/sidekiq/belt/ent/periodic_pause.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,84 @@
# frozen_string_literal: true

require "sidekiq/web/helpers"

module Sidekiq
module Belt
module Ent
module PeriodicPause
def paused?
Sidekiq.redis { |r| r.hget("PeriodicPaused", @lid.to_s) }.to_s == "p"
end

def pause!
Sidekiq.redis { |r| r.hset("PeriodicPaused", @lid.to_s, "p") }
end

def unpause!
Sidekiq.redis { |r| r.hdel("PeriodicPaused", @lid.to_s) }
end

module SidekiqLoopsPeriodicPause
PAUSE_BUTTON = <<~ERB
<form action="<%= root_path %>loops/<%= loup.lid %>/pause" method="post">
<%= csrf_tag %>
<input class="btn btn-danger" type="submit" name="pause" value="<%= t('Pause') %>"
data-confirm="Pause the job <%= loup.klass %>? <%= t('AreYouSure') %>" />
</form>
ERB

UNPAUSE_BUTTON = <<~ERB
<form action="<%= root_path %>loops/<%= loup.lid %>/unpause" method="post">
<%= csrf_tag %>
<input class="btn btn-danger" type="submit" name="pause" value="<%= t('Unpause') %>"
data-confirm="Unpause the job <%= loup.klass %>? <%= t('AreYouSure') %>" />
</form>
ERB

def self.registered(app)
app.replace_content("/loops") do |content|
# Add the top of the table
content.gsub!("</th>\n </tr>", "</th><th><%= t('Pause/Unpause') %></th></th>\n </tr>")

# Add the run button
content.gsub!(
"</td>\n </tr>\n <% end %>",
"</td>\n<td>" \
"<% if (loup.paused?) %>#{UNPAUSE_BUTTON}<% else %>#{PAUSE_BUTTON}<% end %>" \
"</td>\n </tr>\n <% end %>"
)
end

app.post("/loops/:lid/pause") do
Sidekiq::Periodic::Loop.new(params[:lid]).pause!

return redirect "#{root_path}loops"
end

app.post("/loops/:lid/unpause") do
Sidekiq::Periodic::Loop.new(params[:lid]).unpause!

return redirect "#{root_path}loops"
end
end
end

module PauseServer
def enqueue_job(cycle, ts)
cycle.paused? ? logger.info("Job #{cycle.klass} is paused by Periodic Pause") : super
end
end

def self.use!
# require("sidekiq-ent/periodic")
# require("sidekiq-ent/periodic/static_loop")
require("sidekiq-ent/web")
require("sidekiq-ent/periodic")
require("sidekiq-ent/periodic/manager")
require("sidekiq-ent/periodic/static_loop")

# Sidekiq::Periodic::Loop.prepend(Sidekiq::Belt::Ent::PeriodicPause)
# Sidekiq::Periodic::StaticLoop.prepend(Sidekiq::Belt::Ent::PeriodicPause)
Sidekiq::Web.register(Sidekiq::Belt::Ent::PeriodicPause::SidekiqLoopsPeriodicPause)
Sidekiq::Periodic::Loop.prepend(Sidekiq::Belt::Ent::PeriodicPause)
Sidekiq::Periodic::StaticLoop.prepend(Sidekiq::Belt::Ent::PeriodicPause)
Sidekiq::Periodic::Manager.prepend(Sidekiq::Belt::Ent::PeriodicPause::PauseServer)
end
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq/belt/version.rb
Original file line number Diff line number Diff line change
@@ -2,6 +2,6 @@

module Sidekiq
module Belt
VERSION = "0.1.0"
VERSION = "0.2.0"
end
end
147 changes: 147 additions & 0 deletions spec/sidekiq/belt/ent/periodic_pause_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# frozen_string_literal: true

require "sidekiq"
require "sidekiq/web"
require "byebug"

RSpec.describe(Sidekiq::Belt::Ent::PeriodicPause) do
let(:redis_mock) { double("redis") }
let(:dummy_job_class) do
Class.new do
include Sidekiq::Worker
end
end
let(:loop_class) do
Class.new do
attr_accessor :lid, :klass, :options

def initialize(lid, klass, options)
@lid = lid
@klass = klass
@options = options
end
end
end

before do
loop_class.prepend(described_class)
allow(Sidekiq).to receive(:redis).and_yield(redis_mock)
allow(dummy_job_class).to receive(:perform_async).and_return(true)
stub_const("DummyJob", dummy_job_class)
end

describe ".paused?" do
context "when job is paused" do
before do
allow(redis_mock).to receive(:hget).and_return("p")
end

it "return true to the job" do
expect(loop_class.new("abc", "DummyJob", {}).paused?).to be(true)
end
end

context "when job is not paused" do
before do
allow(redis_mock).to receive(:hget).and_return(nil)
end

it "return true to the job" do
expect(loop_class.new("abc", "DummyJob", {}).paused?).to be(false)
end
end
end

describe ".pause!" do
before do
allow(redis_mock).to receive(:hset).and_return(true)
end

it "saves the job as paused" do
loop_class.new("abc", "DummyJob", {}).pause!

expect(redis_mock).to have_received(:hset).with("PeriodicPaused", "abc", "p")
end
end

describe ".unpause!" do
before do
allow(redis_mock).to receive(:hdel).and_return(true)
end

it "removes the job as unpaused" do
loop_class.new("abc", "DummyJob", {}).unpause!

expect(redis_mock).to have_received(:hdel).with("PeriodicPaused", "abc")
end
end

describe ".use!" do
before do
stub_const("Sidekiq::Periodic", Module.new)
stub_const("Sidekiq::Periodic::StaticLoop", Class.new)
stub_const("Sidekiq::Periodic::Loop", Class.new)
stub_const("Sidekiq::Periodic::Manager", Class.new)

allow(described_class).to receive(:require).and_return(true)
allow(Sidekiq::Web).to receive(:register)
allow(Sidekiq::Periodic::Loop).to receive(:prepend)
allow(Sidekiq::Periodic::StaticLoop).to receive(:prepend)
allow(Sidekiq::Periodic::Manager).to receive(:prepend)
end

it "injects the code" do
described_class.use!

expect(described_class).to have_received(:require).with("sidekiq-ent/web").once
expect(described_class).to have_received(:require).with("sidekiq-ent/periodic").once
expect(described_class).to have_received(:require).with("sidekiq-ent/periodic/manager").once
expect(described_class).to have_received(:require).with("sidekiq-ent/periodic/static_loop").once

expect(Sidekiq::Web).to have_received(:register).with(described_class::SidekiqLoopsPeriodicPause)
expect(Sidekiq::Periodic::Loop).to have_received(:prepend).with(described_class)
expect(Sidekiq::Periodic::StaticLoop).to have_received(:prepend).with(described_class)
expect(Sidekiq::Periodic::Manager).to have_received(:prepend).with(described_class::PauseServer)
end
end

describe "PauseServer.enqueue_job" do
let(:logger) { Logger.new($stdout) }
let(:instance) { dummy_pause_server.new }
let(:cycle) { loop_class.new("abc", "DummyJob", {}) }
let(:dummy_pause_server) do
Class.new do
def enqueue_job(cycle, ts)
[cycle, ts]
end
end
end

before do
allow(logger).to receive(:info).and_return(true)
allow(instance).to receive(:logger).and_return(logger)
dummy_pause_server.prepend(described_class::PauseServer)
end

context "when job is paused" do
before do
allow(redis_mock).to receive(:hget).and_return("p")
end

it "does not run the job" do
expect(cycle.paused?).to be(true)
expect(instance.enqueue_job(cycle, 2)).to be(true)

expect(logger).to have_received(:info).with("Job DummyJob is paused by Periodic Pause")
end
end

context "when job is not paused" do
it "runs the job" do
allow(redis_mock).to receive(:hget).and_return(nil)

expect(instance.enqueue_job(cycle, 2)).to eq([cycle, 2])
end
end
end
end

0 comments on commit 04c18d2

Please sign in to comment.