Skip to content

Commit

Permalink
Amazon S3 objects listing
Browse files Browse the repository at this point in the history
Parsing information about objects on S3 buckets.
Calculating bucket's size and object count.
  • Loading branch information
aiperon committed Feb 20, 2017
1 parent 4202895 commit 59a95ae
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 47 deletions.
2 changes: 2 additions & 0 deletions app/models/manageiq/providers/amazon/inventory/collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ManageIQ::Providers::Amazon::Inventory::Collector < ManagerRefresh::Invent
# Public readers exposed by the collector.
attr_reader :stacks, :stacks_refs, :stacks_deleted
attr_reader :cloud_volumes, :cloud_volumes_refs
attr_reader :cloud_volume_snapshots, :cloud_volume_snapshots_refs
# NOTE(review): the two readers below are spelled "objects_store", whereas the
# S3 collector methods and inventory collection keys are spelled
# "object_store" (e.g. cloud_object_store_containers) -- confirm the spelling
# is intended and that something actually assigns these instance variables.
attr_reader :cloud_objects_store_containers
attr_reader :cloud_objects_store_objects

def initialize(_manager, _target)
super
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,26 @@ def cloud_object_store_containers
hash_collection.new(aws_s3.client.list_buckets.buckets)
end

def cloud_object_store_objects
hash_collection.new([])
# Fetches a single page of objects from an S3 bucket.
#
# @param options [Hash] recognised keys:
#   :bucket - bucket name passed to list_objects_v2
#   :region - region used to pick the API client (see aws_s3_regional)
#   :token  - continuation token from the previous page; nil or absent
#             requests the first page
# @return [Array] two elements: the page's contents wrapped in a
#   hash_collection, and the continuation token for the next page (nil when
#   the response is not truncated)
def cloud_object_store_objects(options = {})
  # S3 bucket accessible only for API client with same region
  regional_client = aws_s3_regional(options[:region]).client
  # The options are only read, never written back, so a shared or frozen hash
  # supplied by the caller is left untouched. A missing :token reads as nil.
  response = regional_client.list_objects_v2(
    :bucket             => options[:bucket],
    :continuation_token => options[:token]
  )
  # Only hand back a token when the service says more pages are available.
  token = response.next_continuation_token if response.is_truncated
  [hash_collection.new(response.contents), token]
end

private

# Picks the S3 connection to use for a bucket located in the given region.
#
# @param region [String, nil] bucket region; nil or an empty string means no
#   explicit region is known
# @return the default aws_s3 connection when the region is blank or equals
#   the manager's provider region; otherwise a connection created through
#   manager.connect for that region and memoized per region
def aws_s3_regional(region)
  # An empty string is truthy in Ruby, so it must be checked explicitly:
  # connecting with :region => "" cannot work. It is handled like nil.
  # NOTE(review): S3 GetBucketLocation is documented to report us-east-1 with
  # an empty constraint -- confirm the default connection can reach such
  # buckets when the provider region is not us-east-1.
  return aws_s3 if region.to_s.empty? || region == manager.provider_region

  @regional_resources ||= {}
  @regional_resources[region] ||= manager.connect(:service => :S3, :region => region)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ def parse
log_header = "MIQ(#{self.class.name}.#{__method__}) Collecting data for EMS name: [#{collector.manager.name}] id: [#{collector.manager.id}]"

$aws_log.info("#{log_header}...}")
object_store
process_containers
$aws_log.info("#{log_header}...Complete")
end

private

def object_store
# Parses every bucket reported by the collector into the
# :cloud_object_store_containers collection, then processes the objects of
# each indexed bucket and merges the statistics returned by process_objects
# into that bucket's data.
def process_containers
  process_inventory_collection(
    collector.cloud_object_store_containers,
    :cloud_object_store_containers
  ) { |container| parse_container(container) }

  containers = persister.collections[:cloud_object_store_containers]
  containers.data_index.each do |bucket_id, inventory_object|
    # Objects reference their bucket through a lazy lookup of the container.
    bucket_reference = containers.lazy_find(bucket_id)
    inventory_object.data.merge!(process_objects(bucket_id, bucket_reference))
  end
