Skip to content

Commit

Permalink
Add kafka as an option for prototype.queue_type
Browse files Browse the repository at this point in the history
Currently we have two queue_types, miq_queue and artemis with the later
allowing the use of ManageIQ::Messaging client.

ManageIQ::Messaging also has support for kafka which we can also allow
to be used here.
  • Loading branch information
agrare committed Mar 20, 2020
1 parent e8dc8d2 commit 54a7579
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 28 deletions.
16 changes: 8 additions & 8 deletions app/models/ems_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ def self.bottleneck_event_groups
end

def self.add_queue(meth, ems_id, event)
if Settings.prototype.queue_type == 'artemis'
MiqQueue.artemis_client('event_handler').publish_topic(
:service => "events",
:sender => ems_id,
:event => event[:event_type],
:payload => event
)
else
if MiqQueue.queue_type == "miq_queue"
MiqQueue.submit_job(
:service => "event",
:target_id => ems_id,
:class_name => "EmsEvent",
:method_name => meth,
:args => [event],
)
else
MiqQueue.queue_client('event_handler').publish_topic(
:service => "events",
:sender => ems_id,
:event => event[:event_type],
:payload => event
)
end
end

Expand Down
14 changes: 7 additions & 7 deletions app/models/miq_event_handler/runner.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
class MiqEventHandler::Runner < MiqQueueWorkerBase::Runner
def artemis?
Settings.prototype.queue_type == 'artemis'
def miq_queue?
MiqQueue.queue_type == "miq_queue"
end

def do_before_work_loop
if artemis?
unless miq_queue?
topic_options = {
:service => "events",
:persist_ref => "event_handler"
}

# this block is stored in a lambda callback and is executed in another thread once a msg is received
MiqQueue.artemis_client('event_handler').subscribe_topic(topic_options) do |sender, event, payload|
MiqQueue.queue_client('event_handler').subscribe_topic(topic_options) do |sender, event, payload|
_log.info "Received Event (#{event}) by sender #{sender}: #{payload[:event_type]} #{payload[:chain_id]}"
EmsEvent.add(sender.to_i, payload)
end
Expand All @@ -21,16 +21,16 @@ def do_before_work_loop

def do_work
# If we are using MiqQueue then use the default do_work method
super unless artemis?
super if miq_queue?

# we dont do any work, we are lazy
# upon msg received, the messaging thread will execute the block in .subscribe_topic as above
# sleeping is done in do_work_loop
end

def before_exit(_message, _exit_code)
return unless artemis?
MiqQueue.artemis_client('event_handler').close
return if miq_queue?
MiqQueue.queue_client('event_handler').close
rescue => e
safe_log("Could not close artemis connection: #{e}", 1)
end
Expand Down
44 changes: 32 additions & 12 deletions app/models/miq_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,22 @@ class MiqQueue < ApplicationRecord
PRIORITY_WHICH = [:max, :high, :normal, :low, :min]
PRIORITY_DIR = [:higher, :lower]

def self.artemis_client(client_ref)
@artemis_client ||= {}
@artemis_client[client_ref] ||= begin
def self.queue_type
Settings.prototype.queue_type
end

def self.queue_client(client_ref)
@queue_client ||= {}
return if queue_type == "miq_queue"

@queue_client[client_ref] ||= begin
require "manageiq-messaging"
ManageIQ::Messaging.logger = _log
queue_settings = Settings.prototype.artemis
connect_opts = {
:host => ENV["ARTEMIS_QUEUE_HOSTNAME"] || queue_settings.queue_hostname,
:port => (ENV["ARTEMIS_QUEUE_PORT"] || queue_settings.queue_port).to_i,
:username => ENV["ARTEMIS_QUEUE_USERNAME"] || queue_settings.queue_username,
:password => ENV["ARTEMIS_QUEUE_PASSWORD"] || queue_settings.queue_password,
:client_ref => client_ref,
}

# caching the client works, even if the connection becomes unavailable
# internally the client will track the state of the connection and re-open it,
# once it's available again - at least thats true for a stomp connection
ManageIQ::Messaging::Client.open(connect_opts)
ManageIQ::Messaging::Client.open(messaging_client_options.merge(:client_ref => client_ref))
end
end

Expand Down Expand Up @@ -641,6 +639,28 @@ def self.optional_values(options, keys = [:zone])

private_class_method :optional_values

def self.messaging_client_options
queue_settings = Settings.prototype[queue_type]
connect_opts = {
:host => ENV["ARTEMIS_QUEUE_HOSTNAME"] || queue_settings.queue_hostname,
:port => (ENV["ARTEMIS_QUEUE_PORT"] || queue_settings.queue_port).to_i,
:username => ENV["ARTEMIS_QUEUE_USERNAME"] || queue_settings.queue_username,
:password => ENV["ARTEMIS_QUEUE_PASSWORD"] || queue_settings.queue_password,
:protocol => messaging_client_protocol
}
end
private_class_method :messaging_client_options

def self.messaging_client_protocol
case queue_type
when "artemis"
:Stomp
when "kafka"
:Kafka
end
end
private_class_method :messaging_client_protocol

def destroy_potentially_stale_record
destroy
rescue ActiveRecord::StaleObjectError
Expand Down
5 changes: 5 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,11 @@
:queue_port: 61616
:queue_username: admin
:queue_password: smartvm
:kafka:
:queue_hostname: localhost
:queue_port: 9092
:queue_username: admin
:queue_password: smartvm
:recommendations:
:cpu_minimum: 1
:mem_minimum: 32.megabytes
Expand Down
22 changes: 21 additions & 1 deletion spec/models/ems_event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,27 @@
}

expect(queue_client).to receive(:publish_topic).with(expected_queue_payload)
expect(MiqQueue).to receive(:artemis_client).with('event_handler').and_return(queue_client)
expect(MiqQueue).to receive(:queue_client).with('event_handler').and_return(queue_client)

described_class.add_queue('add', ems.id, event_hash)
end
end

context "queue_type: kafka" do
before { stub_settings_merge(:prototype => {:queue_type => 'kafka'}) }

it "Adds event to Kafka topic" do
queue_client = double("ManageIQ::Messaging")

expected_queue_payload = {
:service => "events",
:sender => ems.id,
:event => event_hash[:event_type],
:payload => event_hash,
}

expect(queue_client).to receive(:publish_topic).with(expected_queue_payload)
expect(MiqQueue).to receive(:queue_client).with('event_handler').and_return(queue_client)

described_class.add_queue('add', ems.id, event_hash)
end
Expand Down

0 comments on commit 54a7579

Please sign in to comment.