Skip to content

Commit

Permalink
Amazon S3 objects listing
Browse files Browse the repository at this point in the history
Parsing information about objects on S3 buckets.
Calculating bucket's size and object count.
  • Loading branch information
aiperon committed Feb 19, 2017
1 parent 4202895 commit 6dce432
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 57 deletions.
2 changes: 2 additions & 0 deletions app/models/manageiq/providers/amazon/inventory/collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ManageIQ::Providers::Amazon::Inventory::Collector < ManagerRefresh::Invent
attr_reader :stacks, :stacks_refs, :stacks_deleted
attr_reader :cloud_volumes, :cloud_volumes_refs
attr_reader :cloud_volume_snapshots, :cloud_volume_snapshots_refs
attr_reader :cloud_objects_store_containers
attr_reader :cloud_objects_store_objects

def initialize(_manager, _target)
super
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,26 @@ def cloud_object_store_containers
hash_collection.new(aws_s3.client.list_buckets.buckets)
end

def cloud_object_store_objects
hash_collection.new([])
def cloud_object_store_objects(options = {})
options[:token] ||= nil
# S3 bucket accessible only for API client with same region
regional_client = aws_s3_regional(options[:region]).client
response = regional_client.list_objects_v2(
:bucket => options[:bucket],
:continuation_token => options[:token]
)
token = response.next_continuation_token if response.is_truncated
return hash_collection.new(response.contents), token
end

private

def aws_s3_regional(region)
if !region || region == manager.provider_region
aws_s3
else
@regional_resources ||= {}
@regional_resources[region] ||= manager.connect(:service => :S3, :region => region)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ def parse
log_header = "MIQ(#{self.class.name}.#{__method__}) Collecting data for EMS name: [#{collector.manager.name}] id: [#{collector.manager.id}]"

$aws_log.info("#{log_header}...}")
object_store
process_containers
$aws_log.info("#{log_header}...Complete")
end

private

def object_store
def process_containers
process_inventory_collection(
collector.cloud_object_store_containers,
:cloud_object_store_containers
) { |c| parse_container(c) }
persister.collections[:cloud_object_store_containers].data_index.each do |bucket_id, object|
lazy_object = persister.collections[:cloud_object_store_containers].lazy_find(bucket_id)
object_stats = process_objects(bucket_id, lazy_object)
object.data.merge!(object_stats)
end
end

def parse_container(bucket)
Expand All @@ -28,4 +33,44 @@ def parse_container(bucket)
:key => bucket['name']
}
end

def process_objects(bucket_id, bucket_object)
# S3 bucket accessible only for API client with same region
region = collector.aws_s3.client.get_bucket_location(:bucket => bucket_id).location_constraint
options = { :region => region, :bucket => bucket_id }

# AWS SDK doesn't show information about overall size and object count.
# We need to collect it manually.
bytes = 0
object_count = 0
proceed = true
while proceed
objects, token = collector.cloud_object_store_objects(options)
options[:token] = token

process_inventory_collection(objects, :cloud_object_store_objects) do |o|
new_result = parse_object(o, bucket_object)
bytes += new_result[:content_length]
object_count += 1
new_result
end

proceed = token.present?
end

{ :bytes => bytes, :object_count => object_count }
end

def parse_object(object, bucket)
uid = object['key']
{
:ext_management_system => ems,
:ems_ref => "#{bucket.ems_ref}_#{uid}",
:etag => object['etag'],
:last_modified => object['last_modified'],
:content_length => object['size'],
:key => uid,
:cloud_object_store_container => bucket
}
end
end
3 changes: 2 additions & 1 deletion app/models/manageiq/providers/amazon/manager_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def connect(options = {})
password = options[:pass] || authentication_password(options[:auth_type])
service = options[:service] || :EC2
proxy = options[:proxy_uri] || http_proxy_uri
region = options[:region] || provider_region

self.class.raw_connect(username, password, service, provider_region, proxy)
self.class.raw_connect(username, password, service, region, proxy)
end

