Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka as an option for prototype.queue_type #19984

Merged
merged 5 commits into from
Mar 30, 2020

Conversation

agrare
Copy link
Member

@agrare agrare commented Mar 20, 2020

Currently we have two queue_types, miq_queue and artemis with the latter allowing the use of ManageIQ::Messaging client.

ManageIQ::Messaging also has support for kafka which we can also allow to be used here.

$ bin/kafka-console-consumer.sh --bootstrap-server 'localhost:9092' --topic events --from-beginning
{"event_type":"VmPoweredOnEvent",
"chain_id":"651818",
"is_task":false,
"source":"VC",
"message":"cfme-5.11 on  dell-r430-13.cloudforms.lab.eng.rdu2.redhat.com in dev-vc67-DC is powered on",
"timestamp":"2020-03-23T20:51:56.366773Z",
"full_data":{"key":"651820",
  "chainId":"651818",
  "createdTime":"2020-03-23T20:51:56.366773Z",
  "userName":"VSPHERE.LOCAL\\Administrator",
  "datacenter":{"name":"dev-vc67-DC", "datacenter":"datacenter-104"},
  "computeResource":{"name":"dev-vc67-cluster","computeResource":"domain-c109"},
  "host":{"name":"dell-r430-13.cloudforms.lab.eng.rdu2.redhat.com","host":"host-114"},
  "vm":{"name":"cfme-5.11","vm":"vm-685","path":"[NFS Share] cfme-5.11/cfme-5.11.vmx","uuid":"423dbb7a-bc76-a738-2740-4de960bd33da"},
  "fullFormattedMessage":"cfme-5.11 on  dell-r430-13.cloudforms.lab.eng.rdu2.redhat.com in dev-vc67-DC is powered on",
  "changeTag":"", 
  "template":"false", 
  "eventType":"VmPoweredOnEvent"},
"ems_id":"2",
"username":"VSPHERE.LOCAL\\Administrator",
"vm_ems_ref":"vm-685",
"vm_name":"cfme-5.11",
"vm_location":"[NFS Share] cfme-5.11/cfme-5.11.vmx",
"vm_uid_ems":"423dbb7a-bc76-a738-2740-4de960bd33da",
"host_name":"dell-r430-13.cloudforms.lab.eng.rdu2.redhat.com",
"host_ems_ref":"host-114"}

app/models/miq_queue.rb Outdated Show resolved Hide resolved
@agrare agrare force-pushed the add_support_for_kafka branch 3 times, most recently from 54a7579 to b7b0fa8 Compare March 20, 2020 18:10
@Fryguy
Copy link
Member

Fryguy commented Mar 20, 2020

cc @carbonin

:Kafka
end
end
private_class_method :messaging_client_protocol
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, I feel like this should be pushed down into manageiq-messaging... feels like something the caller shouldn't have to know about.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree

@chessbyte chessbyte added this to the Jansa milestone Mar 21, 2020
app/models/miq_queue.rb Outdated Show resolved Hide resolved
@agrare agrare changed the title [WIP] Add kafka as an option for prototype.queue_type Add kafka as an option for prototype.queue_type Mar 23, 2020
@agrare agrare added core/queue and removed wip labels Mar 23, 2020
@agrare
Copy link
Member Author

agrare commented Mar 23, 2020

#20000

@agrare agrare force-pushed the add_support_for_kafka branch 4 times, most recently from a7db1ee to 951dfcd Compare March 23, 2020 20:37
@agrare
Copy link
Member Author

agrare commented Mar 23, 2020

So when I have this use YAML, manageiq-messaging fails on YAML.safe_load:

