Fog google upgrade (to 1.3.3) #54
Changes from all commits
@@ -23,7 +23,7 @@ def stop
   # Poll for events (blocks forever until #stop is called)
   def each_batch
     while @collecting_events
-      yield events.map { |e| JSON.parse(e.message['data']) }
+      yield events.map { |e| JSON.parse(e[:message][:data]) }
     end
   end
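For context on the key change: the upgraded fog-google hands messages back as hashes with symbol keys rather than objects with string-keyed data. A hedged sketch of the assumed shape (the ack id and payload are invented; see `pull_subscription` further down):

```ruby
require 'json'

# Hypothetical received message in the new symbolized-hash shape:
received = {
  :ack_id  => "projects/p/subscriptions/s:1",                     # invented ack id
  :message => { :data => '{"event_type":"GCE_OPERATION_DONE"}' }  # plain-text JSON payload
}

JSON.parse(received[:message][:data])
# => {"event_type"=>"GCE_OPERATION_DONE"}
```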
@@ -38,23 +38,24 @@ def each_batch
   def events
     # For now, return immediately with up to 10 messages
     @ems.with_provider_connection(:service => 'pubsub') do |google|
-      subscription = get_or_create_subscription(google)
-      subscription.pull(:return_immediately => true, :max_messages => 10).tap do |msgs|
-        subscription.acknowledge(msgs)
-      end
+      get_or_create_subscription(google)
+      # FIXME: Change once https://github.com/fog/fog-google/issues/349 is resolved.
+      # Normally we would use the return value of the previous call and invoke :pull on it.
+      # Due to a Google API inconsistency we implement our own pull method and pull directly from the service.
+      # The current subscription.pull() in Fog follows the Google API specification and Base64-decodes the payload.
+      # This is consistent with the Google API documentation for PubsubMessage
+      # (see https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
+      # In the real world, however, the data arrives as plain text, so the Base64 decoding turns it into gibberish.
+      # Pulling directly from the PubSub service works around this.
+      pull_subscription(google).tap { |msgs| acknowledge_messages(google, msgs) }
     end
   rescue Fog::Errors::Error
     raise ProviderUnreachable, "Error when contacting Google Pubsub for events; this may be a temporary failure."
   end
   def get_or_create_subscription(google)
     # If event catcher is not yet setup, then we'll get a fog error
-    google.subscriptions.get(subscription_name) ||
-      google.subscriptions.create(:name => subscription_name,
-                                  # add empty config - workaround for https://github.com/fog/fog-google/issues/214
-                                  # TODO: remove once the above is resolved
-                                  :push_config => {},
-                                  :topic => topic_name)
+    google.subscriptions.get(subscription_name) || google.subscriptions.create(:name => subscription_name, :topic => topic_name)

Review comment: 👍 nice catch now that fog/fog-google#214 is fixed

   rescue Fog::Errors::NotFound
     # Rather than expose the notfound error, we expose our own exception
     # indicating that the worker thread should back off
@@ -64,6 +65,19 @@ def get_or_create_subscription(google)
     raise TopicNotFound, msg
   end

+  def pull_subscription(google)
+    options = {:return_immediately => true, :max_messages => 10}
+    data = google.pull_subscription(subscription_name, options).to_h
+
+    data[:received_messages].to_a
+  end
+
+  def acknowledge_messages(google, messages)
+    return if messages.empty?
+    ack_ids = messages.collect { |m| m[:ack_id] }
+    google.acknowledge_subscription(subscription_name, ack_ids)
+  end
+
   def subscription_name
     "projects/#{@ems.project}/subscriptions/manageiq-eventcatcher-#{@ems.guid}"
   end
@@ -37,46 +37,45 @@ class ManageIQ::Providers::Google::CloudManager::MetricsCapture < ManageIQ::Prov
   VIM_COUNTER_SCHEMAS = [
     {
       # Name of the VIM_STYLE_COUNTER this schema describes
-      :vim_counter_name        => "cpu_usage_rate_average",
+      :vim_counter_name    => "cpu_usage_rate_average",

       # List of metric names in GCP that should be retrieved to calculate the
       # vim-style metric
-      :google_metric_names     => ["compute.googleapis.com/instance/cpu/utilization"],
-
-      # A function that maps a target to a list of google labels to be applied
-      # to the request. Only results matching the label are returned.
-      :target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
+      # https://cloud.google.com/monitoring/api/metrics_gcp#gcp-compute
+      :google_metric_names => %w(compute.googleapis.com/instance/cpu/utilization),

       # Function that maps a point returned by Google's monitoring api (which
       # is a hash data structure; see
-      # https://cloud.google.com/monitoring/v2beta2/timeseries) to our counter
-      # value. Any unit transformations are applied as well.
-      :point_to_val            => ->(point) { point["doubleValue"].to_f * 100 },
+      # https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries)
+      # to our counter value. Any unit transformations are applied as well.
+      :point_to_val        => ->(point) { point[:double_value].to_f * 100 },

       # Function that takes two points and reduces them to one. This is used
       # when multiple points are found for the same data point in the same
       # query (e.g. if we are querying disk usage and the host has multiple
       # disks, this method is used to combine the points into a single metric)
-      :reducer                 => lambda do |x, _|
+      :reducer             => lambda do |x, _|
         _log.warn("Received multiple values for cpu_usage; ignoring duplicates")
         x
       end
     },
     {
-      :vim_counter_name        => "disk_usage_rate_average",
-      :google_metric_names     => ["compute.googleapis.com/instance/disk/read_bytes_count",
-                                   "compute.googleapis.com/instance/disk/write_bytes_count"],
-      :target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
-      :point_to_val            => ->(point) { point["int64Value"].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
-      :reducer                 => ->(x, y) { x + y },
+      :vim_counter_name    => "disk_usage_rate_average",
+      :google_metric_names => %w(
+        compute.googleapis.com/instance/disk/read_bytes_count
+        compute.googleapis.com/instance/disk/write_bytes_count
+      ),
+      :point_to_val        => ->(point) { point[:int64_value].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
+      :reducer             => ->(x, y) { x + y },
     },
     {
-      :vim_counter_name        => "net_usage_rate_average",
-      :google_metric_names     => ["compute.googleapis.com/instance/network/received_bytes_count",
-                                   "compute.googleapis.com/instance/network/sent_bytes_count"],
-      :target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
-      :point_to_val            => ->(point) { point["int64Value"].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
-      :reducer                 => ->(x, y) { x + y },
+      :vim_counter_name    => "net_usage_rate_average",
+      :google_metric_names => %w(
+        compute.googleapis.com/instance/network/received_bytes_count
+        compute.googleapis.com/instance/network/sent_bytes_count
+      ),
+      :point_to_val        => ->(point) { point[:int64_value].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
+      :reducer             => ->(x, y) { x + y },
     }
   ].freeze
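To make the schema concrete, here is a small sketch of how a v3 monitoring point (symbolized keys, as the upgraded fog-google returns them) flows through `:point_to_val`; the sample numbers are invented:

```ruby
# Hypothetical point from a returned time series:
point = {
  :interval => { :start_time => "2018-06-01T12:00:00Z" },
  :value    => { :double_value => 0.42 }
}

point_to_val = ->(p) { p[:double_value].to_f * 100 }
point_to_val.call(point[:value])
# => 42.0 (CPU utilization as a percentage)
```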
@@ -127,15 +126,19 @@ def perf_collect_metrics(_interval_name, start_time = nil, end_time = nil)
   # aggregate metrics onto (will be modified by method)
   # @return nil
   def collect_metrics(google, start_time, end_time, schema, counter_values_by_ts)
+    interval = {
+      :start_time => start_time.to_datetime.rfc3339,
+      :end_time   => end_time.to_datetime.rfc3339
+    }
+
     schema[:google_metric_names].each do |google_metric_name|
-      options = {
-        :labels => schema[:target_to_google_labels].call(target),
-        :oldest => start_time.to_datetime.rfc3339,
-      }
+      # For filter creation and entity selection see https://cloud.google.com/monitoring/api/v3/filters
+      filter = "metric.type = \"#{google_metric_name}\" AND resource.labels.instance_id = \"#{target.ems_ref}\""
Review comment: This looks like a much nicer interface than the target_to_google_labels.

Reply: It is indeed! The filter query also allows much more precise matching and has a lot of options. That's really a nice thing. 👍
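For illustration, with the CPU metric and a hypothetical instance whose ems_ref is "1234567890", the interpolated filter string comes out as:

```ruby
google_metric_name = "compute.googleapis.com/instance/cpu/utilization"
ems_ref = "1234567890" # hypothetical instance id
filter = "metric.type = \"#{google_metric_name}\" AND resource.labels.instance_id = \"#{ems_ref}\""
# => metric.type = "compute.googleapis.com/instance/cpu/utilization" AND resource.labels.instance_id = "1234567890"
```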
       # Make our service call for metrics; Note that we might get multiple
       # time series back (for example, if the host has multiple disks/network
       # cards)
-      tss = google.timeseries_collection.all(google_metric_name, end_time.to_datetime.rfc3339, options)
+      tss = google.timeseries_collection.all(:filter => filter, :interval => interval)

       tss.each do |ts|
         collect_time_series_metrics(ts, schema, counter_values_by_ts)
@@ -147,7 +150,8 @@ def collect_metrics(google, start_time, end_time, schema, counter_values_by_ts)
   # provided 'counter_values_by_ts' hash, using the provided schema.
   #
   # @param time_series [Hash] resource returned by GCP describing a metric
-  #   lookup result (see https://cloud.google.com/monitoring/v2beta2/timeseries)
+  #   lookup result (see
+  #   https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries)
   # @param schema [Hash] schema describing the metric to query (see
   #   VIM_STYLE_COUNTERS definition for a description)
   # @param counter_values_by_ts [Hash{Time => Hash{String => Number}}] hash to
@@ -159,8 +163,8 @@ def collect_time_series_metrics(time_series, schema, counter_values_by_ts)
     # minute; this allows us to sum up points across time series that may
     # have landed on different seconds. Note this only holds true for
     # 1-minute metrics.
-    timestamp = Time.zone.parse(point["start"]).beginning_of_minute
-    val = schema[:point_to_val].call(point)
+    timestamp = Time.zone.parse(point[:interval][:start_time]).beginning_of_minute
+    val = schema[:point_to_val].call(point[:value])

     # If we already have a value, reduce using our reduction function
     prev_val = counter_values_by_ts.fetch_path(timestamp, schema[:vim_counter_name])
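A short sketch of why the truncation matters (timestamps invented): two time series — say, two disks — can stamp the same minute at different seconds, and `beginning_of_minute` lets the schema's `:reducer` merge them into one bucket:

```ruby
require 'active_support/all'
Time.zone = "UTC"

t1 = Time.zone.parse("2018-06-01T12:00:15Z").beginning_of_minute
t2 = Time.zone.parse("2018-06-01T12:00:47Z").beginning_of_minute
t1 == t2
# => true; both points land in the 12:00:00 bucket

reducer = ->(x, y) { x + y } # the disk/net schemas sum per-device values
reducer.call(10.0, 14.0)
# => 24.0
```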
@@ -18,11 +18,6 @@ def prepare_for_clone_task
     clone_options[:machine_type] = instance_type.ems_ref
     clone_options[:zone_name] = dest_availability_zone.ems_ref
     clone_options[:preemptible] = get_option(:is_preemptible)
-    # fog-google specifies a default value that's incompatible with
-    # :preemptible; until this is fixed we need to be explicit about the host
-    # behavior on maintenance
-    # issue: https://github.com/fog/fog-google/issues/136
-    clone_options[:on_host_maintenance] = "TERMINATE" if clone_options[:preemptible]

Review comment: @tumido - how come this was removed?

Reply: Just see the issue noted in the comment on the line above. It has been solved, so this conversion is done by fog by default.

     if clone_options[:user_data]
       clone_options[:metadata] = {"user-data" => Base64.encode64(clone_options[:user_data]),
@@ -41,9 +41,12 @@ def get_zones
   end

   def get_flavors
-    # connection.flavors returns a duplicate flavor for every zone
-    # so build a unique list of flavors using the flavor id
-    flavors = @connection.flavors.to_a.uniq(&:id)
+    # The Google API returns a duplicate flavor for every zone,
+    # so build a unique list of flavors using the flavor id
+    flavors = @connection.list_aggregated_machine_types.items.values.each_with_object([]) do |zone, arr|
+      arr.concat(zone.machine_types) if zone.machine_types
+    end
+    flavors.uniq!(&:id)
Review comment: I think this can be a bit simpler using flat_map + compact + uniq; how about the following?
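The reviewer's snippet did not survive in the rendered page; a plausible reconstruction of the flat_map + compact + uniq variant they describe might be:

```ruby
flavors = @connection.list_aggregated_machine_types
                     .items
                     .values
                     .flat_map(&:machine_types)
                     .compact # zones without machine types contribute nil
                     .uniq(&:id)
```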
     process_collection(flavors, :flavors) { |flavor| parse_flavor(flavor) }
   end
@@ -126,7 +129,7 @@ def parse_volume(volume)
     zone_id = parse_uid_from_url(volume.zone)

     new_result = {
-      :ems_ref       => volume.id,
+      :ems_ref       => volume.id.to_s,
       :name          => volume.name,
       :status        => volume.status,
       :creation_time => volume.creation_timestamp,
@@ -147,7 +150,7 @@ def parse_volume(volume)

   def parse_snapshot(snapshot)
     new_result = {
-      :ems_ref => snapshot.id,
+      :ems_ref => snapshot.id.to_s,
       :type    => "ManageIQ::Providers::Google::CloudManager::CloudVolumeSnapshot",
       :name    => snapshot.name,
       :status  => snapshot.status,
@@ -161,7 +164,7 @@ def parse_snapshot(snapshot)
   end

   def parse_storage_as_template(storage)
-    uid = storage.id
+    uid = storage.id.to_s
     name = storage.name
     name ||= uid
     type = ManageIQ::Providers::Google::CloudManager::Template.name
@@ -206,20 +209,20 @@ def parse_ssh_key(ssh_key)
   end

   def parse_instance(instance)
-    uid = instance.id
+    uid = instance.id.to_s
     name = instance.name
     name ||= uid

     flavor_uid = parse_uid_from_url(instance.machine_type)
     flavor = @data_index.fetch_path(:flavors, flavor_uid)

-    # If the flavor isn't found in our index, check if it is a custom flavor
-    # that we have to get directly
-    flavor = query_and_add_flavor(flavor_uid) if flavor.nil?
-
     zone_uid = parse_uid_from_url(instance.zone)
     zone = @data_index.fetch_path(:availability_zones, zone_uid)

+    # If the flavor isn't found in our index, check if it is a custom flavor
+    # that we have to get directly
+    flavor = query_and_add_flavor(flavor_uid, zone_uid) if flavor.nil?
+
     parent_image_uid = parse_instance_parent_image(instance)
     parent_image = @data_index.fetch_path(:vms, parent_image_uid)
@@ -233,7 +236,7 @@ def parse_instance(instance)
       :name            => name,
       :description     => instance.description,
       :vendor          => "google",
-      :raw_power_state => instance.state,
+      :raw_power_state => instance.status,

Review comment: Was this changed in the google-api? Seems like an odd change for them to make, or were we relying on some conversion that the older fog was using?

Reply: No, just a fog thing. They've also hidden this change in a "Test fixes" PR. The Google API called it `status`.

       :flavor            => flavor,
       :availability_zone => zone,
       :parent_vm         => parent_image,
@@ -250,7 +253,7 @@ def parse_instance(instance)
       :display_name => N_("Is VM Preemptible"),
       :description  => N_("Whether or not the VM is 'preemptible'. See"\
                           " https://cloud.google.com/compute/docs/instances/preemptible for more details."),
-      :value        => instance.scheduling["preemptible"].to_s,
+      :value        => instance.scheduling[:preemptible].to_s,
       :read_only    => true
     }
   ]
@@ -265,13 +268,13 @@ def parse_instance(instance)
   def populate_hardware_hash_with_disks(hardware_disks_array, instance)
     instance.disks.each do |attached_disk|
       # lookup the full disk information from the data_index by source link
-      d = @data_index.fetch_path(:cloud_volumes, attached_disk["source"])
+      d = @data_index.fetch_path(:cloud_volumes, attached_disk[:source])

       next if d.nil?

       disk_size = d[:size]
-      disk_name = attached_disk["deviceName"]
-      disk_location = attached_disk["index"]
+      disk_name = attached_disk[:device_name]
+      disk_location = attached_disk[:index]

       disk = add_instance_disk(hardware_disks_array, disk_size, disk_name, disk_location)
       # Link the disk and the instance together
@@ -288,7 +291,7 @@ def parse_instance_parent_image(instance)
     parent_image_uid = nil

     instance.disks.each do |disk|
-      parent_image_uid = @disk_to_source_image_id[disk["source"]]
+      parent_image_uid = @disk_to_source_image_id[disk[:source]]
       next if parent_image_uid.nil?
       break
     end
@@ -308,8 +311,8 @@ def populate_key_pairs_with_ssh_keys(result_key_pairs, instance)
   end

   def parse_compute_metadata(metadata, key)
-    metadata_item = metadata["items"].to_a.detect { |x| x["key"] == key }
-    metadata_item.to_h["value"]
+    metadata_item = metadata[:items].to_a.detect { |x| x[:key] == key }
+    metadata_item.to_h[:value]
   end

   def parse_compute_metadata_ssh_keys(metadata)
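As an illustration of the symbolized metadata shape assumed by the new code (keys and values invented):

```ruby
metadata = { :items => [{ :key => "ssh-keys", :value => "alice:ssh-rsa AAAA... alice" }] }

metadata[:items].to_a.detect { |x| x[:key] == "ssh-keys" }.to_h[:value]
# => "alice:ssh-rsa AAAA... alice"
```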
@@ -348,8 +351,8 @@ def link_volumes_to_base_snapshots
     end
   end

-  def query_and_add_flavor(flavor_uid)
-    flavor = @connection.flavors.get(flavor_uid)
+  def query_and_add_flavor(flavor_uid, zone_uid)
+    flavor = @connection.get_machine_type(flavor_uid, zone_uid)
     process_collection(flavor.to_miq_a, :flavors) { |f| parse_flavor(f) }
     @data_index.fetch_path(:flavors, flavor_uid)
   end
Review comment: It looks like you are not using the returned subscription to pull events from; the name of the method get_or_create_subscription is a bit confusing.

Review comment: What led to the decision to pull from the google provider instead of the subscription? Was this a change in the API version?

Reply: Yes, this is a hack of a kind to work around a Google API inconsistency. The issue is that the current subscription.pull() follows the Google API specification and does a Base64 decoding of the payload. This is consistent with the Google API documentation for PubsubMessage. However, in the real world this data comes in plain text, so the Base64 decoding makes it gibberish. By pulling the subscription directly from the service we can do the processing on our own and bypass the decoding step. I'll document this in code.

Reply: I've created an issue against fog, so maybe they can help or at least be aware of it. And we also have something to track 😉 fog/fog-google#349. The inline comment about the situation is coming soon! 😉
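To illustrate the mismatch described above, here is a minimal, runnable Ruby sketch (the payload string is invented; the point is that Base64-decoding plain text garbles it, while a proper encode/decode round trip does not):

```ruby
require 'base64'

# Real-world Pub/Sub payload as observed: already plain text, not Base64
payload = '{"event_type":"GCE_OPERATION_DONE"}'

# What subscription.pull effectively does, per the PubsubMessage spec:
Base64.decode64(payload)
# => binary gibberish, because the payload was never Base64-encoded

# The spec's assumption only holds when the data really is encoded:
Base64.decode64(Base64.strict_encode64(payload)) == payload
# => true
```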