Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amazon S3 objects inventorying #123

Merged
merged 1 commit into from
Feb 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/models/manageiq/providers/amazon/inventory/collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ManageIQ::Providers::Amazon::Inventory::Collector < ManagerRefresh::Invent
attr_reader :stacks, :stacks_refs, :stacks_deleted
attr_reader :cloud_volumes, :cloud_volumes_refs
attr_reader :cloud_volume_snapshots, :cloud_volume_snapshots_refs
attr_reader :cloud_objects_store_containers
attr_reader :cloud_objects_store_objects

def initialize(_manager, _target)
super
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,26 @@ def cloud_object_store_containers
hash_collection.new(aws_s3.client.list_buckets.buckets)
end

def cloud_object_store_objects
hash_collection.new([])
# Fetches one page of objects from an S3 bucket.
#
# options:
#   :bucket - bucket name (required)
#   :region - bucket region; S3 buckets are only accessible through an
#             API client bound to the same region
#   :token  - continuation token from a previous page, or nil for the first page
#
# Returns [hash_collection of the page's objects, continuation token].
# A non-nil token means the listing was truncated and more pages remain.
def cloud_object_store_objects(options = {})
  # Note: the former `options[:token] ||= nil` was a no-op (a missing hash
  # key already reads as nil) and has been removed.
  # S3 bucket accessible only for API client with same region
  regional_client = aws_s3_regional(options[:region]).client
  response = regional_client.list_objects_v2(
    :bucket             => options[:bucket],
    :continuation_token => options[:token]
  )
  # next_continuation_token is only meaningful when the listing is truncated
  token = response.next_continuation_token if response.is_truncated
  return hash_collection.new(response.contents), token
end

private

# Returns an S3 resource bound to the given region. The default connection
# is reused when no region is given or it matches the manager's own region;
# otherwise a per-region connection is created once and memoized.
def aws_s3_regional(region)
  return aws_s3 if !region || region == manager.provider_region

  @regional_resources ||= {}
  @regional_resources[region] ||= manager.connect(:service => :S3, :region => region)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ def parse
log_header = "MIQ(#{self.class.name}.#{__method__}) Collecting data for EMS name: [#{collector.manager.name}] id: [#{collector.manager.id}]"

$aws_log.info("#{log_header}...}")
object_store
process_containers
$aws_log.info("#{log_header}...Complete")
end

private

def object_store
# Collects S3 buckets into the :cloud_object_store_containers inventory
# collection, then makes a second pass over the parsed buckets to merge in
# aggregate object statistics (:bytes, :object_count) gathered by paging
# through each bucket's objects.
def process_containers
  process_inventory_collection(
    collector.cloud_object_store_containers,
    :cloud_object_store_containers
  ) { |c| parse_container(c) }
  # bucket_id is the collection's index key (presumably the bucket's
  # ems_ref/name — confirm against parse_container); object is the
  # InventoryObject holding the parsed container attributes.
  persister.collections[:cloud_object_store_containers].data_index.each do |bucket_id, object|
    # Lazy reference lets parsed objects point at their container before save.
    lazy_object = persister.collections[:cloud_object_store_containers].lazy_find(bucket_id)
    object_stats = process_objects(bucket_id, lazy_object)
    object.data.merge!(object_stats)
  end
end

def parse_container(bucket)
Expand All @@ -28,4 +33,44 @@ def parse_container(bucket)
:key => bucket['name']
}
end

# Pages through all objects of one bucket, feeding each page into the
# :cloud_object_store_objects inventory collection while accumulating
# per-bucket totals.
#
# bucket_id     - bucket identifier, used directly as the S3 :bucket name
# bucket_object - lazy InventoryObject reference to the parent container
#
# Returns { :bytes => total content length, :object_count => count }.
def process_objects(bucket_id, bucket_object)
  # S3 bucket accessible only for API client with same region
  region = collector.aws_s3.client.get_bucket_location(:bucket => bucket_id).location_constraint
  options = { :region => region, :bucket => bucket_id }

  # AWS SDK doesn't show information about overall size and object count.
  # We need to collect it manually.
  bytes = 0
  object_count = 0
  proceed = true
  while proceed
    # Each call returns one page of objects plus a continuation token
    # when the listing was truncated; the token feeds the next request.
    objects, token = collector.cloud_object_store_objects(options)
    options[:token] = token

    process_inventory_collection(objects, :cloud_object_store_objects) do |o|
      new_result = parse_object(o, bucket_object)
      bytes += new_result[:content_length]
      object_count += 1
      new_result
    end

    # A blank token means the last page has been consumed.
    proceed = token.present?
  end

  { :bytes => bytes, :object_count => object_count }
end

# Maps one raw S3 object hash onto cloud_object_store_object attributes.
# bucket is a (lazy) reference to the owning container; its ems_ref is used
# to namespace the object's ems_ref so keys stay unique across buckets.
def parse_object(object, bucket)
  key = object['key']

  {
    :ext_management_system        => ems,
    :ems_ref                      => "#{bucket.ems_ref}_#{key}",
    :etag                         => object['etag'],
    :last_modified                => object['last_modified'],
    :content_length               => object['size'],
    :key                          => key,
    :cloud_object_store_container => bucket
  }
