Skip to content

Commit

Permalink
Merge pull request #19984 from agrare/add_support_for_kafka
Browse files Browse the repository at this point in the history
Add kafka as an option for prototype.queue_type

(cherry picked from commit beea070)
  • Loading branch information
Fryguy authored and simaishi committed Apr 8, 2020
1 parent 9912863 commit c19e513
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 67 deletions.
22 changes: 11 additions & 11 deletions app/models/ems_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ def self.bottleneck_event_groups
end

def self.add_queue(meth, ems_id, event)
if Settings.prototype.queue_type == 'artemis'
MiqQueue.artemis_client('event_handler').publish_topic(
:service => "events",
unless MiqQueue.messaging_type == "miq_queue"
MiqQueue.messaging_client('event_handler').publish_topic(
:service => "manageiq.ems-events",
:sender => ems_id,
:event => event[:event_type],
:payload => event
)
else
MiqQueue.submit_job(
:service => "event",
:target_id => ems_id,
:class_name => "EmsEvent",
:method_name => meth,
:args => [event],
)
end

MiqQueue.submit_job(
:service => "event",
:target_id => ems_id,
:class_name => "EmsEvent",
:method_name => meth,
:args => [event]
)
end

def self.add(ems_id, event_hash)
Expand Down
35 changes: 0 additions & 35 deletions app/models/miq_event_handler/runner.rb
Original file line number Diff line number Diff line change
@@ -1,37 +1,2 @@
class MiqEventHandler::Runner < MiqQueueWorkerBase::Runner
def artemis?
Settings.prototype.queue_type == 'artemis'
end

def do_before_work_loop
if artemis?
topic_options = {
:service => "events",
:persist_ref => "event_handler"
}

# this block is stored in a lambda callback and is executed in another thread once a msg is received
MiqQueue.artemis_client('event_handler').subscribe_topic(topic_options) do |sender, event, payload|
_log.info "Received Event (#{event}) by sender #{sender}: #{payload[:event_type]} #{payload[:chain_id]}"
EmsEvent.add(sender.to_i, payload)
end
_log.info "Listening for events..."
end
end

def do_work
# If we are using MiqQueue then use the default do_work method
super unless artemis?

# we dont do any work, we are lazy
# upon msg received, the messaging thread will execute the block in .subscribe_topic as above
# sleeping is done in do_work_loop
end

def before_exit(_message, _exit_code)
return unless artemis?
MiqQueue.artemis_client('event_handler').close
rescue => e
safe_log("Could not close artemis connection: #{e}", 1)
end
end
46 changes: 34 additions & 12 deletions app/models/miq_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,22 @@ class MiqQueue < ApplicationRecord
PRIORITY_WHICH = [:max, :high, :normal, :low, :min]
PRIORITY_DIR = [:higher, :lower]

def self.artemis_client(client_ref)
@artemis_client ||= {}
@artemis_client[client_ref] ||= begin
def self.messaging_type
ENV["MESSAGING_TYPE"] || Settings.prototype.messaging_type
end

def self.messaging_client(client_ref)
@messaging_client ||= {}
return if messaging_type == "miq_queue"

@messaging_client[client_ref] ||= begin
require "manageiq-messaging"
ManageIQ::Messaging.logger = _log
queue_settings = Settings.prototype.artemis
connect_opts = {
:host => ENV["ARTEMIS_QUEUE_HOSTNAME"] || queue_settings.queue_hostname,
:port => (ENV["ARTEMIS_QUEUE_PORT"] || queue_settings.queue_port).to_i,
:username => ENV["ARTEMIS_QUEUE_USERNAME"] || queue_settings.queue_username,
:password => ENV["ARTEMIS_QUEUE_PASSWORD"] || queue_settings.queue_password,
:client_ref => client_ref,
}

# caching the client works, even if the connection becomes unavailable
# internally the client will track the state of the connection and re-open it,
# once it's available again - at least thats true for a stomp connection
ManageIQ::Messaging::Client.open(connect_opts)
ManageIQ::Messaging::Client.open(messaging_client_options.merge(:client_ref => client_ref))
end
end

Expand Down Expand Up @@ -641,6 +639,30 @@ def self.optional_values(options, keys = [:zone])

private_class_method :optional_values

def self.messaging_client_options
messaging_settings = Settings.prototype[messaging_type]

{
:host => ENV["MESSAGING_HOSTNAME"] || messaging_settings.messaging_hostname,
:port => (ENV["MESSAGING_PORT"] || messaging_settings.messaging_port).to_i,
:username => ENV["MESSAGING_USERNAME"] || messaging_settings.messaging_username,
:password => ENV["MESSAGING_PASSWORD"] || messaging_settings.messaging_password,
:protocol => messaging_protocol,
:encoding => "json"
}
end
private_class_method :messaging_client_options

def self.messaging_protocol
case messaging_type
when "artemis"
:Stomp
when "kafka"
:Kafka
end
end
private_class_method :messaging_protocol

def destroy_potentially_stale_record
destroy
rescue ActiveRecord::StaleObjectError
Expand Down
7 changes: 6 additions & 1 deletion config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -930,12 +930,17 @@
:run_automate_methods_on_service_api_submit: false
:allow_api_service_ordering: true
:prototype:
:queue_type: miq_queue
:messaging_type: miq_queue
:artemis:
:queue_hostname: localhost
:queue_port: 61616
:queue_username: admin
:queue_password: smartvm
:kafka:
:queue_hostname: localhost
:queue_port: 9092
:queue_username: admin
:queue_password: smartvm
:recommendations:
:cpu_minimum: 1
:mem_minimum: 32.megabytes
Expand Down
36 changes: 28 additions & 8 deletions spec/models/ems_event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,48 @@
}
end

