Skip to content

Commit

Permalink
Don't allow quiet or stop on embedded processes (#5716)
Browse files Browse the repository at this point in the history
* add embedded to Process, don't allow quiet! or stop! show in the Web UI as embedded and don't show the buttons to quiet or stop

* add api method to find process by identity

* removing check around cleaning up open redis connections

* change Process.find to ProcessSet.[]
  • Loading branch information
mikebaldry authored Jan 6, 2023
1 parent 3c0204b commit 2e2eb5a
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 20 deletions.
33 changes: 30 additions & 3 deletions lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,24 @@ def retry_all
class ProcessSet
include Enumerable

def self.[](identity)
exists, (info, busy, beat, quiet, rss, rtt_us) = Sidekiq.redis { |conn|
conn.multi { |transaction|
transaction.sismember("processes", identity)
transaction.hmget(identity, "info", "busy", "beat", "quiet", "rss", "rtt_us")
}
}

return nil if exists == 0 || info.nil?

hash = Sidekiq.load_json(info)
Process.new(hash.merge("busy" => busy.to_i,
"beat" => beat.to_f,
"quiet" => quiet,
"rss" => rss.to_i,
"rtt_us" => rtt_us.to_i))
end

# :nodoc:
# @api private
def initialize(clean_plz = true)
Expand Down Expand Up @@ -876,18 +894,18 @@ def each
end
}

result.each do |info, busy, at_s, quiet, rss, rtt|
result.each do |info, busy, beat, quiet, rss, rtt_us|
# If a process is stopped between when we query Redis for `procs` and
# when we query for `result`, we will have an item in `result` that is
# composed of `nil` values.
next if info.nil?

hash = Sidekiq.load_json(info)
yield Process.new(hash.merge("busy" => busy.to_i,
"beat" => at_s.to_f,
"beat" => beat.to_f,
"quiet" => quiet,
"rss" => rss.to_i,
"rtt_us" => rtt.to_i))
"rtt_us" => rtt_us.to_i))
end
end

Expand Down Expand Up @@ -943,6 +961,7 @@ def leader
# 'busy' => 10,
# 'beat' => <last heartbeat>,
# 'identity' => <unique string identifying the process>,
# 'embedded' => true,
# }
class Process
# :nodoc:
Expand Down Expand Up @@ -979,11 +998,17 @@ def version
self["version"]
end

def embedded?
self["embedded"]
end

# Signal this process to stop processing new jobs.
# It will continue to execute jobs it has already fetched.
# This method is *asynchronous* and it can take 5-10
# seconds for the process to quiet.
def quiet!
raise "Can't quiet an embedded process" if embedded?

signal("TSTP")
end

Expand All @@ -992,6 +1017,8 @@ def quiet!
# This method is *asynchronous* and it can take 5-10
# seconds for the process to start shutting down.
def stop!
raise "Can't stop an embedded process" if embedded?

signal("TERM")
end

Expand Down
9 changes: 4 additions & 5 deletions lib/sidekiq/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def ❤
fails = procd = 0
kb = memory_usage(::Process.pid)