end

def parse_container(bucket)
Expand All @@ -28,4 +33,44 @@ def parse_container(bucket)
:key => bucket['name']
}
end

# Walks every page of objects in one bucket, feeding each object into the
# :cloud_object_store_objects collection and totalling the bucket's content.
#
# @param bucket_id [String] bucket name, passed to the S3 API as :bucket
# @param bucket_object bucket reference handed on to parse_object
# @return [Hash] :bytes (sum of :content_length) and :object_count
def process_objects(bucket_id, bucket_object)
  # S3 bucket accessible only for API client with same region
  location = collector.aws_s3.client.get_bucket_location(:bucket => bucket_id)
  options  = { :region => location.location_constraint, :bucket => bucket_id }

  # AWS SDK doesn't show information about overall size and object count.
  # We need to collect it manually.
  totals = { :bytes => 0, :object_count => 0 }

  last_page = false
  until last_page
    objects, token = collector.cloud_object_store_objects(options)
    # Carry the continuation token into the next request.
    options[:token] = token

    process_inventory_collection(objects, :cloud_object_store_objects) do |o|
      parse_object(o, bucket_object).tap do |parsed|
        totals[:bytes]        += parsed[:content_length]
        totals[:object_count] += 1
      end
    end

    # No token means the listing is exhausted.
    last_page = !token.present?
  end

  totals
end

# Builds the inventory attributes for one S3 object.
#
# @param object item read with string keys 'key', 'etag', 'last_modified'
#   and 'size'
# @param bucket owning bucket reference; must respond to ems_ref
# @return [Hash] attributes for the :cloud_object_store_objects collection
def parse_object(object, bucket)
  object_key = object['key']
  # Prefix with the bucket's ems_ref so the reference is bucket-qualified.
  qualified_ref = "#{bucket.ems_ref}_#{object_key}"

  {
    :ext_management_system        => ems,
    :ems_ref                      => qualified_ref,
    :etag                         => object['etag'],
    :last_modified                => object['last_modified'],
    :content_length               => object['size'],
    :key                          => object_key,
    :cloud_object_store_container => bucket
  }
end
end
3 changes: 2 additions & 1 deletion app/models/manageiq/providers/amazon/manager_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def connect(options = {})
password = options[:pass] || authentication_password(options[:auth_type])
service = options[:service] || :EC2
proxy = options[:proxy_uri] || http_proxy_uri
region = options[:region] || provider_region

self.class.raw_connect(username, password, service, provider_region, proxy)
self.class.raw_connect(username, password, service, region, proxy)
end

