Only register queue workers using drb for dequeue #19829

Merged, 2 commits, Feb 17, 2020
19 changes: 18 additions & 1 deletion app/models/miq_queue_worker_base/runner.rb
@@ -15,7 +15,7 @@ def sync_dequeue_method
end

def dequeue_method_via_drb?
- @dequeue_method == :drb
+ @dequeue_method == :drb && drb_dequeue_available?
end

def thresholds_exceeded?
@@ -130,6 +130,8 @@ def deliver_message(msg)
end

def do_work
register_worker_with_worker_monitor if dequeue_method_via_drb?

# Keep collecting messages from the queue until the queue is empty,
# so we don't sleep in between messages
loop do
@@ -143,6 +145,21 @@ def do_work

private

def register_worker_with_worker_monitor
worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
rescue DRb::DRbError => err
do_exit("Failed to register worker with worker monitor: #{err.class.name}: #{err.message}", 1)
end

def drb_dequeue_available?
@drb_dequeue_available ||=
begin
server.drb_uri.present? && worker_monitor_drb.respond_to?(:register_worker)
rescue DRb::DRbError
Member

Was there something else in this block before? How would DRb::DRbError be raised? We're not doing anything with drb, right?

Member

ah, I see, we used to call worker_monitor_drb.register_worker which could raise that exception...

Member Author

Right, so in this case I just need some call to the drb object to raise to tell if it's available or not. I used a respond_to? here because that proxies over to the server side without actually doing anything.

Member

Exactly. I don't think the begin/rescue is needed here.

Member

(it's needed in the method above that calls register_worker on the drb object)

Member Author

This is very much needed I think. respond_to? is proxied to the remote drb object so it will raise rather than return false if the drb server is inaccessible.

I would love a way to tell if the drb server is up that didn't rely on exception handling, do you know of something like that?

Member Author

@agrare do you know if there's anything like that?

Member

My bad. I forgot the drb object was proxied and could raise here. Oh DRb.

Member

Not that I'm aware of. In miq_fault_tolerant_vim, which wraps MiqVim to deal with DRb "issues", we just catch DRb::DRbConnError and retry if the broker is on a new port.
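
The point made in this thread, that `respond_to?` on a DRb proxy is forwarded to the remote side and therefore raises instead of returning false when nothing is listening, can be illustrated with a minimal standalone sketch. The helper name `drb_reachable?` and the free-port probe are assumptions for illustration only and are not part of this PR.

```ruby
require "drb/drb"
require "socket"

# Find a local TCP port that nothing is listening on (illustrative helper step).
probe = TCPServer.new("127.0.0.1", 0)
port  = probe.addr[1]
probe.close

# A DRbObject is only a proxy; creating it does not contact any server.
proxy = DRbObject.new_with_uri("druby://127.0.0.1:#{port}")

# respond_to? is forwarded to the remote side, so with no server listening
# it raises rather than returning false.
raised = begin
  proxy.respond_to?(:register_worker)
  nil
rescue DRb::DRbError => err
  err.class
end

# Hypothetical helper mirroring the rescue-based availability check.
def drb_reachable?(proxy)
  proxy.respond_to?(:register_worker)
rescue DRb::DRbError
  false
end

puts raised.inspect          # expected to be a DRb::DRbError subclass
puts drb_reachable?(proxy)
```

Rescuing `DRb::DRbError` here is what turns "server unreachable" into a plain `false`, which is the role the begin/rescue plays in `drb_dequeue_available?`.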

false
end
end
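
One property of the `||=` memoization used in `drb_dequeue_available?` is worth spelling out: `||=` only keeps truthy values, so a `false` result is not cached and the check runs again on the next call. The thread does not say whether that is intentional. The sketch below uses a hypothetical `AvailabilityProbe` class with canned results to show the behavior.

```ruby
# Hypothetical class; it only demonstrates how `||=` treats a false result.
class AvailabilityProbe
  attr_reader :checks

  def initialize(results)
    @results = results # canned results standing in for the real DRb check
    @checks  = 0
  end

  def available?
    @available ||= begin
      @checks += 1
      @results.shift
    end
  end
end

probe = AvailabilityProbe.new([false, false, true])
answers = 4.times.map { probe.available? }

puts answers.inspect
puts probe.checks
```

The first two calls return false and each re-runs the check. Once a call returns true, the value is cached and later calls skip the check.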

# Only for file based heartbeating
def heartbeat_message_timeout(message)
if message.msg_timeout
11 changes: 11 additions & 0 deletions app/models/miq_server/worker_management/dequeue.rb
@@ -97,6 +97,17 @@ def get_worker_count_and_priority_by_queue_name
queue_names
end

def register_worker(worker_pid, worker_class, queue_name)
worker_class = worker_class.constantize if worker_class.kind_of?(String)

@workers_lock.synchronize(:EX) do
worker_add(worker_pid)
h = @workers[worker_pid]
h[:class] ||= worker_class
h[:queue_name] ||= queue_name
end unless @workers_lock.nil?
end
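
Since `do_work` now calls the registration on each pass when dequeuing via drb, it helps that `register_worker` is idempotent: the `||=` assignments keep whatever was recorded first. A minimal sketch of that pattern follows. It assumes a plain `Mutex` and `Hash` in place of the server's `@workers_lock` and `worker_add`, and the `WorkerRegistry` class is hypothetical.

```ruby
# Hypothetical registry; a simplified stand-in for the server-side bookkeeping.
class WorkerRegistry
  def initialize
    @lock    = Mutex.new
    @workers = {}
  end

  # Safe to call repeatedly: existing values are never overwritten.
  def register_worker(pid, worker_class, queue_name)
    @lock.synchronize do
      h = (@workers[pid] ||= {})
      h[:class]      ||= worker_class
      h[:queue_name] ||= queue_name
    end
  end

  def [](pid)
    @lock.synchronize { @workers[pid].dup }
  end
end

registry = WorkerRegistry.new
registry.register_worker(123, "MiqGenericWorker", "generic")
registry.register_worker(123, "MiqGenericWorker", "other") # repeat call, no change

puts registry[123].inspect
```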

def populate_queue_messages
queue_names = get_worker_count_and_priority_by_queue_name
@queue_messages_lock.synchronize(:EX) do
11 changes: 0 additions & 11 deletions app/models/miq_server/worker_management/heartbeat.rb
@@ -1,17 +1,6 @@
module MiqServer::WorkerManagement::Heartbeat
extend ActiveSupport::Concern

def register_worker(worker_pid, worker_class, queue_name)
worker_class = worker_class.constantize if worker_class.kind_of?(String)

@workers_lock.synchronize(:EX) do
worker_add(worker_pid)
h = @workers[worker_pid]
h[:class] ||= worker_class
h[:queue_name] ||= queue_name
end unless @workers_lock.nil?
end

def persist_last_heartbeat(w)
last_heartbeat = workers_last_heartbeat(w)

8 changes: 0 additions & 8 deletions app/models/miq_worker/runner.rb
@@ -288,8 +288,6 @@ def heartbeat
_log.info("#{log_prefix} Synchronizing configuration complete...")
end

register_worker_with_worker_monitor unless MiqEnvironment::Command.is_podified?
Member

yay, that is_podified? removal must feel good...


@last_hb = now
do_heartbeat_work
rescue SystemExit, SignalException
@@ -298,12 +296,6 @@ def heartbeat
do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
end

def register_worker_with_worker_monitor
worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
rescue DRb::DRbError => err
do_exit("Error processing messages from MiqServer because #{err.class.name}: #{err.message}", 1)
end

def heartbeat_to_file(timeout = nil)
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
44 changes: 44 additions & 0 deletions spec/models/miq_queue_worker_base/runner_spec.rb
@@ -20,4 +20,48 @@
expect(q1.reload.state).to eql(MiqQueue::STATUS_ERROR)
end
end

describe "#dequeue_method_via_drb?" do
let(:server) { EvmSpecHelper.local_miq_server }
let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server) }
let(:runner) do
described_class.allocate.tap do |r|
r.worker = worker
r.instance_variable_set(:@server, server)
end
end

it "returns false when the instance variable is not :drb" do
runner.instance_variable_set(:@dequeue_method, :sql)
expect(runner.dequeue_method_via_drb?).to be_falsey
end

context "when the instance variable is set to :drb" do
before { runner.instance_variable_set(:@dequeue_method, :drb) }

it "returns false if the server's drb uri is nil" do
server.update(:drb_uri => nil)
expect(runner.dequeue_method_via_drb?).to be_falsey
end

context "with a drb uri" do
let(:drb_object) { instance_double(DRbObject) }

before do
server.update(:drb_uri => "drbunix:///tmp/worker_monitor20200211-24337-1ma102h")
expect(runner).to receive(:worker_monitor_drb).and_return(drb_object)
end

it "returns false when the drb server is not accessible" do
expect(drb_object).to receive(:respond_to?).with(:register_worker).and_raise(DRb::DRbError)
expect(runner.dequeue_method_via_drb?).to be_falsey
end

it "returns true when the server is accessible" do
expect(drb_object).to receive(:respond_to?).with(:register_worker).and_return(true)
expect(runner.dequeue_method_via_drb?).to be_truthy
end
end
end
end
end