def translate_exception(err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,39 @@
module ManageIQ::Providers
class Amazon::StorageManager::Ebs::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin
class ManageIQ::Providers::Amazon::StorageManager::Ebs::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin

def collect_inventory_for_targets(ems, targets)
targets_with_data = targets.collect do |target|
target_name = target.try(:name) || target.try(:event_type)
def collect_inventory_for_targets(ems, targets)
targets_with_data = targets.collect do |target|
target_name = target.try(:name) || target.try(:event_type)

_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."
_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."

if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
end

_log.info "Filtering inventory...Complete"
[target, inventory]
if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
end

targets_with_data
_log.info "Filtering inventory...Complete"
[target, inventory]
end

def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::Ebs::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
end
end
_log.debug "#{log_header} Parsing inventory...Complete"
targets_with_data
end

hashes
def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::Ebs::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
end
end
_log.debug "#{log_header} Parsing inventory...Complete"

def post_process_refresh_classes
[]
end
hashes
end

def post_process_refresh_classes
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,62 @@ def ems_inv_to_hashes
end

def object_store
buckets = @aws_s3.client.list_buckets.buckets
process_collection(buckets, :cloud_object_store_containers) { |c| parse_container(c) }
process_collection(
@aws_s3.client.list_buckets.buckets,
:cloud_object_store_containers
) { |c| parse_container(c) }

process_containers_content
end

def process_containers_content
containers = @data[:cloud_object_store_containers]
if containers.empty?
process_collection([], :cloud_object_store_objects)
return
end

containers.each do |bucket|
bucket_id = bucket[:ems_ref]

# S3 bucket accessible only for API client with same region
region = @aws_s3.client.get_bucket_location(:bucket => bucket_id).location_constraint
api_client = regional_client(region)

# AWS SDK doesn't show information about overall size and object count.
# We need to collect it manually.
bytes = 0
object_count = 0
# API returns up to 1000 objects per request
token = nil
proceed = true
while proceed
response = api_client.list_objects_v2(
:bucket => bucket_id,
:continuation_token => token
)
process_collection(response.contents, :cloud_object_store_objects) do |o|
uid, new_result = parse_object(o, bucket_id)
bytes += new_result[:content_length]
object_count += 1
[uid, new_result]
end
token = response.next_continuation_token

proceed = token.present?
end
bucket[:bytes] = bytes
bucket[:object_count] = object_count
end
end

def regional_client(region)
if !region || region == @ems.provider_region
@aws_s3
else
@regional_resources ||= {}
@regional_resources[region] ||= @ems.connect(:service => :S3, :region => region)
end.client
end

def parse_container(bucket)
Expand All @@ -35,17 +89,16 @@ def parse_container(bucket)
return uid, new_result
end

def parse_object(obj, bucket)
uid = obj.key
def parse_object(object, bucket_id)
uid = object.key

new_result = {
:ems_ref => uid,
:etag => obj.etag,
:last_modified => obj.last_modified,
:content_length => obj.size,
:key => obj.key,
#:content_type => obj.content_type,
:container => bucket
:ems_ref => "#{bucket_id}_#{uid}",
:etag => object.etag,
:last_modified => object.last_modified,
:content_length => object.size,
:key => object.key,
:cloud_object_store_container_id => @data_index.fetch_path(:cloud_object_store_containers, bucket_id)
}
return uid, new_result
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ def collect_inventory_for_targets(ems, targets)

_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."

if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
end
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
# if refresher_options.try(:[], :inventory_object_refresh)
# inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
# end

_log.info "Filtering inventory...Complete"
[target, inventory]
Expand All @@ -22,11 +23,14 @@ def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
end
inventory.inventory_collections
# Old parser algorytm is not able to inventory large amount of records in reasonable time

# if refresher_options.try(:[], :inventory_object_refresh)
# inventory.inventory_collections
# else
# ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
# end
end
_log.debug "#{log_header} Parsing inventory...Complete"

Expand Down
16 changes: 15 additions & 1 deletion spec/models/manageiq/providers/amazon/aws_stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def test_counts(scaling = nil)
:outbound_firewall_rule_per_security_group_count => scaling * 5,
:cloud_volume_count => scaling * 5,
:cloud_volume_snapshot_count => scaling * 5,
:s3_buckets_count => scaling * 5
:s3_buckets_count => scaling * 5,
:s3_objects_per_bucket_count => scaling * 5
}
end

Expand Down Expand Up @@ -358,6 +359,19 @@ def mocked_s3_buckets
mocked_s3_buckets
end

def mocked_s3_objects
mocked_s3_objects = []
test_counts[:s3_objects_per_bucket_count].times do |i|
mocked_s3_objects << {
:key => "object_key_#{i}",
:etag => "object_key_#{i}",
:size => 1,
:last_modified => Time.now.utc
}
end
mocked_s3_objects
end

def mocked_instance_health
mocked_instance_healths = []
expected_table_counts[:load_balancer_pool_member].times do |i|
Expand Down
Loading

0 comments on commit 6dce432

Please sign in to comment.