def translate_exception(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,62 @@ def ems_inv_to_hashes
end

# Parses all S3 buckets into the :cloud_object_store_containers collection
# and then collects the objects stored in each of them.
def object_store
  # The buckets are listed and processed exactly once; a second pass would
  # issue a redundant list_buckets call and re-process the same containers.
  process_collection(
    @aws_s3.client.list_buckets.buckets,
    :cloud_object_store_containers
  ) { |c| parse_container(c) }

  process_containers_content
end

# Collects the objects of every parsed bucket into
# :cloud_object_store_objects and stores each bucket's total size and object
# count on the bucket hash (:bytes, :object_count).
def process_containers_content
  buckets = @data[:cloud_object_store_containers]
  if buckets.empty?
    # Still run the objects collection once, with no items, when there are
    # no buckets at all.
    process_collection([], :cloud_object_store_objects)
    return
  end

  buckets.each do |bucket|
    bucket_name = bucket[:ems_ref]

    # S3 bucket accessible only for API client with same region
    location  = @aws_s3.client.get_bucket_location(:bucket => bucket_name)
    s3_client = regional_client(location.location_constraint)

    # AWS SDK doesn't show information about overall size and object count.
    # We need to collect it manually.
    total_bytes   = 0
    total_objects = 0

    # API returns up to 1000 objects per request
    next_token = nil
    last_page  = false
    until last_page
      page = s3_client.list_objects_v2(
        :bucket             => bucket_name,
        :continuation_token => next_token
      )
      process_collection(page.contents, :cloud_object_store_objects) do |s3_object|
        uid, attributes = parse_object(s3_object, bucket_name)
        total_bytes   += attributes[:content_length]
        total_objects += 1
        [uid, attributes]
      end

      next_token = page.next_continuation_token
      last_page  = !next_token.present?
    end

    bucket[:bytes]        = total_bytes
    bucket[:object_count] = total_objects
  end
end

# Returns the S3 API client to use for a bucket located in the given region.
#
# @param region [String, nil] bucket region; nil or an empty string means no
#   explicit region is known
# @return the client of the default @aws_s3 connection when the region is
#   blank or equals the EMS provider region; otherwise the client of a
#   connection created through @ems.connect and memoized per region
def regional_client(region)
  resource =
    # An empty string is truthy in Ruby, so it must be checked explicitly:
    # connecting with :region => "" cannot work. It is handled like nil.
    # NOTE(review): S3 GetBucketLocation is documented to report us-east-1
    # with an empty constraint -- confirm the default connection can reach
    # such buckets when the provider region is not us-east-1.
    if region.to_s.empty? || region == @ems.provider_region
      @aws_s3
    else
      @regional_resources ||= {}
      @regional_resources[region] ||= @ems.connect(:service => :S3, :region => region)
    end
  resource.client
end

def parse_container(bucket)
Expand All @@ -35,17 +89,16 @@ def parse_container(bucket)
return uid, new_result
end

# Builds the inventory hash for a single S3 object.
#
# @param object S3 object summary; must respond to key, etag, last_modified
#   and size
# @param bucket_id [String] ems_ref of the bucket holding the object
# @return [Array] two elements: the object's key (uid) and its attribute hash
def parse_object(object, bucket_id)
  uid = object.key

  new_result = {
    # Bucket-qualified reference.
    :ems_ref                         => "#{bucket_id}_#{uid}",
    :etag                            => object.etag,
    :last_modified                   => object.last_modified,
    :content_length                  => object.size,
    :key                             => object.key,
    # Resolve the owning container through the parser's data index.
    :cloud_object_store_container_id => @data_index.fetch_path(:cloud_object_store_containers, bucket_id)
  }
  # NOTE(review): the returned uid is the bare object key while :ems_ref is
  # bucket-qualified; if the caller indexes objects by uid, identical keys in
  # different buckets would collide -- confirm against process_collection.
  return uid, new_result
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
module ManageIQ::Providers
  # Refresher for the Amazon S3 storage manager. Depending on the
  # :inventory_object_refresh refresher option it either builds inventory
  # through the Amazon inventory Builder or falls back to the S3
  # RefreshParser.
  class Amazon::StorageManager::S3::Refresher < ManageIQ::Providers::BaseManager::Refresher
    include ::EmsRefresh::Refreshers::EmsRefresherMixin

    # Pairs every refresh target with its inventory.
    #
    # @param ems the manager being refreshed
    # @param targets collection of refresh targets
    # @return [Array<Array>] one [target, inventory] pair per target; the
    #   inventory is nil unless :inventory_object_refresh is enabled
    def collect_inventory_for_targets(ems, targets)
      targets_with_data = targets.collect do |target|
        target_name = target.try(:name) || target.try(:event_type)

        _log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."

        if refresher_options.try(:[], :inventory_object_refresh)
          inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
        end

        _log.info "Filtering inventory...Complete"
        [target, inventory]
      end

      targets_with_data
    end

    # Produces the data to be saved for one target.
    #
    # @param ems the manager being refreshed
    # @param _target unused
    # @param inventory inventory built by collect_inventory_for_targets (only
    #   read when :inventory_object_refresh is enabled)
    # @return the inventory's inventory_collections, or the result of
    #   RefreshParser.ems_inv_to_hashes otherwise
    def parse_targeted_inventory(ems, _target, inventory)
      log_header = format_ems_for_logging(ems)
      _log.debug "#{log_header} Parsing inventory..."
      # Only the first element returned by realtime_block is kept.
      hashes, = Benchmark.realtime_block(:parse_inventory) do
        if refresher_options.try(:[], :inventory_object_refresh)
          inventory.inventory_collections
        else
          ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
        end
      end
      _log.debug "#{log_header} Parsing inventory...Complete"

      hashes
    end

    # @return [Array] always empty for this refresher
    def post_process_refresh_classes
      []
    end
  end
end
2 changes: 2 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
:values:
- machine
:ignore_terminated_instances: true
:s3:
:inventory_object_refresh: true
:http_proxy:
:ec2:
:host:
Expand Down
16 changes: 15 additions & 1 deletion spec/models/manageiq/providers/amazon/aws_stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def test_counts(scaling = nil)
:outbound_firewall_rule_per_security_group_count => scaling * 5,
:cloud_volume_count => scaling * 5,
:cloud_volume_snapshot_count => scaling * 5,
:s3_buckets_count => scaling * 5
:s3_buckets_count => scaling * 5,
:s3_objects_per_bucket_count => scaling * 5
}
end