INFO -- : MIQ(MiqQueue.raw_publish) Published to topic(manageiq.events), msg({:event_type=>"VmPoweredOnEvent", :chain_id=>"651785", :is_task=>false, :source=>"VC", :message=>"cfm)
ERROR -- : MIQ(MiqQueue.process_topic_message) Event processing error: Tried to load unspecified class: Symbol
ERROR -- : MIQ(MiqQueue.process_topic_message) /usr/lib/ruby/2.5.0/psych/class_loader.rb:97:in `find'
 /usr/lib/ruby/2.5.0/psych/class_loader.rb:28:in `load'
 /usr/lib/ruby/2.5.0/psych/class_loader.rb:39:in `block (2 levels) in <class:ClassLoader>'
 ...
 /usr/lib/ruby/2.5.0/psych.rb:325:in `safe_load'

And when I use the JSON encoding the payload that ManageIQ::Messaging yields to the block is nil

ERROR -- : MIQ(MiqQueue.process_topic_message) Event processing error: undefined method `[]' for nil:NilClass
ERROR -- : MIQ(MiqQueue.process_topic_message) /home/grare/adam/src/manageiq/manageiq/app/models/miq_event_handler/runner.rb:16:in `block in do_before_work_loop'
 /home/grare/adam/.gem/gems/manageiq-messaging-0.1.5/lib/manageiq/messaging/kafka/common.rb:104:in `process_topic_message'
 /home/grare/adam/.gem/gems/manageiq-messaging-0.1.5/lib/manageiq/messaging/kafka/topic.rb:20:in `block in subscribe_topic_impl'

Going to mark this WIP until this is figured out, or we could yield the JSON messages to the topic and publish to MiqQueue which is what we're planning on for metrics and allow kafka as an MIQ Queue replacement in a future PR @Fryguy

@agrare agrare changed the title Add kafka as an option for prototype.queue_type [WIP] Add kafka as an option for prototype.queue_type Mar 23, 2020
@miq-bot miq-bot added the wip label Mar 23, 2020
@Fryguy
Copy link
Member

Fryguy commented Mar 23, 2020

Yeah the symbol keys were what I was concerned about.

@agrare agrare force-pushed the add_support_for_kafka branch from 23c694c to d895578 Compare March 24, 2020 12:46
@agrare agrare changed the title [WIP] Add kafka as an option for prototype.queue_type Add kafka as an option for prototype.queue_type Mar 24, 2020
@agrare
Copy link
Member Author

agrare commented Mar 24, 2020

Okay instead of a queue replacement this will now publish events to kafka/artemis if available in a json format in addition to the normal event handling over MiqQueue.

This should make it easier for other services to consume events (json format not yaml w/ symbols) and when we want to move to kafka as a queue replacement we can update the event handler to deal with json payloads

@miq-bot miq-bot removed the wip label Mar 24, 2020
@agrare agrare force-pushed the add_support_for_kafka branch from 57dfefb to 18bd565 Compare March 24, 2020 13:43
@agrare agrare linked an issue Mar 24, 2020 that may be closed by this pull request
2 tasks
@jrafanie
Copy link
Member

So when I have this use YAML, manageiq-messaging fails on YAML.safe_load:

INFO -- : MIQ(MiqQueue.raw_publish) Published to topic(manageiq.events), msg({:event_type=>"VmPoweredOnEvent", :chain_id=>"651785", :is_task=>false, :source=>"VC", :message=>"cfm)
ERROR -- : MIQ(MiqQueue.process_topic_message) Event processing error: Tried to load unspecified class: Symbol
ERROR -- : MIQ(MiqQueue.process_topic_message) /usr/lib/ruby/2.5.0/psych/class_loader.rb:97:in `find'
 /usr/lib/ruby/2.5.0/psych/class_loader.rb:28:in `load'
 /usr/lib/ruby/2.5.0/psych/class_loader.rb:39:in `block (2 levels) in <class:ClassLoader>'
 ...
 /usr/lib/ruby/2.5.0/psych.rb:325:in `safe_load'

FYI, in case this was a surprise or worked previously... it could be due to #19701 because we weren't honoring safe_load until that PR "fixed" it. The fix could then cause these Psych::DisallowedClass where it worked previously.

I agree, using string keys or json is probably best.

agrare added 5 commits March 30, 2020 10:38
Currently we have two queue_types, miq_queue and artemis with the later
allowing the use of ManageIQ::Messaging client.

ManageIQ::Messaging also has support for kafka which we can also allow
to be used here.
@agrare agrare force-pushed the add_support_for_kafka branch from 18bd565 to cf48c7e Compare March 30, 2020 14:38
@miq-bot
Copy link
Member

miq-bot commented Mar 30, 2020

Checked commits agrare/manageiq@f724ebe~...cf48c7e with ruby 2.5.7, rubocop 0.69.0, haml-lint 0.28.0, and yamllint
4 files checked, 0 offenses detected
Everything looks fine. 🍪

@Fryguy Fryguy merged commit beea070 into ManageIQ:master Mar 30, 2020
@agrare agrare deleted the add_support_for_kafka branch March 30, 2020 19:50
simaishi pushed a commit that referenced this pull request Apr 8, 2020
Add kafka as an option for prototype.queue_type

(cherry picked from commit beea070)
@simaishi
Copy link
Contributor

simaishi commented Apr 8, 2020

Jansa backport details:

$ git log -1
commit c19e513f8fa559fb0df7780007ffc8351475562c
Author: Jason Frey <[email protected]>
Date:   Mon Mar 30 15:46:50 2020 -0400

    Merge pull request #19984 from agrare/add_support_for_kafka

    Add kafka as an option for prototype.queue_type

    (cherry picked from commit beea070a44ee6e52654df333cb2128f8e0f4981b)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Providers] Syndicate events to Kafka
7 participants