diff --git a/app/models/ems_event.rb b/app/models/ems_event.rb index 036ea8e00d23..fa84dd1b8111 100644 --- a/app/models/ems_event.rb +++ b/app/models/ems_event.rb @@ -25,14 +25,7 @@ def self.bottleneck_event_groups end def self.add_queue(meth, ems_id, event) - if Settings.prototype.queue_type == 'artemis' - MiqQueue.artemis_client('event_handler').publish_topic( - :service => "events", - :sender => ems_id, - :event => event[:event_type], - :payload => event - ) - else + if MiqQueue.queue_type == "miq_queue" MiqQueue.submit_job( :service => "event", :target_id => ems_id, @@ -40,6 +33,13 @@ def self.add_queue(meth, ems_id, event) :method_name => meth, :args => [event], ) + else + MiqQueue.queue_client('event_handler').publish_topic( + :service => "events", + :sender => ems_id, + :event => event[:event_type], + :payload => event + ) end end diff --git a/app/models/miq_event_handler/runner.rb b/app/models/miq_event_handler/runner.rb index 9a76ddd979af..809d5d1466ba 100644 --- a/app/models/miq_event_handler/runner.rb +++ b/app/models/miq_event_handler/runner.rb @@ -1,17 +1,17 @@ class MiqEventHandler::Runner < MiqQueueWorkerBase::Runner - def artemis? - Settings.prototype.queue_type == 'artemis' + def miq_queue? + MiqQueue.queue_type == "miq_queue" end def do_before_work_loop - if artemis? + unless miq_queue? topic_options = { :service => "events", :persist_ref => "event_handler" } # this block is stored in a lambda callback and is executed in another thread once a msg is received - MiqQueue.artemis_client('event_handler').subscribe_topic(topic_options) do |sender, event, payload| + MiqQueue.queue_client('event_handler').subscribe_topic(topic_options) do |sender, event, payload| _log.info "Received Event (#{event}) by sender #{sender}: #{payload[:event_type]} #{payload[:chain_id]}" EmsEvent.add(sender.to_i, payload) end @@ -21,7 +21,7 @@ def do_before_work_loop def do_work # If we are using MiqQueue then use the default do_work method - super unless artemis? + super if miq_queue? # we dont do any work, we are lazy # upon msg received, the messaging thread will execute the block in .subscribe_topic as above @@ -29,8 +29,8 @@ def do_work end def before_exit(_message, _exit_code) - return unless artemis? - MiqQueue.artemis_client('event_handler').close + return if miq_queue? + MiqQueue.queue_client('event_handler').close rescue => e safe_log("Could not close artemis connection: #{e}", 1) end diff --git a/app/models/miq_queue.rb b/app/models/miq_queue.rb index 30f43bf4899c..f8b90be491a7 100644 --- a/app/models/miq_queue.rb +++ b/app/models/miq_queue.rb @@ -32,24 +32,22 @@ class MiqQueue < ApplicationRecord PRIORITY_WHICH = [:max, :high, :normal, :low, :min] PRIORITY_DIR = [:higher, :lower] - def self.artemis_client(client_ref) - @artemis_client ||= {} - @artemis_client[client_ref] ||= begin + def self.queue_type + Settings.prototype.queue_type + end + + def self.queue_client(client_ref) + @queue_client ||= {} + return if queue_type == "miq_queue" + + @queue_client[client_ref] ||= begin require "manageiq-messaging" ManageIQ::Messaging.logger = _log - queue_settings = Settings.prototype.artemis - connect_opts = { - :host => ENV["ARTEMIS_QUEUE_HOSTNAME"] || queue_settings.queue_hostname, - :port => (ENV["ARTEMIS_QUEUE_PORT"] || queue_settings.queue_port).to_i, - :username => ENV["ARTEMIS_QUEUE_USERNAME"] || queue_settings.queue_username, - :password => ENV["ARTEMIS_QUEUE_PASSWORD"] || queue_settings.queue_password, - :client_ref => client_ref, - } # caching the client works, even if the connection becomes unavailable # internally the client will track the state of the connection and re-open it, # once it's available again - at least thats true for a stomp connection - ManageIQ::Messaging::Client.open(connect_opts) + ManageIQ::Messaging::Client.open(messaging_client_options.merge(:client_ref => client_ref)) end end @@ -641,6 +639,28 @@ def self.optional_values(options, keys = [:zone]) private_class_method :optional_values + def self.messaging_client_options + queue_settings = Settings.prototype[queue_type] + connect_opts = { + :host => ENV["ARTEMIS_QUEUE_HOSTNAME"] || queue_settings.queue_hostname, + :port => (ENV["ARTEMIS_QUEUE_PORT"] || queue_settings.queue_port).to_i, + :username => ENV["ARTEMIS_QUEUE_USERNAME"] || queue_settings.queue_username, + :password => ENV["ARTEMIS_QUEUE_PASSWORD"] || queue_settings.queue_password, + :protocol => messaging_client_protocol + } + end + private_class_method :messaging_client_options + + def self.messaging_client_protocol + case queue_type + when "artemis" + :Stomp + when "kafka" + :Kafka + end + end + private_class_method :messaging_client_protocol + def destroy_potentially_stale_record destroy rescue ActiveRecord::StaleObjectError diff --git a/config/settings.yml b/config/settings.yml index a9cc0e107118..6ee1515430f5 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -936,6 +936,11 @@ :queue_port: 61616 :queue_username: admin :queue_password: smartvm + :kafka: + :queue_hostname: localhost + :queue_port: 9092 + :queue_username: admin + :queue_password: smartvm :recommendations: :cpu_minimum: 1 :mem_minimum: 32.megabytes diff --git a/spec/models/ems_event_spec.rb b/spec/models/ems_event_spec.rb index 99884b24c7cc..769b8383aeb2 100644 --- a/spec/models/ems_event_spec.rb +++ b/spec/models/ems_event_spec.rb @@ -134,7 +134,27 @@ } expect(queue_client).to receive(:publish_topic).with(expected_queue_payload) - expect(MiqQueue).to receive(:artemis_client).with('event_handler').and_return(queue_client) + expect(MiqQueue).to receive(:queue_client).with('event_handler').and_return(queue_client) + + described_class.add_queue('add', ems.id, event_hash) + end + end + + context "queue_type: kafka" do + before { stub_settings_merge(:prototype => {:queue_type => 'kafka'}) } + + it "Adds event to Kafka topic" do + queue_client = double("ManageIQ::Messaging") + + expected_queue_payload = { + :service => "events", + :sender => ems.id, + :event => event_hash[:event_type], + :payload => event_hash, + } + + expect(queue_client).to receive(:publish_topic).with(expected_queue_payload) + expect(MiqQueue).to receive(:queue_client).with('event_handler').and_return(queue_client) described_class.add_queue('add', ems.id, event_hash) end