From c19e513f8fa559fb0df7780007ffc8351475562c Mon Sep 17 00:00:00 2001 From: Jason Frey Date: Mon, 30 Mar 2020 15:46:50 -0400 Subject: [PATCH] Merge pull request #19984 from agrare/add_support_for_kafka Add kafka as an option for prototype.queue_type (cherry picked from commit beea070a44ee6e52654df333cb2128f8e0f4981b) --- app/models/ems_event.rb | 22 ++++++------ app/models/miq_event_handler/runner.rb | 35 -------------------- app/models/miq_queue.rb | 46 +++++++++++++++++++------- config/settings.yml | 7 +++- spec/models/ems_event_spec.rb | 36 +++++++++++++++----- 5 files changed, 79 insertions(+), 67 deletions(-) diff --git a/app/models/ems_event.rb b/app/models/ems_event.rb index 036ea8e00d2..72c31152303 100644 --- a/app/models/ems_event.rb +++ b/app/models/ems_event.rb @@ -25,22 +25,22 @@ def self.bottleneck_event_groups end def self.add_queue(meth, ems_id, event) - if Settings.prototype.queue_type == 'artemis' - MiqQueue.artemis_client('event_handler').publish_topic( - :service => "events", + unless MiqQueue.messaging_type == "miq_queue" + MiqQueue.messaging_client('event_handler').publish_topic( + :service => "manageiq.ems-events", :sender => ems_id, :event => event[:event_type], :payload => event ) - else - MiqQueue.submit_job( - :service => "event", - :target_id => ems_id, - :class_name => "EmsEvent", - :method_name => meth, - :args => [event], - ) end + + MiqQueue.submit_job( + :service => "event", + :target_id => ems_id, + :class_name => "EmsEvent", + :method_name => meth, + :args => [event] + ) end def self.add(ems_id, event_hash) diff --git a/app/models/miq_event_handler/runner.rb b/app/models/miq_event_handler/runner.rb index 9a76ddd979a..612d4e4997f 100644 --- a/app/models/miq_event_handler/runner.rb +++ b/app/models/miq_event_handler/runner.rb @@ -1,37 +1,2 @@ class MiqEventHandler::Runner < MiqQueueWorkerBase::Runner - def artemis? - Settings.prototype.queue_type == 'artemis' - end - - def do_before_work_loop - if artemis? - topic_options = { - :service => "events", - :persist_ref => "event_handler" - } - - # this block is stored in a lambda callback and is executed in another thread once a msg is received - MiqQueue.artemis_client('event_handler').subscribe_topic(topic_options) do |sender, event, payload| - _log.info "Received Event (#{event}) by sender #{sender}: #{payload[:event_type]} #{payload[:chain_id]}" - EmsEvent.add(sender.to_i, payload) - end - _log.info "Listening for events..." - end - end - - def do_work - # If we are using MiqQueue then use the default do_work method - super unless artemis? - - # we dont do any work, we are lazy - # upon msg received, the messaging thread will execute the block in .subscribe_topic as above - # sleeping is done in do_work_loop - end - - def before_exit(_message, _exit_code) - return unless artemis? - MiqQueue.artemis_client('event_handler').close - rescue => e - safe_log("Could not close artemis connection: #{e}", 1) - end end diff --git a/app/models/miq_queue.rb b/app/models/miq_queue.rb index 30f43bf4899..929ce6bf844 100644 --- a/app/models/miq_queue.rb +++ b/app/models/miq_queue.rb @@ -32,24 +32,22 @@ class MiqQueue < ApplicationRecord PRIORITY_WHICH = [:max, :high, :normal, :low, :min] PRIORITY_DIR = [:higher, :lower] - def self.artemis_client(client_ref) - @artemis_client ||= {} - @artemis_client[client_ref] ||= begin + def self.messaging_type + ENV["MESSAGING_TYPE"] || Settings.prototype.messaging_type + end + + def self.messaging_client(client_ref) + @messaging_client ||= {} + return if messaging_type == "miq_queue" + + @messaging_client[client_ref] ||= begin require "manageiq-messaging" ManageIQ::Messaging.logger = _log - queue_settings = Settings.prototype.artemis - connect_opts = { - :host => ENV["ARTEMIS_QUEUE_HOSTNAME"] || queue_settings.queue_hostname, - :port => (ENV["ARTEMIS_QUEUE_PORT"] || queue_settings.queue_port).to_i, - :username => ENV["ARTEMIS_QUEUE_USERNAME"] || queue_settings.queue_username, - :password => ENV["ARTEMIS_QUEUE_PASSWORD"] || queue_settings.queue_password, - :client_ref => client_ref, - } # caching the client works, even if the connection becomes unavailable # internally the client will track the state of the connection and re-open it, # once it's available again - at least thats true for a stomp connection - ManageIQ::Messaging::Client.open(connect_opts) + ManageIQ::Messaging::Client.open(messaging_client_options.merge(:client_ref => client_ref)) end end @@ -641,6 +639,30 @@ def self.optional_values(options, keys = [:zone]) private_class_method :optional_values + def self.messaging_client_options + messaging_settings = Settings.prototype[messaging_type] + + { + :host => ENV["MESSAGING_HOSTNAME"] || messaging_settings.messaging_hostname, + :port => (ENV["MESSAGING_PORT"] || messaging_settings.messaging_port).to_i, + :username => ENV["MESSAGING_USERNAME"] || messaging_settings.messaging_username, + :password => ENV["MESSAGING_PASSWORD"] || messaging_settings.messaging_password, + :protocol => messaging_protocol, + :encoding => "json" + } + end + private_class_method :messaging_client_options + + def self.messaging_protocol + case messaging_type + when "artemis" + :Stomp + when "kafka" + :Kafka + end + end + private_class_method :messaging_protocol + def destroy_potentially_stale_record destroy rescue ActiveRecord::StaleObjectError diff --git a/config/settings.yml b/config/settings.yml index a9cc0e10711..2df8d17f3e6 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -930,12 +930,17 @@ :run_automate_methods_on_service_api_submit: false :allow_api_service_ordering: true :prototype: - :queue_type: miq_queue + :messaging_type: miq_queue :artemis: :queue_hostname: localhost :queue_port: 61616 :queue_username: admin :queue_password: smartvm + :kafka: + :queue_hostname: localhost + :queue_port: 9092 + :queue_username: admin + :queue_password: smartvm :recommendations: :cpu_minimum: 1 :mem_minimum: 32.megabytes diff --git a/spec/models/ems_event_spec.rb b/spec/models/ems_event_spec.rb index 99884b24c7c..7a09e754aa2 100644 --- a/spec/models/ems_event_spec.rb +++ b/spec/models/ems_event_spec.rb @@ -120,28 +120,48 @@ } end - context "queue_type: artemis" do - before { stub_settings_merge(:prototype => {:queue_type => 'artemis'}) } + context "messaging_type: artemis" do + before { stub_settings_merge(:prototype => {:messaging_type => 'artemis'}) } it "Adds event to Artemis queue" do - queue_client = double("ManageIQ::Messaging") + messaging_client = double("ManageIQ::Messaging") expected_queue_payload = { - :service => "events", + :service => "manageiq.ems-events", :sender => ems.id, :event => event_hash[:event_type], :payload => event_hash, } - expect(queue_client).to receive(:publish_topic).with(expected_queue_payload) - expect(MiqQueue).to receive(:artemis_client).with('event_handler').and_return(queue_client) + expect(messaging_client).to receive(:publish_topic).with(expected_queue_payload) + expect(MiqQueue).to receive(:messaging_client).with('event_handler').and_return(messaging_client) described_class.add_queue('add', ems.id, event_hash) end end - context "queue_type: miq_queue" do - before { stub_settings_merge(:prototype => {:queue_type => 'miq_queue'}) } + context "messaging_type: kafka" do + before { stub_settings_merge(:prototype => {:messaging_type => 'kafka'}) } + + it "Adds event to Kafka topic" do + messaging_client = double("ManageIQ::Messaging") + + expected_queue_payload = { + :service => "manageiq.ems-events", + :sender => ems.id, + :event => event_hash[:event_type], + :payload => event_hash, + } + + expect(messaging_client).to receive(:publish_topic).with(expected_queue_payload) + expect(MiqQueue).to receive(:messaging_client).with('event_handler').and_return(messaging_client) + + described_class.add_queue('add', ems.id, event_hash) + end + end + + context "messaging_type: miq_queue" do + before { stub_settings_merge(:prototype => {:messaging_type => 'miq_queue'}) } it "Adds event to MiqQueue" do expected_queue_payload = {