context "queue_type: artemis" do
before { stub_settings_merge(:prototype => {:queue_type => 'artemis'}) }
context "messaging_type: artemis" do
before { stub_settings_merge(:prototype => {:messaging_type => 'artemis'}) }

it "Adds event to Artemis queue" do
queue_client = double("ManageIQ::Messaging")
messaging_client = double("ManageIQ::Messaging")

expected_queue_payload = {
:service => "events",
:service => "manageiq.ems-events",
:sender => ems.id,
:event => event_hash[:event_type],
:payload => event_hash,
}

expect(queue_client).to receive(:publish_topic).with(expected_queue_payload)
expect(MiqQueue).to receive(:artemis_client).with('event_handler').and_return(queue_client)
expect(messaging_client).to receive(:publish_topic).with(expected_queue_payload)
expect(MiqQueue).to receive(:messaging_client).with('event_handler').and_return(messaging_client)

described_class.add_queue('add', ems.id, event_hash)
end
end

context "queue_type: miq_queue" do
before { stub_settings_merge(:prototype => {:queue_type => 'miq_queue'}) }
context "messaging_type: kafka" do
before { stub_settings_merge(:prototype => {:messaging_type => 'kafka'}) }

it "Adds event to Kafka topic" do
messaging_client = double("ManageIQ::Messaging")

expected_queue_payload = {
:service => "manageiq.ems-events",
:sender => ems.id,
:event => event_hash[:event_type],
:payload => event_hash,
}

expect(messaging_client).to receive(:publish_topic).with(expected_queue_payload)
expect(MiqQueue).to receive(:messaging_client).with('event_handler').and_return(messaging_client)

described_class.add_queue('add', ems.id, event_hash)
end
end

context "messaging_type: miq_queue" do
before { stub_settings_merge(:prototype => {:messaging_type => 'miq_queue'}) }

it "Adds event to MiqQueue" do
expected_queue_payload = {
Expand Down

0 comments on commit c19e513

Please sign in to comment.