Skip to content

Commit

Permalink
Amazon S3 objects listing
Browse files Browse the repository at this point in the history
Parsing information about objects on S3 buckets.
Calculating bucket's size and object count.
  • Loading branch information
aiperon committed Jan 27, 2017
1 parent c713660 commit ccbfd12
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,26 @@ def cloud_object_store_containers
HashCollection.new(aws_s3.client.list_buckets.buckets)
end

def cloud_object_store_objects
HashCollection.new([])
def cloud_object_store_objects(options = {})
options[:token] ||= nil
# S3 bucket accessible only for API client with same region
regional_client = aws_s3_regional(options[:region]).client
response = regional_client.list_objects_v2(
bucket: options[:bucket],
continuation_token: options[:token]
)
token = response.next_continuation_token if response.is_truncated
return HashCollection.new(response.contents), token
end

private

def aws_s3_regional(region)
if !region || region == ems.provider_region
aws_s3
else
@regional_resources ||= {}
@regional_resources[region] ||= ems.connect(:service => :S3, :region => region)
end
end
end
3 changes: 2 additions & 1 deletion app/models/manageiq/providers/amazon/manager_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def connect(options = {})
password = options[:pass] || authentication_password(options[:auth_type])
service = options[:service] || :EC2
proxy = options[:proxy_uri] || http_proxy_uri
region = options[:region] || provider_region

self.class.raw_connect(username, password, service, provider_region, proxy)
self.class.raw_connect(username, password, service, region, proxy)
end

