Skip to content

Commit

Permalink
Merge pull request #123 from aiperon/amazon-s3-objects-parsing
Browse files Browse the repository at this point in the history
Amazon S3 objects inventoring
  • Loading branch information
Ladas authored Feb 21, 2017
2 parents a5d566b + 59a95ae commit a2b9fe6
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 47 deletions.
2 changes: 2 additions & 0 deletions app/models/manageiq/providers/amazon/inventory/collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ManageIQ::Providers::Amazon::Inventory::Collector < ManagerRefresh::Invent
attr_reader :stacks, :stacks_refs, :stacks_deleted
attr_reader :cloud_volumes, :cloud_volumes_refs
attr_reader :cloud_volume_snapshots, :cloud_volume_snapshots_refs
attr_reader :cloud_objects_store_containers
attr_reader :cloud_objects_store_objects

def initialize(_manager, _target)
super
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,26 @@ def cloud_object_store_containers
hash_collection.new(aws_s3.client.list_buckets.buckets)
end

def cloud_object_store_objects
hash_collection.new([])
def cloud_object_store_objects(options = {})
options[:token] ||= nil
# S3 bucket accessible only for API client with same region
regional_client = aws_s3_regional(options[:region]).client
response = regional_client.list_objects_v2(
:bucket => options[:bucket],
:continuation_token => options[:token]
)
token = response.next_continuation_token if response.is_truncated
return hash_collection.new(response.contents), token
end

private

def aws_s3_regional(region)
if !region || region == manager.provider_region
aws_s3
else
@regional_resources ||= {}
@regional_resources[region] ||= manager.connect(:service => :S3, :region => region)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ def parse
log_header = "MIQ(#{self.class.name}.#{__method__}) Collecting data for EMS name: [#{collector.manager.name}] id: [#{collector.manager.id}]"

$aws_log.info("#{log_header}...}")
object_store
process_containers
$aws_log.info("#{log_header}...Complete")
end

private

def object_store
def process_containers
process_inventory_collection(
collector.cloud_object_store_containers,
:cloud_object_store_containers
) { |c| parse_container(c) }
persister.collections[:cloud_object_store_containers].data_index.each do |bucket_id, object|
lazy_object = persister.collections[:cloud_object_store_containers].lazy_find(bucket_id)
object_stats = process_objects(bucket_id, lazy_object)
object.data.merge!(object_stats)
end
end

def parse_container(bucket)
Expand All @@ -28,4 +33,44 @@ def parse_container(bucket)
:key => bucket['name']
}
end

def process_objects(bucket_id, bucket_object)
# S3 bucket accessible only for API client with same region
region = collector.aws_s3.client.get_bucket_location(:bucket => bucket_id).location_constraint
options = { :region => region, :bucket => bucket_id }

# AWS SDK doesn't show information about overall size and object count.
# We need to collect it manually.
bytes = 0
object_count = 0
proceed = true
while proceed
objects, token = collector.cloud_object_store_objects(options)
options[:token] = token

process_inventory_collection(objects, :cloud_object_store_objects) do |o|
new_result = parse_object(o, bucket_object)
bytes += new_result[:content_length]
object_count += 1
new_result
end

proceed = token.present?
end

{ :bytes => bytes, :object_count => object_count }
end

def parse_object(object, bucket)
uid = object['key']
{
:ext_management_system => ems,
:ems_ref => "#{bucket.ems_ref}_#{uid}",
:etag => object['etag'],
:last_modified => object['last_modified'],
:content_length => object['size'],
:key => uid,
:cloud_object_store_container => bucket
}
end
end
3 changes: 2 additions & 1 deletion app/models/manageiq/providers/amazon/manager_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def connect(options = {})
password = options[:pass] || authentication_password(options[:auth_type])
service = options[:service] || :EC2
proxy = options[:proxy_uri] || http_proxy_uri
region = options[:region] || provider_region

self.class.raw_connect(username, password, service, provider_region, proxy)
self.class.raw_connect(username, password, service, region, proxy)
end

