Fog google upgrade (to 1.3.3) #54

Merged (20 commits) on May 31, 2018
@@ -23,7 +23,7 @@ def stop
# Poll for events (blocks forever until #stop is called)
def each_batch
while @collecting_events
yield events.map { |e| JSON.parse(e.message['data']) }
yield events.map { |e| JSON.parse(e[:message][:data]) }
end
end

@@ -38,23 +38,24 @@ def each_batch
def events
# For now, return immediately with up to 10 messages
@ems.with_provider_connection(:service => 'pubsub') do |google|
subscription = get_or_create_subscription(google)
subscription.pull(:return_immediately => true, :max_messages => 10).tap do |msgs|
subscription.acknowledge(msgs)
end
get_or_create_subscription(google)


It looks like you are not using the returned subscription to pull events from; the name of the method get_or_create_subscription is a bit confusing.


What led to the decision to pull from the google provider instead of the subscription? Was this a change in the API version?

Member Author:


Yes, this is a hack of a kind to work around a Google API inconsistency.

The issue is that the current subscription.pull() follows the Google API specification and does a Base64 decoding of the payload. This is consistent with the Google API documentation for PubsubMessage. However, in the real world this data comes in plain text, so the Base64 decoding makes it gibberish. By pulling from the subscription directly via the service we can do the processing on our own and bypass the decoding step.

I'll document this in code.

Member Author:


I've created an issue against fog, so maybe they can help or at least be aware of it. And we also have something to track 😉 fog/fog-google#349

The inline comment about the situation is coming soon! 😉

# FIXME: Change once https://github.com/fog/fog-google/issues/349 is resolved
# In the normal case we would use the return value of the previous call and invoke :pull on it.
# Due to a Google API inconsistency we implement our own pull method and pull directly from the service.
# The current subscription.pull() in Fog follows the Google API specification and does a Base64 decoding
# of the payload. This is consistent with the Google API documentation for PubsubMessage
# (see https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
# However, in the real world the data comes in plain text, so the Base64 decoding makes it gibberish.
# Pulling directly from the PubSub service works around it.
pull_subscription(google).tap { |msgs| acknowledge_messages(google, msgs) }
end
rescue Fog::Errors::Error
raise ProviderUnreachable, "Error when contacting Google Pubsub for events; this may be a temporary failure."
end

def get_or_create_subscription(google)
# If event catcher is not yet setup, then we'll get a fog error
google.subscriptions.get(subscription_name) ||
google.subscriptions.create(:name => subscription_name,
                              # add empty config - workaround for https://github.com/fog/fog-google/issues/214
# TODO: remove once the above is resolved
:push_config => {},
:topic => topic_name)
google.subscriptions.get(subscription_name) || google.subscriptions.create(:name => subscription_name, :topic => topic_name)
Member:


👍 nice catch now that fog/fog-google#214 is fixed

rescue Fog::Errors::NotFound
# Rather than expose the notfound error, we expose our own exception
# indicating that the worker thread should back off
@@ -64,6 +65,19 @@ def get_or_create_subscription(google)
raise TopicNotFound, msg
end

def pull_subscription(google)
options = {:return_immediately => true, :max_messages => 10}
data = google.pull_subscription(subscription_name, options).to_h

data[:received_messages].to_a
end

def acknowledge_messages(google, messages)
return if messages.empty?
ack_ids = messages.collect { |m| m[:ack_id] }
google.acknowledge_subscription(subscription_name, ack_ids)
end

def subscription_name
"projects/#{@ems.project}/subscriptions/manageiq-eventcatcher-#{@ems.guid}"
end
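For reference, a minimal sketch (values made up) of what pull_subscription returns and how each_batch consumes it, assuming the symbolized ReceivedMessage shape used above, with :data arriving as plain text rather than Base64:

```ruby
require "json"

# Illustrative ReceivedMessage hashes as returned by pull_subscription
# (ack_id, message_id, and payload are made-up values).
received = [
  {
    :ack_id  => "dQNNHlAbEGEIBE1234",
    :message => {
      :data       => '{"jsonPayload":{"event_type":"GCE_OPERATION_DONE"}}',
      :message_id => "99943622411"
    }
  }
]

# acknowledge_messages collects these ids:
received.collect { |m| m[:ack_id] } # => ["dQNNHlAbEGEIBE1234"]

# each_batch then parses the plain-text payload directly:
batch = received.map { |e| JSON.parse(e[:message][:data]) }
batch.first["jsonPayload"]["event_type"] # => "GCE_OPERATION_DONE"
```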
@@ -5,15 +5,14 @@ def self.event_to_hash(event, ems_id)
log_header = "ems_id: [#{ems_id}] " unless ems_id.nil?

event_type = parse_event_type(event)
timestamp = event.fetch_path('metadata', 'timestamp')

_log.debug { "#{log_header}event: [#{event_type}]" }

event_hash = {
:event_type => event_type,
:source => "GOOGLE",
:message => event_type,
:timestamp => timestamp,
:timestamp => event['timestamp'],
:full_data => event,
:ems_id => ems_id
}
@@ -37,46 +37,45 @@ class ManageIQ::Providers::Google::CloudManager::MetricsCapture < ManageIQ::Prov
VIM_COUNTER_SCHEMAS = [
{
# Name of the VIM_STYLE_COUNTER this schema describes
:vim_counter_name => "cpu_usage_rate_average",
:vim_counter_name => "cpu_usage_rate_average",

# List of metric names in GCP that should be retrieved to calculate the
# vim-style metric
:google_metric_names => ["compute.googleapis.com/instance/cpu/utilization"],

# A function that maps a target to a list of google labels to be applied
# to the request. Only results matching the label are returned.
:target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
# https://cloud.google.com/monitoring/api/metrics_gcp#gcp-compute
:google_metric_names => %w(compute.googleapis.com/instance/cpu/utilization),

# Function that maps a point returned by Google's monitoring api (which
# is a hash data structure; see
# https://cloud.google.com/monitoring/v2beta2/timeseries) to our counter
# value. Any unit transformations are applied as well.
:point_to_val => ->(point) { point["doubleValue"].to_f * 100 },
# https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries)
# to our counter value. Any unit transformations are applied as well.
:point_to_val => ->(point) { point[:double_value].to_f * 100 },

# Function that takes two points and reduces them to one. This is used
# when multiple points are found for the same data point in the same
# query (e.g. if we are querying disk usage and the host has multiple
# disks, this method is used to combine the points into a single metric)
:reducer => lambda do |x, _|
:reducer => lambda do |x, _|
_log.warn("Received multiple values for cpu_usage; ignoring duplicates")
x
end
},
{
:vim_counter_name => "disk_usage_rate_average",
:google_metric_names => ["compute.googleapis.com/instance/disk/read_bytes_count",
"compute.googleapis.com/instance/disk/write_bytes_count"],
:target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
:point_to_val => ->(point) { point["int64Value"].to_i / (60.0 * 1024.0) }, # convert from bytes/minute to KiB/s
:reducer => ->(x, y) { x + y },
:vim_counter_name => "disk_usage_rate_average",
:google_metric_names => %w(
compute.googleapis.com/instance/disk/read_bytes_count
compute.googleapis.com/instance/disk/write_bytes_count
),
:point_to_val => ->(point) { point[:int64_value].to_i / (60.0 * 1024.0) }, # convert from bytes/minute to KiB/s
:reducer => ->(x, y) { x + y },
},
{
:vim_counter_name => "net_usage_rate_average",
:google_metric_names => ["compute.googleapis.com/instance/network/received_bytes_count",
"compute.googleapis.com/instance/network/sent_bytes_count"],
:target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
:point_to_val => ->(point) { point["int64Value"].to_i / (60.0 * 1024.0) }, # convert from bytes/minute to KiB/s
:reducer => ->(x, y) { x + y },
:vim_counter_name => "net_usage_rate_average",
:google_metric_names => %w(
compute.googleapis.com/instance/network/received_bytes_count
compute.googleapis.com/instance/network/sent_bytes_count
),
:point_to_val => ->(point) { point[:int64_value].to_i / (60.0 * 1024.0) }, # convert from bytes/minute to KiB/s
:reducer => ->(x, y) { x + y },
}
].freeze
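To make the schema mechanics concrete, here is a small sketch of how point_to_val and reducer combine two samples that land on the same minute. Sample values are made up, and the constant above is assumed to be in scope:

```ruby
# Sketch only; assumes VIM_COUNTER_SCHEMAS above is in scope.
schema = VIM_COUNTER_SCHEMAS.detect { |s| s[:vim_counter_name] == "disk_usage_rate_average" }

# Two made-up samples (bytes/minute) for the same timestamp,
# e.g. read counts from two disks on one instance.
points = [{:int64_value => 6_144_000}, {:int64_value => 3_072_000}]

vals  = points.map { |p| schema[:point_to_val].call(p) }   # => [100.0, 50.0] (KiB/s)
total = vals.reduce { |x, y| schema[:reducer].call(x, y) } # => 150.0
```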

@@ -127,15 +126,19 @@ def perf_collect_metrics(_interval_name, start_time = nil, end_time = nil)
# aggregate metrics onto (will be modified by method)
# @return nil
def collect_metrics(google, start_time, end_time, schema, counter_values_by_ts)
interval = {
:start_time => start_time.to_datetime.rfc3339,
:end_time => end_time.to_datetime.rfc3339
}

schema[:google_metric_names].each do |google_metric_name|
options = {
:labels => schema[:target_to_google_labels].call(target),
:oldest => start_time.to_datetime.rfc3339,
}
# For filter creation and entity selection see https://cloud.google.com/monitoring/api/v3/filters
filter = "metric.type = \"#{google_metric_name}\" AND resource.labels.instance_id = \"#{target.ems_ref}\""
Member:


AND resource.labels.instance_id = "#{target.ems_ref}"

This looks like a much nicer interface than target_to_google_labels.

Member Author:


It is indeed! Also the filter query allows much more precise matching and has a lot of options. That's really a nice thing. 👍
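
As a rough illustration of the Monitoring v3 filter syntax (see https://cloud.google.com/monitoring/api/v3/filters): the filter is just an AND-joined list of comparisons. A hypothetical helper, not part of this PR, could build the same string that collect_metrics interpolates inline:

```ruby
# Hypothetical helper (illustration only); mirrors the inline filter above.
def monitoring_filter(metric_type, instance_id)
  [
    "metric.type = \"#{metric_type}\"",
    "resource.labels.instance_id = \"#{instance_id}\""
  ].join(" AND ")
end

monitoring_filter("compute.googleapis.com/instance/cpu/utilization", "1234567890")
# => metric.type = "compute.googleapis.com/instance/cpu/utilization" AND resource.labels.instance_id = "1234567890"
```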


# Make our service call for metrics; Note that we might get multiple
# time series back (for example, if the host has multiple disks/network
# cards)
tss = google.timeseries_collection.all(google_metric_name, end_time.to_datetime.rfc3339, options)
tss = google.timeseries_collection.all(:filter => filter, :interval => interval)

tss.each do |ts|
collect_time_series_metrics(ts, schema, counter_values_by_ts)
@@ -147,7 +150,8 @@ def collect_metrics(google, start_time, end_time, schema, counter_values_by_ts)
# provided 'counter_values_by_ts' hash, using the provided schema.
#
# @param time_series [Hash] resource returned by GCP describing a metric
# lookup result (see https://cloud.google.com/monitoring/v2beta2/timeseries)
# lookup result (see
# https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries)
# @param schema [Hash] schema describing the metric to query (see
# VIM_STYLE_COUNTERS definition for a description)
# @param counter_values_by_ts [Hash{Time => Hash{String => Number}}] hash to
Expand All @@ -159,8 +163,8 @@ def collect_time_series_metrics(time_series, schema, counter_values_by_ts)
# minute; this allows us to sum up points across time series that may
# have landed on different seconds. Note this only holds true for
# 1-minute metrics.
timestamp = Time.zone.parse(point["start"]).beginning_of_minute
val = schema[:point_to_val].call(point)
timestamp = Time.zone.parse(point[:interval][:start_time]).beginning_of_minute
val = schema[:point_to_val].call(point[:value])

# If we already have a value, reduce using our reduction function
prev_val = counter_values_by_ts.fetch_path(timestamp, schema[:vim_counter_name])
@@ -18,11 +18,6 @@ def prepare_for_clone_task
clone_options[:machine_type] = instance_type.ems_ref
clone_options[:zone_name] = dest_availability_zone.ems_ref
clone_options[:preemptible] = get_option(:is_preemptible)
# fog-google specifies a default value that's incompatible with
# :preemptible; until this is fixed we need to be explicit about the host
# behavior on maintenance
# issue: https://github.com/fog/fog-google/issues/136
clone_options[:on_host_maintenance] = "TERMINATE" if clone_options[:preemptible]


@tumido - how come this was removed?

Member Author:

Just see the issue noted in the comment on the line above. It's been solved, so this conversion is done by fog by default.
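
A before/after sketch of the clone options, assuming (per the reply above) that fog-google now derives a preemptible-compatible maintenance setting on its own:

```ruby
clone_options = {}

# Before fog/fog-google#136 was fixed: the maintenance behavior had to be
# forced explicitly alongside :preemptible.
clone_options[:preemptible]         = true
clone_options[:on_host_maintenance] = "TERMINATE"

# After the fix (assumption based on the reply above): fog-google supplies a
# compatible default, so :preemptible alone suffices.
clone_options = {:preemptible => true}
```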


if clone_options[:user_data]
clone_options[:metadata] = {"user-data" => Base64.encode64(clone_options[:user_data]),
@@ -41,9 +41,12 @@ def get_zones
end

def get_flavors
# connection.flavors returns a duplicate flavor for every zone
# so build a unique list of flavors using the flavor id
flavors = @connection.flavors.to_a.uniq(&:id)
# Google API returns a duplicate flavor for every zone
# so build a unique list of flavors using the flavor id
flavors = @connection.list_aggregated_machine_types.items.values.each_with_object([]) do |zone, arr|
arr.concat(zone.machine_types) if zone.machine_types
end
flavors.uniq!(&:id)
Member:

I think this can be a bit simpler using flat_map + compact + uniq; how about:

flavors_by_zone = @connection.list_aggregated_machine_types.items
flavors = flavors_by_zone.values.flat_map(&:machine_types).compact.uniq(&:id)

process_collection(flavors, :flavors) { |flavor| parse_flavor(flavor) }
end

@@ -126,7 +129,7 @@ def parse_volume(volume)
zone_id = parse_uid_from_url(volume.zone)

new_result = {
:ems_ref => volume.id,
:ems_ref => volume.id.to_s,
:name => volume.name,
:status => volume.status,
:creation_time => volume.creation_timestamp,
Expand All @@ -147,7 +150,7 @@ def parse_volume(volume)

def parse_snapshot(snapshot)
new_result = {
:ems_ref => snapshot.id,
:ems_ref => snapshot.id.to_s,
:type => "ManageIQ::Providers::Google::CloudManager::CloudVolumeSnapshot",
:name => snapshot.name,
:status => snapshot.status,
Expand All @@ -161,7 +164,7 @@ def parse_snapshot(snapshot)
end

def parse_storage_as_template(storage)
uid = storage.id
uid = storage.id.to_s
name = storage.name
name ||= uid
type = ManageIQ::Providers::Google::CloudManager::Template.name
@@ -206,20 +209,20 @@ def parse_ssh_key(ssh_key)
end

def parse_instance(instance)
uid = instance.id
uid = instance.id.to_s
name = instance.name
name ||= uid

flavor_uid = parse_uid_from_url(instance.machine_type)
flavor = @data_index.fetch_path(:flavors, flavor_uid)

# If the flavor isn't found in our index, check if it is a custom flavor
# that we have to get directly
flavor = query_and_add_flavor(flavor_uid) if flavor.nil?

zone_uid = parse_uid_from_url(instance.zone)
zone = @data_index.fetch_path(:availability_zones, zone_uid)

# If the flavor isn't found in our index, check if it is a custom flavor
# that we have to get directly
flavor = query_and_add_flavor(flavor_uid, zone_uid) if flavor.nil?

parent_image_uid = parse_instance_parent_image(instance)
parent_image = @data_index.fetch_path(:vms, parent_image_uid)

@@ -233,7 +236,7 @@ def parse_instance(instance)
:name => name,
:description => instance.description,
:vendor => "google",
:raw_power_state => instance.state,
:raw_power_state => instance.status,
Member:

Was this changed in the Google API? Seems like an odd change for them to make, or were we relying on some conversion that the older fog was doing?

Member Author (@tumido, May 31, 2018):

No, just a fog thing. They also hid this change in a "Test fixes" PR. The Google API called it :status all along, but fog had it mapped to a :state attribute, so they made it consistent.

:flavor => flavor,
:availability_zone => zone,
:parent_vm => parent_image,
@@ -250,7 +253,7 @@ def parse_instance(instance)
:display_name => N_("Is VM Preemptible"),
:description => N_("Whether or not the VM is 'preemptible'. See"\
" https://cloud.google.com/compute/docs/instances/preemptible for more details."),
:value => instance.scheduling["preemptible"].to_s,
:value => instance.scheduling[:preemptible].to_s,
:read_only => true
}
]
@@ -265,13 +268,13 @@ def populate_hardware_hash_with_disks(hardware_disks_array, instance)
def populate_hardware_hash_with_disks(hardware_disks_array, instance)
instance.disks.each do |attached_disk|
# lookup the full disk information from the data_index by source link
d = @data_index.fetch_path(:cloud_volumes, attached_disk["source"])
d = @data_index.fetch_path(:cloud_volumes, attached_disk[:source])

next if d.nil?

disk_size = d[:size]
disk_name = attached_disk["deviceName"]
disk_location = attached_disk["index"]
disk_name = attached_disk[:device_name]
disk_location = attached_disk[:index]

disk = add_instance_disk(hardware_disks_array, disk_size, disk_name, disk_location)
# Link the disk and the instance together
Expand All @@ -288,7 +291,7 @@ def parse_instance_parent_image(instance)
parent_image_uid = nil

instance.disks.each do |disk|
parent_image_uid = @disk_to_source_image_id[disk["source"]]
parent_image_uid = @disk_to_source_image_id[disk[:source]]
next if parent_image_uid.nil?
break
end
@@ -308,8 +311,8 @@ def populate_key_pairs_with_ssh_keys(result_key_pairs, instance)
end

def parse_compute_metadata(metadata, key)
metadata_item = metadata["items"].to_a.detect { |x| x["key"] == key }
metadata_item.to_h["value"]
metadata_item = metadata[:items].to_a.detect { |x| x[:key] == key }
metadata_item.to_h[:value]
end
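
A tiny usage sketch for this helper, with made-up metadata in the symbolized-key shape the new fog returns:

```ruby
metadata = {:items => [{:key => "ssh-keys", :value => "alice:ssh-rsa AAAA... alice"}]}

parse_compute_metadata(metadata, "ssh-keys")       # => "alice:ssh-rsa AAAA... alice"
parse_compute_metadata({:items => nil}, "ssh-keys") # => nil (detect misses; to_h absorbs the nil)
```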

def parse_compute_metadata_ssh_keys(metadata)
@@ -348,8 +351,8 @@ def link_volumes_to_base_snapshots
end
end

def query_and_add_flavor(flavor_uid)
flavor = @connection.flavors.get(flavor_uid)
def query_and_add_flavor(flavor_uid, zone_uid)
flavor = @connection.get_machine_type(flavor_uid, zone_uid)
process_collection(flavor.to_miq_a, :flavors) { |f| parse_flavor(f) }
@data_index.fetch_path(:flavors, flavor_uid)
end
6 changes: 3 additions & 3 deletions app/models/manageiq/providers/google/event_catcher_mixin.rb
@@ -1,7 +1,7 @@
module ManageIQ::Providers::Google::EventCatcherMixin
def parse_event_type(event)
event_type = event.fetch_path('structPayload', 'event_type')
event_subtype = event.fetch_path('structPayload', 'event_subtype')
event_type = event.fetch_path('jsonPayload', 'event_type')
event_subtype = event.fetch_path('jsonPayload', 'event_subtype')

event_type = "unknown" if event_type.blank?
event_subtype = "unknown" if event_subtype.blank?
Expand All @@ -12,7 +12,7 @@ def parse_event_type(event)
end

def parse_resource_id(event)
resource_id = event.fetch_path('structPayload', 'resource', 'id')
resource_id = event.fetch_path('jsonPayload', 'resource', 'id')
resource_id = "unknown" if resource_id.blank?

resource_id
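
An illustrative activity-log event in the new jsonPayload format (values made up) and how these helpers resolve it:

```ruby
# Requires ManageIQ's Hash#fetch_path core extension, as used above.
event = {
  "jsonPayload" => {
    "event_type"    => "GCE_OPERATION_DONE",
    "event_subtype" => "compute.instances.stop",
    "resource"      => {"id" => "1234567890"}
  },
  "timestamp" => "2018-05-31T12:00:00Z"
}

parse_resource_id(event) # => "1234567890"
# parse_event_type pulls event_type/event_subtype from jsonPayload
# ("GCE_OPERATION_DONE" / "compute.instances.stop" here), falling back
# to "unknown" when either is blank.
```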