_, exists, _, _, msg = redis { |conn|
_, exists, _, _, signal = redis { |conn|
conn.multi { |transaction|
transaction.sadd("processes", [key])
transaction.exists(key)
Expand All @@ -180,9 +180,7 @@ def ❤
fire_event(:heartbeat) unless exists > 0
fire_event(:beat, oneshot: false)

return unless msg

::Process.kill(msg, ::Process.pid)
::Process.kill(signal, ::Process.pid) if signal && !@embedded
rescue => e
# ignore all redis/network issues
logger.error("heartbeat: #{e}")
Expand Down Expand Up @@ -254,7 +252,8 @@ def to_data
"weights" => to_weights,
"labels" => @config[:labels].to_a,
"identity" => identity,
"version" => Sidekiq::VERSION
"version" => Sidekiq::VERSION,
"embedded" => @embedded
}
end

Expand Down
9 changes: 6 additions & 3 deletions lib/sidekiq/web/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ def self.set(key, val)

post "/busy" do
if params["identity"]
p = Sidekiq::Process.new("identity" => params["identity"])
p.quiet! if params["quiet"]
p.stop! if params["stop"]
pro = Sidekiq::ProcessSet[params["identity"]]

pro.quiet! if params["quiet"]
pro.stop! if params["stop"]
else
processes.each do |pro|
next if pro.embedded?

pro.quiet! if params["quiet"]
pro.stop! if params["stop"]
end
Expand Down
48 changes: 47 additions & 1 deletion test/api_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ class JobWithTags
"started_at" => Time.now.to_f - 15,
"queues" => ["foo", "bar"],
"weights" => {"foo" => 1, "bar" => 1},
"version" => Sidekiq::VERSION
"version" => Sidekiq::VERSION,
"embedded" => false
}

time = Time.now.to_f
Expand All @@ -554,13 +555,58 @@ class JobWithTags
assert_equal ["foo", "bar"], data.queues
assert_equal({"foo" => 1, "bar" => 1}, data.weights)
assert_equal Sidekiq::VERSION, data.version
assert_equal false, data.embedded?
data.quiet!
data.stop!
signals_string = "#{odata["key"]}-signals"
assert_equal "TERM", @cfg.redis { |c| c.lpop(signals_string) }
assert_equal "TSTP", @cfg.redis { |c| c.lpop(signals_string) }
end

it 'can find processes' do
identity_string = "identity_string"
odata = {
"pid" => 123,
"hostname" => Socket.gethostname,
"key" => identity_string,
"identity" => identity_string,
"started_at" => Time.now.to_f - 15,
"queues" => ["foo", "bar"],
"weights" => {"foo" => 1, "bar" => 1},
"version" => Sidekiq::VERSION,
"embedded" => true
}

time = Time.now.to_f
@cfg.redis do |conn|
conn.multi do |transaction|
transaction.sadd("processes", [odata["key"]])
transaction.hmset(odata["key"], "info", Sidekiq.dump_json(odata), "busy", 10, "beat", time)
end
end

assert_nil Sidekiq::ProcessSet["nope"]

pro = Sidekiq::ProcessSet["identity_string"]
assert_equal 10, pro["busy"]
assert_equal time, pro["beat"]
assert_equal 123, pro["pid"]
assert_equal ["foo", "bar"], pro.queues
assert_equal({"foo" => 1, "bar" => 1}, pro.weights)
assert_equal Sidekiq::VERSION, pro.version
assert_equal true, pro.embedded?
end

it "can't quiet or stop embedded processes" do
p = Sidekiq::Process.new("embedded" => true)

e = assert_raises(RuntimeError) { p.quiet! }
assert_equal "Can't quiet an embedded process", e.message

e = assert_raises(RuntimeError) { p.stop! }
assert_equal "Can't stop an embedded process", e.message
end

it "can enumerate workers" do
w = Sidekiq::Workers.new
assert_equal 0, w.size
Expand Down
8 changes: 8 additions & 0 deletions test/web_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def job_params(job, score)
it "can quiet a process" do
identity = "identity"
signals_key = "#{identity}-signals"
@config.redis do |conn|
conn.sadd("processes", [identity])
conn.hmset(identity, "info", Sidekiq.dump_json("hostname" => "foo", "identity" => identity), "at", Time.now.to_f, "busy", 0)
end

assert_nil @config.redis { |c| c.lpop signals_key }
post "/busy", "quiet" => "1", "identity" => identity
Expand All @@ -96,6 +100,10 @@ def job_params(job, score)
it "can stop a process" do
identity = "identity"
signals_key = "#{identity}-signals"
@config.redis do |conn|
conn.sadd("processes", [identity])
conn.hmset(identity, "info", Sidekiq.dump_json("hostname" => "foo", "identity" => identity), "at", Time.now.to_f, "busy", 0)
end

assert_nil @config.redis { |c| c.lpop signals_key }
post "/busy", "stop" => "1", "identity" => identity
Expand Down
21 changes: 13 additions & 8 deletions web/views/busy.erb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
<% process.labels.each do |label| %>
<span class="label label-info"><%= label %></span>
<% end %>
<% if process.embedded? %>
<span class="label label-default">embedded</span>
<% end %>
<% if process.stopping? %>
<span class="label label-danger">quiet</span>
<% end %>
Expand Down Expand Up @@ -87,15 +90,17 @@
<td><%= process['concurrency'] %></td>
<td><%= process['busy'] %></td>
<td>
<form method="POST">
<%= csrf_tag %>
<input type="hidden" name="identity" value="<%= process['identity'] %>"/>
<% unless process.embedded? %>
<form method="POST">
<%= csrf_tag %>
<input type="hidden" name="identity" value="<%= process['identity'] %>"/>

<div class="btn-group pull-right flip">
<% unless process.stopping? %><button class="btn btn-xs btn-warn" type="submit" name="quiet" value="1"><%= t('Quiet') %></button><% end %>
<button class="btn btn-xs btn-danger" type="submit" name="stop" value="1"><%= t('Stop') %></button>
</div>
</form>
<div class="btn-group pull-right flip">
<% unless process.stopping? %><button class="btn btn-xs btn-warn" type="submit" name="quiet" value="1"><%= t('Quiet') %></button><% end %>
<button class="btn btn-xs btn-danger" type="submit" name="stop" value="1"><%= t('Stop') %></button>
</div>
</form>
<% end %>
</td>
</tr>
<% end %>
Expand Down

0 comments on commit 2e2eb5a

Please sign in to comment.