def translate_exception(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,62 @@ def ems_inv_to_hashes
end

def object_store
buckets = @aws_s3.client.list_buckets.buckets
process_collection(buckets, :cloud_object_store_containers) { |c| parse_container(c) }
process_collection(
@aws_s3.client.list_buckets.buckets,
:cloud_object_store_containers
) { |c| parse_container(c) }

process_containers_content
end

def process_containers_content
containers = @data[:cloud_object_store_containers]
if containers.empty?
process_collection([], :cloud_object_store_objects)
return
end

containers.each do |bucket|
bucket_id = bucket[:ems_ref]

# S3 bucket accessible only for API client with same region
region = @aws_s3.client.get_bucket_location(:bucket => bucket_id).location_constraint
api_client = regional_client(region)

# AWS SDK doesn't show information about overall size and object count.
# We need to collect it manually.
bytes = 0
object_count = 0
# API returns up to 1000 objects per request
token = nil
proceed = true
while proceed
response = api_client.list_objects_v2(
:bucket => bucket_id,
:continuation_token => token
)
process_collection(response.contents, :cloud_object_store_objects) do |o|
uid, new_result = parse_object(o, bucket_id)
bytes += new_result[:content_length]
object_count += 1
[uid, new_result]
end
token = response.next_continuation_token

proceed = token.present?
end
bucket[:bytes] = bytes
bucket[:object_count] = object_count
end
end

def regional_client(region)
if !region || region == @ems.provider_region
@aws_s3
else
@regional_resources ||= {}
@regional_resources[region] ||= @ems.connect(:service => :S3, :region => region)
end.client
end

def parse_container(bucket)
Expand All @@ -35,17 +89,16 @@ def parse_container(bucket)
return uid, new_result
end

def parse_object(obj, bucket)
uid = obj.key
def parse_object(object, bucket_id)
uid = object.key

new_result = {
:ems_ref => uid,
:etag => obj.etag,
:last_modified => obj.last_modified,
:content_length => obj.size,
:key => obj.key,
#:content_type => obj.content_type,
:container => bucket
:ems_ref => "#{bucket_id}_#{uid}",
:etag => object.etag,
:last_modified => object.last_modified,
:content_length => object.size,
:key => object.key,
:cloud_object_store_container_id => @data_index.fetch_path(:cloud_object_store_containers, bucket_id)
}
return uid, new_result
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
class ManageIQ::Providers::Amazon::StorageManager::S3::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin
module ManageIQ::Providers
class Amazon::StorageManager::S3::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin

def collect_inventory_for_targets(ems, targets)
targets_with_data = targets.collect do |target|
target_name = target.try(:name) || target.try(:event_type)
def collect_inventory_for_targets(ems, targets)
targets_with_data = targets.collect do |target|
target_name = target.try(:name) || target.try(:event_type)

_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."
_log.info "Filtering inventory for #{target.class} [#{target_name}] id: [#{target.id}]..."

if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
if refresher_options.try(:[], :inventory_object_refresh)
inventory = ManageIQ::Providers::Amazon::Builder.build_inventory(ems, target)
end

_log.info "Filtering inventory...Complete"
[target, inventory]
end

_log.info "Filtering inventory...Complete"
[target, inventory]
targets_with_data
end

targets_with_data
end

def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
def parse_targeted_inventory(ems, _target, inventory)
log_header = format_ems_for_logging(ems)
_log.debug "#{log_header} Parsing inventory..."
hashes, = Benchmark.realtime_block(:parse_inventory) do
if refresher_options.try(:[], :inventory_object_refresh)
inventory.inventory_collections
else
ManageIQ::Providers::Amazon::StorageManager::S3::RefreshParser.ems_inv_to_hashes(ems, refresher_options)
end
end
end
_log.debug "#{log_header} Parsing inventory...Complete"
_log.debug "#{log_header} Parsing inventory...Complete"

hashes
end
hashes
end

def post_process_refresh_classes
[]
def post_process_refresh_classes
[]
end
end
end
2 changes: 2 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
:values:
- machine
:ignore_terminated_instances: true
:s3:
:inventory_object_refresh: true
:http_proxy:
:ec2:
:host:
Expand Down
16 changes: 15 additions & 1 deletion spec/models/manageiq/providers/amazon/aws_stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def test_counts(scaling = nil)
:outbound_firewall_rule_per_security_group_count => scaling * 5,
:cloud_volume_count => scaling * 5,
:cloud_volume_snapshot_count => scaling * 5,
:s3_buckets_count => scaling * 5
:s3_buckets_count => scaling * 5,
:s3_objects_per_bucket_count => scaling * 5
}
end

Expand Down Expand Up @@ -358,6 +359,19 @@ def mocked_s3_buckets
mocked_s3_buckets
end

def mocked_s3_objects
mocked_s3_objects = []
test_counts[:s3_objects_per_bucket_count].times do |i|
mocked_s3_objects << {
:key => "object_key_#{i}",
:etag => "object_key_#{i}",
:size => 1,
:last_modified => Time.now.utc
}
end
mocked_s3_objects
end

def mocked_instance_health
mocked_instance_healths = []
expected_table_counts[:load_balancer_pool_member].times do |i|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ def refresh_spec
@ems.reload

assert_table_counts
assert_buckets_content
assert_ems
end

def stub_responses
{
:s3 => {
:list_buckets => {
:list_buckets => {
:buckets => mocked_s3_buckets
},
:get_bucket_location => {
:location_constraint => mocked_regions[:regions][0][:region_name]
},
:list_objects_v2 => {
:contents => mocked_s3_objects,
:next_continuation_token => nil,
:is_truncated => false
}
}
}
Expand Down Expand Up @@ -121,7 +130,8 @@ def expected_table_counts
:load_balancer_listener_pool => 0,
:load_balancer_health_check => 0,
:load_balancer_health_check_member => 0,
:cloud_object_store_containers => test_counts[:s3_buckets_count]
:cloud_object_store_containers => test_counts[:s3_buckets_count],
:cloud_object_store_objects => test_counts[:s3_buckets_count] * test_counts[:s3_objects_per_bucket_count]
}
end

Expand Down Expand Up @@ -164,12 +174,31 @@ def assert_table_counts
:load_balancer_listener_pool => LoadBalancerListenerPool.count,
:load_balancer_health_check => LoadBalancerHealthCheck.count,
:load_balancer_health_check_member => LoadBalancerHealthCheckMember.count,
:cloud_object_store_containers => CloudObjectStoreContainer.count
:cloud_object_store_containers => CloudObjectStoreContainer.count,
:cloud_object_store_objects => CloudObjectStoreObject.count
}

expect(actual).to eq expected_table_counts
end

def assert_buckets_content
mocked_objects = mocked_s3_objects
expected_hash = {
:object_count => mocked_objects.count,
:content_length => mocked_objects.map { |object| object[:size] }.sum,
}
actual = {}
expected_content = {}
CloudObjectStoreContainer.all.each do |container|
expected_content[container.ems_ref] = expected_hash
actual[container.ems_ref] = {
:object_count => container.object_count,
:content_length => container.bytes
}
end
expect(actual).to eq expected_content
end

def assert_ems
ems = @ems.s3_storage_manager
expect(ems).to have_attributes(:api_version => nil,
Expand Down

0 comments on commit a2b9fe6

Please sign in to comment.