From ccbfd1271696f7e9e0efc85968dcfc7769f72087 Mon Sep 17 00:00:00 2001 From: Ilya Date: Fri, 27 Jan 2017 10:44:44 +0100 Subject: [PATCH] Amazon S3 objects listing Parsing information about objects on S3 buckets. Calculating bucket's size and object count. --- .../inventory/targets/storage_manager/s3.rb | 23 +++++- .../providers/amazon/manager_mixin.rb | 3 +- .../storage_manager/s3/refresh_parser.rb | 70 ++++++++++++++++--- .../s3/refresh_parser_inventory_object.rb | 54 +++++++++++++- .../manageiq/providers/amazon/aws_stubs.rb | 16 ++++- .../s3/stubbed_refresher_spec.rb | 34 ++++++++- 6 files changed, 180 insertions(+), 20 deletions(-) diff --git a/app/models/manageiq/providers/amazon/inventory/targets/storage_manager/s3.rb b/app/models/manageiq/providers/amazon/inventory/targets/storage_manager/s3.rb index 0ce967a70..b8b74f1e9 100644 --- a/app/models/manageiq/providers/amazon/inventory/targets/storage_manager/s3.rb +++ b/app/models/manageiq/providers/amazon/inventory/targets/storage_manager/s3.rb @@ -8,7 +8,26 @@ def cloud_object_store_containers HashCollection.new(aws_s3.client.list_buckets.buckets) end - def cloud_object_store_objects - HashCollection.new([]) + def cloud_object_store_objects(options = {}) + options[:token] ||= nil + # S3 bucket accessible only for API client with same region + regional_client = aws_s3_regional(options[:region]).client + response = regional_client.list_objects_v2( + bucket: options[:bucket], + continuation_token: options[:token] + ) + token = response.next_continuation_token if response.is_truncated + return HashCollection.new(response.contents), token + end + +private + + def aws_s3_regional(region) + if !region || region == ems.provider_region + aws_s3 + else + @regional_resources ||= {} + @regional_resources[region] ||= ems.connect(:service => :S3, :region => region) + end end end diff --git a/app/models/manageiq/providers/amazon/manager_mixin.rb b/app/models/manageiq/providers/amazon/manager_mixin.rb index 7ef38587b..e67732b51 100644 --- a/app/models/manageiq/providers/amazon/manager_mixin.rb +++ b/app/models/manageiq/providers/amazon/manager_mixin.rb @@ -24,8 +24,9 @@ def connect(options = {}) password = options[:pass] || authentication_password(options[:auth_type]) service = options[:service] || :EC2 proxy = options[:proxy_uri] || http_proxy_uri + region = options[:region] || provider_region - self.class.raw_connect(username, password, service, provider_region, proxy) + self.class.raw_connect(username, password, service, region, proxy) end def translate_exception(err) diff --git a/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser.rb b/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser.rb index 71b4b32b7..43ded5b28 100644 --- a/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser.rb +++ b/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser.rb @@ -21,8 +21,56 @@ def ems_inv_to_hashes end def object_store - buckets = @aws_s3.client.list_buckets.buckets - process_collection(buckets, :cloud_object_store_containers) { |c| parse_container(c) } + process_collection( + @aws_s3.client.list_buckets.buckets, + :cloud_object_store_containers + ) { |c| parse_container(c) } + + process_containers_content + end + + def process_containers_content + containers = @data[:cloud_object_store_containers] + if containers.empty? + process_collection([], :cloud_object_store_objects) + return + end + + containers.each do |bucket| + bucket_id = bucket[:ems_ref] + + # S3 bucket accessible only for API client with same region + region = @aws_s3.client.get_bucket_location(bucket: bucket_id).location_constraint + api_client = regional_client(region) + + # AWS SDK doesn't show information about overall size and object count. + # We need to collect it manually. + bytes = 0 + object_count = 0 + # API returns up to 1000 objects per request + continuation_token = nil + begin + response = api_client.list_objects_v2(bucket: bucket_id, continuation_token: continuation_token) + process_collection(response.contents, :cloud_object_store_objects) do |o| + uid, new_result = parse_object(o, bucket_id) + bytes += new_result[:content_length] + object_count += 1 + [ uid, new_result ] + end + continuation_token = response.next_continuation_token + end while response.is_truncated && continuation_token + bucket[:bytes] = bytes + bucket[:object_count] = object_count + end + end + + def regional_client(region) + if !region || region == ems.provider_region + aws_s3 + else + @regional_resources ||= {} + @regional_resources[region] ||= @ems.connect(:service => :S3, :region => region) + end.client end def parse_container(bucket) @@ -35,18 +83,18 @@ def parse_container(bucket) return uid, new_result end - def parse_object(obj, bucket) - uid = obj.key + def parse_object(object, bucket_id) + uid = object.key new_result = { - :ems_ref => uid, - :etag => obj.etag, - :last_modified => obj.last_modified, - :content_length => obj.size, - :key => obj.key, - #:content_type => obj.content_type, - :container => bucket + :ems_ref => "#{bucket_id}_#{uid}", + :etag => object.etag, + :last_modified => object.last_modified, + :content_length => object.size, + :key => object.key, + :cloud_object_store_container_id => @data_index.fetch_path(:cloud_object_store_containers, bucket_id) } return uid, new_result end + end diff --git a/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser_inventory_object.rb b/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser_inventory_object.rb index 5515b90fb..20bab9eb5 100644 --- a/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser_inventory_object.rb +++ b/app/models/manageiq/providers/amazon/storage_manager/s3/refresh_parser_inventory_object.rb @@ -9,7 +9,7 @@ def populate_inventory_collections log_header = "MIQ(#{self.class.name}.#{__method__}) Collecting data for EMS name: [#{inventory.ems.name}] id: [#{inventory.ems.id}]" $aws_log.info("#{log_header}...}") - object_store + process_containers $aws_log.info("#{log_header}...Complete") inventory_collections @@ -17,19 +17,67 @@ def populate_inventory_collections private - def object_store + def process_containers process_inventory_collection( inventory.cloud_object_store_containers, :cloud_object_store_containers ) { |c| parse_container(c) } + $aws_log.info "PROCESSING" + inventory_collections[:cloud_object_store_containers].data_index.each do |bucket_id, object| + lazy_object = inventory_collections[:cloud_object_store_containers].lazy_find(bucket_id) + object_stats = process_objects(bucket_id, lazy_object) + object.data.merge!(object_stats) + end + end def parse_container(bucket) uid = bucket['name'] - { + container_hash = { :ext_management_system => ems, :ems_ref => uid, :key => bucket['name'] } end + + def process_objects(bucket_id, bucket_object) + # S3 bucket accessible only for API client with same region + region = inventory.aws_s3.client.get_bucket_location(bucket: bucket_id).location_constraint + options = { region: region, bucket: bucket_id } + + # AWS SDK doesn't show information about overall size and object count. + # We need to collect it manually. + bytes = 0 + object_count = 0 + begin + objects, token = inventory.cloud_object_store_objects(options) + options[:token] = token + + process_inventory_collection(objects, :cloud_object_store_objects) do |o| + new_result = parse_object(o, bucket_object) + bytes += new_result[:content_length] + #$aws_log.info "#{new_result[:ems_ref]}" + object_count += 1 + new_result + end + + #$aws_log.info "#{bucket_id}. Objects: #{object_count}" + + end while token.present? + + { bytes: bytes, object_count: object_count } + end + + def parse_object(object, bucket) + uid = object['key'] + { + :ext_management_system => ems, + :ems_ref => "#{bucket.ems_ref}_#{uid}", + :etag => object['etag'], + :last_modified => object['last_modified'], + :content_length => object['size'], + :key => uid, + :cloud_object_store_container => bucket + } + end end diff --git a/spec/models/manageiq/providers/amazon/aws_stubs.rb b/spec/models/manageiq/providers/amazon/aws_stubs.rb index 675a34d92..8565bfdf7 100644 --- a/spec/models/manageiq/providers/amazon/aws_stubs.rb +++ b/spec/models/manageiq/providers/amazon/aws_stubs.rb @@ -32,7 +32,8 @@ def test_counts(scaling = nil) :outbound_firewall_rule_per_security_group_count => scaling * 5, :cloud_volume_count => scaling * 5, :cloud_volume_snapshot_count => scaling * 5, - :s3_buckets_count => scaling * 5 + :s3_buckets_count => scaling * 5, + :s3_objects_per_bucket_count => scaling * 5 } end @@ -358,6 +359,19 @@ def mocked_s3_buckets mocked_s3_buckets end + def mocked_s3_objects + mocked_s3_objects = [] + test_counts[:s3_objects_per_bucket_count].times do |i| + mocked_s3_objects << { + :key => "object_key_#{i}", + :etag => "object_key_#{i}", + :size => 1, + :last_modified => Time.now.utc + } + end + mocked_s3_objects + end + def mocked_instance_health mocked_instance_healths = [] expected_table_counts[:load_balancer_pool_member].times do |i| diff --git a/spec/models/manageiq/providers/amazon/storage_manager/s3/stubbed_refresher_spec.rb b/spec/models/manageiq/providers/amazon/storage_manager/s3/stubbed_refresher_spec.rb index 3e4c0b69a..6639330cc 100644 --- a/spec/models/manageiq/providers/amazon/storage_manager/s3/stubbed_refresher_spec.rb +++ b/spec/models/manageiq/providers/amazon/storage_manager/s3/stubbed_refresher_spec.rb @@ -69,6 +69,7 @@ def refresh_spec @ems.reload assert_table_counts + assert_buckets_content assert_ems end @@ -77,6 +78,14 @@ def stub_responses :s3 => { :list_buckets => { :buckets => mocked_s3_buckets + }, + :get_bucket_location => { + :location_constraint => mocked_regions[:regions][0][:region_name] + }, + :list_objects_v2 => { + :contents => mocked_s3_objects, + :next_continuation_token => nil, + :is_truncated => false } } } @@ -121,7 +130,8 @@ def expected_table_counts :load_balancer_listener_pool => 0, :load_balancer_health_check => 0, :load_balancer_health_check_member => 0, - :cloud_object_store_containers => test_counts[:s3_buckets_count] + :cloud_object_store_containers => test_counts[:s3_buckets_count], + :cloud_object_store_objects => test_counts[:s3_buckets_count] * test_counts[:s3_objects_per_bucket_count] } end @@ -164,12 +174,31 @@ def assert_table_counts :load_balancer_listener_pool => LoadBalancerListenerPool.count, :load_balancer_health_check => LoadBalancerHealthCheck.count, :load_balancer_health_check_member => LoadBalancerHealthCheckMember.count, - :cloud_object_store_containers => CloudObjectStoreContainer.count + :cloud_object_store_containers => CloudObjectStoreContainer.count, + :cloud_object_store_objects => CloudObjectStoreObject.count } expect(actual).to eq expected_table_counts end + def assert_buckets_content + mocked_objects = mocked_s3_objects + expected_hash = { + object_count: mocked_objects.count, + content_length: mocked_objects.map{ |object| object[:size] }.sum, + } + actual = {} + expected_content = {} + CloudObjectStoreContainer.all.each do |container| + expected_content[container.ems_ref] = expected_hash + actual[container.ems_ref] = { + object_count: container.object_count, + content_length: container.bytes + } + end + expect(actual).to eq expected_content + end + def assert_ems ems = @ems.s3_storage_manager expect(ems).to have_attributes(:api_version => nil, @@ -177,4 +206,5 @@ def assert_ems expect(ems.cloud_object_store_containers.size).to eql(expected_table_counts[:cloud_object_store_containers]) end + end