end
end
3 changes: 2 additions & 1 deletion app/models/manageiq/providers/amazon/manager_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def connect(options = {})
password = options[:pass] || authentication_password(options[:auth_type])
service = options[:service] || :EC2
proxy = options[:proxy_uri] || http_proxy_uri
region = options[:region] || provider_region

self.class.raw_connect(username, password, service, provider_region, proxy)
self.class.raw_connect(username, password, service, region, proxy)
end

def translate_exception(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,62 @@ def ems_inv_to_hashes
end

def object_store
buckets = @aws_s3.client.list_buckets.buckets
process_collection(buckets, :cloud_object_store_containers) { |c| parse_container(c) }
process_collection(
@aws_s3.client.list_buckets.buckets,
:cloud_object_store_containers
) { |c| parse_container(c) }

process_containers_content
end

# Legacy-refresh counterpart of the graph refresh: lists every object of
# every already-parsed bucket and decorates each bucket hash with the
# aggregate :bytes / :object_count totals.
def process_containers_content
  containers = @data[:cloud_object_store_containers]
  if containers.empty?
    # Still register an (empty) objects collection so downstream save
    # logic sees a consistent set of collections.
    process_collection([], :cloud_object_store_objects)
    return
  end

  containers.each do |bucket|
    bucket_id = bucket[:ems_ref]

    # S3 bucket accessible only for API client with same region
    region = @aws_s3.client.get_bucket_location(:bucket => bucket_id).location_constraint
    api_client = regional_client(region)

    # AWS SDK doesn't show information about overall size and object count.
    # We need to collect it manually.
    bytes = 0
    object_count = 0
    # API returns up to 1000 objects per request
    token = nil
    proceed = true
    while proceed
      response = api_client.list_objects_v2(
        :bucket => bucket_id,
        :continuation_token => token
      )
      process_collection(response.contents, :cloud_object_store_objects) do |o|
        uid, new_result = parse_object(o, bucket_id)
        bytes += new_result[:content_length]
        object_count += 1
        [uid, new_result]
      end
      token = response.next_continuation_token

      # Blank token => listing complete.
      proceed = token.present?
    end
    bucket[:bytes] = bytes
    bucket[:object_count] = object_count
  end
end

# Returns an S3 API client for the given region. The default resource is
# reused when the region is absent or equals the provider's own region;
# otherwise a per-region connection is created once and cached.
def regional_client(region)
  resource =
    if !region || region == @ems.provider_region
      @aws_s3
    else
      @regional_resources ||= {}
      @regional_resources[region] ||= @ems.connect(:service => :S3, :region => region)
    end
  resource.client
end

def parse_container(bucket)
Expand All @@ -35,17 +89,16 @@ def parse_container(bucket)
return uid, new_result
end

def parse_object(obj, bucket)
uid = obj.key
def parse_object(object, bucket_id)
uid = object.key

new_result = {
:ems_ref => uid,
:etag => obj.etag,
:last_modified => obj.last_modified,
:content_length => obj.size,
:key => obj.key,
#:content_type => obj.content_type,
:container => bucket
:ems_ref => "#{bucket_id}_#{uid}",
:etag => object.etag,
:last_modified => object.last_modified,
:content_length => object.size,
:key => object.key,
:cloud_object_store_container_id => @data_index.fetch_path(:cloud_object_store_containers, bucket_id)
}
return uid, new_result
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
class ManageIQ::Providers::Amazon::StorageManager::S3::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin
module ManageIQ::Providers
class Amazon::StorageManager::S3::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin

def collect_inventory_for_targets(ems, targets)
targets_with_data = targets.collect do |target|
target_name = target.try(:name) || target.try(:event_type)
def collect_inventory_for_targets(ems, targets)
targets_with_data = targets.collect do |target|
target_name = target.try(:name) || target.try(:event_type)

_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."
_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."

if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
end

_log.info "Filtering inventory...Complete"
[target, inventory]
end

_log.info "Filtering inventory...Complete"
[target, inventory]
targets_with_data
end

targets_with_data
end

def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
end
end
end
_log.debug "#{log_header} Parsing inventory...Complete"
_log.debug "#{log_header} Parsing inventory...Complete"

hashes
end
hashes
end

def post_process_refresh_classes
[]
def post_process_refresh_classes
[]
end
end
end
2 changes: 2 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
:values:
- machine
:ignore_terminated_instances: true
:s3:
:inventory_object_refresh: true
Copy link
Contributor Author

@aiperon aiperon Feb 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old way of inventory refresh is not able to handle a large amount of objects.
For example, 100k S3 objects handled by:

  • 2 minutes with the new inventory object refresher
  • about 2 hours with the old refresher

Since removing the old refresher is only a question of time, we've decided to make the new mechanism the default setting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I'll be doing the same for the other managers soon

hm, 2m vs. 2h, that is quite a bit difference, I wonder what makes it so slow in the old refresh

Copy link
Contributor Author

@aiperon aiperon Feb 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the time is consumed on the following lines:
https://github.com/ManageIQ/manageiq/blob/master/app/models/ems_refresh/save_inventory_helper.rb#L55
https://github.com/ManageIQ/manageiq/blob/master/app/models/ems_refresh/save_inventory_helper.rb#L69
To reproduce that:

  • change s3_objects_per_bucket_count to "scaling * 10000" in aws_stubs.rb
  • run rspec plugins/manageiq-amazon-provider/spec/models/manageiq/providers/amazon/storage_manager/s3/stubbed_refresher_spec.rb

I've rejected idea of optimizing legacy parser's core.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I mean the new refresh is 'the optimization' :-D

but I think that in this case you are hitting a 5.0.1 Rails bug, the association.push has there a horrible O(n^2), it's already fixed in rails Master. So with that fixed, the number should not be that big for the old refresh. :-) Should be something around 15 minutes

Copy link
Contributor

@Ladas Ladas Feb 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, association.push is not the major problem there. Iterating through hash is the bigger problem. Like 80 / 20 (iterating hash with find or create / association push).

Yeah, and thanks for the optimized new refresh! It's complex, but at least pretty fast))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm you are right, for me it's 7500s vs 60s, with the rails fix applied. So something is really wrong in the old refresh

:http_proxy:
:ec2:
:host:
Expand Down
16 changes: 15 additions & 1 deletion spec/models/manageiq/providers/amazon/aws_stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def test_counts(scaling = nil)
:outbound_firewall_rule_per_security_group_count => scaling * 5,
:cloud_volume_count => scaling * 5,
:cloud_volume_snapshot_count => scaling * 5,
:s3_buckets_count => scaling * 5
:s3_buckets_count => scaling * 5,
:s3_objects_per_bucket_count => scaling * 5
}
end