Expand Down Expand Up @@ -358,6 +359,19 @@ def mocked_s3_buckets
mocked_s3_buckets
end

# Builds the stubbed S3 object listing used for every mocked bucket: one
# entry per test_counts[:s3_objects_per_bucket_count], each of size 1.
#
# @return [Array<Hash>] hashes with :key, :etag, :size and :last_modified
def mocked_s3_objects
  object_total = test_counts[:s3_objects_per_bucket_count]

  object_total.times.map do |index|
    {
      :key           => "object_key_#{index}",
      :etag          => "object_key_#{index}",
      :size          => 1,
      :last_modified => Time.now.utc
    }
  end
end

def mocked_instance_health
mocked_instance_healths = []
expected_table_counts[:load_balancer_pool_member].times do |i|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ def refresh_spec
@ems.reload

assert_table_counts
assert_buckets_content
assert_ems
end

def stub_responses
{
:s3 => {
:list_buckets => {
:list_buckets => {
:buckets => mocked_s3_buckets
},
:get_bucket_location => {
:location_constraint => mocked_regions[:regions][0][:region_name]
},
:list_objects_v2 => {
:contents => mocked_s3_objects,
:next_continuation_token => nil,
:is_truncated => false
}
}
}
Expand Down Expand Up @@ -121,7 +130,8 @@ def expected_table_counts
:load_balancer_listener_pool => 0,
:load_balancer_health_check => 0,
:load_balancer_health_check_member => 0,
:cloud_object_store_containers => test_counts[:s3_buckets_count]
:cloud_object_store_containers => test_counts[:s3_buckets_count],
:cloud_object_store_objects => test_counts[:s3_buckets_count] * test_counts[:s3_objects_per_bucket_count]
}
end

Expand Down Expand Up @@ -164,12 +174,31 @@ def assert_table_counts
:load_balancer_listener_pool => LoadBalancerListenerPool.count,
:load_balancer_health_check => LoadBalancerHealthCheck.count,
:load_balancer_health_check_member => LoadBalancerHealthCheckMember.count,
:cloud_object_store_containers => CloudObjectStoreContainer.count
:cloud_object_store_containers => CloudObjectStoreContainer.count,
:cloud_object_store_objects => CloudObjectStoreObject.count
}

expect(actual).to eq expected_table_counts
end

# Verifies that every persisted bucket reports the object count and total
# size of the mocked object listing.
def assert_buckets_content
  objects = mocked_s3_objects
  # Every bucket is stubbed with the same listing, so they share one
  # expectation.
  per_bucket = {
    :object_count   => objects.count,
    :content_length => objects.sum { |object| object[:size] },
  }

  actual = {}
  CloudObjectStoreContainer.all.each do |container|
    actual[container.ems_ref] = {
      :object_count   => container.object_count,
      :content_length => container.bytes
    }
  end
  expected_content = actual.keys.each_with_object({}) { |ref, memo| memo[ref] = per_bucket }

  expect(actual).to eq expected_content
end

def assert_ems
ems = @ems.s3_storage_manager
expect(ems).to have_attributes(:api_version => nil,
Expand Down

0 comments on commit 59a95ae

Please sign in to comment.