def translate_exception(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,56 @@ def ems_inv_to_hashes
end

def object_store
buckets = @aws_s3.client.list_buckets.buckets
process_collection(buckets, :cloud_object_store_containers) { |c| parse_container(c) }
process_collection(
@aws_s3.client.list_buckets.buckets,
:cloud_object_store_containers
) { |c| parse_container(c) }

process_containers_content
end

def process_containers_content
containers = @data[:cloud_object_store_containers]
if containers.empty?
process_collection([], :cloud_object_store_objects)
return
end

containers.each do |bucket|
bucket_id = bucket[:ems_ref]

# S3 bucket accessible only for API client with same region
region = @aws_s3.client.get_bucket_location(bucket: bucket_id).location_constraint
api_client = regional_client(region)

# AWS SDK doesn't show information about overall size and object count.
# We need to collect it manually.
bytes = 0
object_count = 0
# API returns up to 1000 objects per request
continuation_token = nil
begin
response = api_client.list_objects_v2(bucket: bucket_id, continuation_token: continuation_token)
process_collection(response.contents, :cloud_object_store_objects) do |o|
uid, new_result = parse_object(o, bucket_id)
bytes += new_result[:content_length]
object_count += 1
[ uid, new_result ]
end
continuation_token = response.next_continuation_token
end while response.is_truncated && continuation_token
bucket[:bytes] = bytes
bucket[:object_count] = object_count
end
end

def regional_client(region)
if !region || region == ems.provider_region
aws_s3
else
@regional_resources ||= {}
@regional_resources[region] ||= @ems.connect(:service => :S3, :region => region)
end.client
end

def parse_container(bucket)
Expand All @@ -35,18 +83,18 @@ def parse_container(bucket)
return uid, new_result
end

def parse_object(obj, bucket)
uid = obj.key
def parse_object(object, bucket_id)
uid = object.key

new_result = {
:ems_ref => uid,
:etag => obj.etag,
:last_modified => obj.last_modified,
:content_length => obj.size,
:key => obj.key,
#:content_type => obj.content_type,
:container => bucket
:ems_ref => "#{bucket_id}_#{uid}",
:etag => object.etag,
:last_modified => object.last_modified,
:content_length => object.size,
:key => object.key,
:cloud_object_store_container_id => @data_index.fetch_path(:cloud_object_store_containers, bucket_id)
}
return uid, new_result
end

end
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,75 @@ def populate_inventory_collections
log_header = "MIQ(#{self.class.name}.#{__method__}) Collecting data for EMS name: [#{inventory.ems.name}] id: [#{inventory.ems.id}]"

$aws_log.info("#{log_header}...}")
object_store
process_containers
$aws_log.info("#{log_header}...Complete")

inventory_collections
end

private

def object_store
def process_containers
process_inventory_collection(
inventory.cloud_object_store_containers,
:cloud_object_store_containers
) { |c| parse_container(c) }
$aws_log.info "PROCESSING"
inventory_collections[:cloud_object_store_containers].data_index.each do |bucket_id, object|
lazy_object = inventory_collections[:cloud_object_store_containers].lazy_find(bucket_id)
object_stats = process_objects(bucket_id, lazy_object)
object.data.merge!(object_stats)
end

end

def parse_container(bucket)
uid = bucket['name']
{
container_hash = {
:ext_management_system => ems,
:ems_ref => uid,
:key => bucket['name']
}
end

def process_objects(bucket_id, bucket_object)
# S3 bucket accessible only for API client with same region
region = inventory.aws_s3.client.get_bucket_location(bucket: bucket_id).location_constraint
options = { region: region, bucket: bucket_id }

# AWS SDK doesn't show information about overall size and object count.
# We need to collect it manually.
bytes = 0
object_count = 0
begin
objects, token = inventory.cloud_object_store_objects(options)
options[:token] = token

process_inventory_collection(objects, :cloud_object_store_objects) do |o|
new_result = parse_object(o, bucket_object)
bytes += new_result[:content_length]
#$aws_log.info "#{new_result[:ems_ref]}"
object_count += 1
new_result
end

#$aws_log.info "#{bucket_id}. Objects: #{object_count}"

end while token.present?

{ bytes: bytes, object_count: object_count }
end

def parse_object(object, bucket)
uid = object['key']
{
:ext_management_system => ems,
:ems_ref => "#{bucket.ems_ref}_#{uid}",
:etag => object['etag'],
:last_modified => object['last_modified'],
:content_length => object['size'],
:key => uid,
:cloud_object_store_container => bucket
}
end
end
16 changes: 15 additions & 1 deletion spec/models/manageiq/providers/amazon/aws_stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def test_counts(scaling = nil)
:outbound_firewall_rule_per_security_group_count => scaling * 5,
:cloud_volume_count => scaling * 5,
:cloud_volume_snapshot_count => scaling * 5,
:s3_buckets_count => scaling * 5
:s3_buckets_count => scaling * 5,
:s3_objects_per_bucket_count => scaling * 5
}
end

Expand Down Expand Up @@ -358,6 +359,19 @@ def mocked_s3_buckets
mocked_s3_buckets
end

def mocked_s3_objects
mocked_s3_objects = []
test_counts[:s3_objects_per_bucket_count].times do |i|
mocked_s3_objects << {
:key => "object_key_#{i}",
:etag => "object_key_#{i}",
:size => 1,
:last_modified => Time.now.utc
}
end
mocked_s3_objects
end

def mocked_instance_health
mocked_instance_healths = []
expected_table_counts[:load_balancer_pool_member].times do |i|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def refresh_spec
@ems.reload

assert_table_counts
assert_buckets_content
assert_ems
end

Expand All @@ -77,6 +78,14 @@ def stub_responses
:s3 => {
:list_buckets => {
:buckets => mocked_s3_buckets
},
:get_bucket_location => {
:location_constraint => mocked_regions[:regions][0][:region_name]
},
:list_objects_v2 => {
:contents => mocked_s3_objects,
:next_continuation_token => nil,
:is_truncated => false
}
}
}
Expand Down Expand Up @@ -121,7 +130,8 @@ def expected_table_counts
:load_balancer_listener_pool => 0,
:load_balancer_health_check => 0,
:load_balancer_health_check_member => 0,
:cloud_object_store_containers => test_counts[:s3_buckets_count]
:cloud_object_store_containers => test_counts[:s3_buckets_count],
:cloud_object_store_objects => test_counts[:s3_buckets_count] * test_counts[:s3_objects_per_bucket_count]
}
end

Expand Down Expand Up @@ -164,17 +174,37 @@ def assert_table_counts
:load_balancer_listener_pool => LoadBalancerListenerPool.count,
:load_balancer_health_check => LoadBalancerHealthCheck.count,
:load_balancer_health_check_member => LoadBalancerHealthCheckMember.count,
:cloud_object_store_containers => CloudObjectStoreContainer.count
:cloud_object_store_containers => CloudObjectStoreContainer.count,
:cloud_object_store_objects => CloudObjectStoreObject.count
}

expect(actual).to eq expected_table_counts
end

def assert_buckets_content
mocked_objects = mocked_s3_objects
expected_hash = {
object_count: mocked_objects.count,
content_length: mocked_objects.map{ |object| object[:size] }.sum,
}
actual = {}
expected_content = {}
CloudObjectStoreContainer.all.each do |container|
expected_content[container.ems_ref] = expected_hash
actual[container.ems_ref] = {
object_count: container.object_count,
content_length: container.bytes
}
end
expect(actual).to eq expected_content
end

def assert_ems
ems = @ems.s3_storage_manager
expect(ems).to have_attributes(:api_version => nil,
:uid_ems => nil)

expect(ems.cloud_object_store_containers.size).to eql(expected_table_counts[:cloud_object_store_containers])
end

end

0 comments on commit ccbfd12

Please sign in to comment.