Expand Down Expand Up @@ -358,6 +359,19 @@ def mocked_s3_buckets
mocked_s3_buckets
end

# Builds the stubbed S3 object listing used by the list_objects_v2 stub:
# one hash per object, sized by test_counts[:s3_objects_per_bucket_count].
def mocked_s3_objects
  Array.new(test_counts[:s3_objects_per_bucket_count]) do |index|
    {
      :key           => "object_key_#{index}",
      :etag          => "object_key_#{index}",
      :size          => 1,
      :last_modified => Time.now.utc
    }
  end
end

def mocked_instance_health
mocked_instance_healths = []
expected_table_counts[:load_balancer_pool_member].times do |i|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ def refresh_spec
@ems.reload

assert_table_counts
assert_buckets_content
assert_ems
end

def stub_responses
{
:s3 => {
:list_buckets => {
:list_buckets => {
:buckets => mocked_s3_buckets
},
:get_bucket_location => {
:location_constraint => mocked_regions[:regions][0][:region_name]
},
:list_objects_v2 => {
:contents => mocked_s3_objects,
:next_continuation_token => nil,
:is_truncated => false
}
}
}
Expand Down Expand Up @@ -121,7 +130,8 @@ def expected_table_counts
:load_balancer_listener_pool => 0,
:load_balancer_health_check => 0,
:load_balancer_health_check_member => 0,
:cloud_object_store_containers => test_counts[:s3_buckets_count]
:cloud_object_store_containers => test_counts[:s3_buckets_count],
:cloud_object_store_objects => test_counts[:s3_buckets_count] * test_counts[:s3_objects_per_bucket_count]
}
end

Expand Down Expand Up @@ -164,12 +174,31 @@ def assert_table_counts
:load_balancer_listener_pool => LoadBalancerListenerPool.count,
:load_balancer_health_check => LoadBalancerHealthCheck.count,
:load_balancer_health_check_member => LoadBalancerHealthCheckMember.count,
:cloud_object_store_containers => CloudObjectStoreContainer.count
:cloud_object_store_containers => CloudObjectStoreContainer.count,
:cloud_object_store_objects => CloudObjectStoreObject.count
}

expect(actual).to eq expected_table_counts
end

# Verifies that every refreshed container carries the aggregate stats
# (object count and total byte size) derived from the mocked S3 objects.
def assert_buckets_content
  objects = mocked_s3_objects
  per_bucket_stats = {
    :object_count   => objects.count,
    :content_length => objects.sum { |object| object[:size] },
  }

  expected = {}
  observed = {}
  CloudObjectStoreContainer.all.each do |container|
    expected[container.ems_ref] = per_bucket_stats
    observed[container.ems_ref] = {
      :object_count   => container.object_count,
      :content_length => container.bytes
    }
  end
  expect(observed).to eq expected
end

def assert_ems
ems = @ems.s3_storage_manager
expect(ems).to have_attributes(:api_version => nil,
Expand Down