MiqFileStorage interface and subclassing (with file splitting) #361

Merged
lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb (281 additions, 0 deletions)
@@ -0,0 +1,281 @@
# This is a backport of the `Aws::S3::Object#upload_stream` method that is
# introduced in v3 of the 'aws-sdk' gem.
#
# To my knowledge, the changes from this patch should be completely backwards
# compatible, and the API should easily support this change.
#
# Once upgraded, this file should be removed.
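#
# For illustration only (not part of the original patch): once this file has
# been required, any `Aws::S3::Object` gains `#upload_stream`. The require
# path, bucket, and key below are assumptions, not values from this PR.
#
#   require 'util/extensions/aws-sdk/s3_upload_stream_patch'
#
#   obj = Aws::S3::Resource.new.bucket("some-bucket").object("some/key")
#   File.open("some/local/file", "rb") do |file|
#     obj.upload_stream(tempfile: true) do |write_stream|
#       IO.copy_stream(file, write_stream)
#     end
#   end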

require 'aws-sdk'

if defined? Aws::S3::GEM_VERSION # This constant doesn't exist in v2
Member commented:
So, I wouldn't want to push it just for this patch, but do we know if we are super far away from being able to use the new version as a whole? I would be curious to know how much effort that would be, rather than forward porting this patch. @bronaghs Can you comment here, or refer us to whoever would know about the aws-sdk version we are using?

@NickLaMuro (Member Author) commented on Sep 11, 2018:
> do we know if we are super far away from being able to use the new version as a whole?

I don't know personally, or whether it has been considered at all.

I want to say there was something being done recently to update fog, and I thought that was potentially going to update the aws-sdk as well, but I might be remembering incorrectly (Update: I was wrong... no surprise... It was for ManageIQ/manageiq#17258, and it was fog-google, not AWS... oops)

Member commented:
Switching to v3 of the Amazon API is not in the near future... cc @bronaghs @agrare

Member commented:
Looks like the far future has arrived? #374

  raise <<~ERROR
    The 'aws-sdk' gem has been updated to v3!!!

    Remove this file!

    #{File.expand_path(__FILE__)}

    And delete any references to it, starting with this one:

    #{caller(1..1).first}

    This functionality was backported and should now exist normally in the
    aws-sdk library, so no monkey patching is necessary.

    Hooray!
  ERROR
else
  # make sure the client is autoloaded before we continue
  Aws::S3::Client
end

# The code below comes directly from the aws-sdk, so ignore rubocop warnings
# for the remainder of this file.
#
# rubocop:disable all

### Aws::S3::MultipartStreamUploader
#
# Ripped from:
#
# https://github.com/aws/aws-sdk-ruby/blob/708140b8/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb

require 'thread'
require 'set'
require 'tempfile'
require 'stringio'

module Aws
  module S3
    # @api private
    class MultipartStreamUploader
      # api private
      PART_SIZE = 5 * 1024 * 1024 # 5MB

      # api private
      THREAD_COUNT = 10

      # api private
      TEMPFILE_PREIX = 'aws-sdk-s3-upload_stream'.freeze

      # @api private
      CREATE_OPTIONS =
        Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names)

      # @api private
      UPLOAD_PART_OPTIONS =
        Set.new(Client.api.operation(:upload_part).input.shape.member_names)

      # @option options [Client] :client
      def initialize(options = {})
        @client = options[:client] || Client.new
        @tempfile = options[:tempfile]
        @part_size = options[:part_size] || PART_SIZE
        @thread_count = options[:thread_count] || THREAD_COUNT
      end

      # @return [Client]
      attr_reader :client

      # @option options [required,String] :bucket
      # @option options [required,String] :key
      # @return [void]
      def upload(options = {}, &block)
        upload_id = initiate_upload(options)
        parts = upload_parts(upload_id, options, &block)
        complete_upload(upload_id, parts, options)
      end

      private

      def initiate_upload(options)
        @client.create_multipart_upload(create_opts(options)).upload_id
      end

      def complete_upload(upload_id, parts, options)
        @client.complete_multipart_upload(
          bucket: options[:bucket],
          key: options[:key],
          upload_id: upload_id,
          multipart_upload: { parts: parts })
      end

      def upload_parts(upload_id, options, &block)
        completed = Queue.new
        errors = IO.pipe do |read_pipe, write_pipe|
          threads = upload_in_threads(read_pipe, completed, upload_part_opts(options).merge(upload_id: upload_id))
          block.call(write_pipe)
          write_pipe.close
          threads.map(&:value).compact
        end
        if errors.empty?
          Array.new(completed.size) { completed.pop }.sort_by { |part| part[:part_number] }
        else
          abort_upload(upload_id, options, errors)
        end
      end

      def abort_upload(upload_id, options, errors)
        @client.abort_multipart_upload(
          bucket: options[:bucket],
          key: options[:key],
          upload_id: upload_id
        )
        msg = "multipart upload failed: #{errors.map(&:message).join("; ")}"
        raise MultipartUploadError.new(msg, errors)
      rescue MultipartUploadError => error
        raise error
      rescue => error
        msg = "failed to abort multipart upload: #{error.message}"
        raise MultipartUploadError.new(msg, errors + [error])
      end

      def create_opts(options)
        CREATE_OPTIONS.inject({}) do |hash, key|
          hash[key] = options[key] if options.key?(key)
          hash
        end
      end

      def upload_part_opts(options)
        UPLOAD_PART_OPTIONS.inject({}) do |hash, key|
          hash[key] = options[key] if options.key?(key)
          hash
        end
      end

      def read_to_part_body(read_pipe)
        return if read_pipe.closed?
        temp_io = @tempfile ? Tempfile.new(TEMPFILE_PREIX) : StringIO.new
        temp_io.binmode
        bytes_copied = IO.copy_stream(read_pipe, temp_io, @part_size)
        temp_io.rewind
        if bytes_copied == 0
          if Tempfile === temp_io
            temp_io.close
            temp_io.unlink
          end
          nil
        else
          temp_io
        end
      end

      def upload_in_threads(read_pipe, completed, options)
        mutex = Mutex.new
        part_number = 0
        @thread_count.times.map do
          thread = Thread.new do
            begin
              loop do
                body, thread_part_number = mutex.synchronize do
                  [read_to_part_body(read_pipe), part_number += 1]
                end
                break unless body
                begin
                  part = options.merge(
                    body: body,
                    part_number: thread_part_number,
                  )
                  resp = @client.upload_part(part)
                  completed << {etag: resp.etag, part_number: part[:part_number]}
                ensure
                  if Tempfile === body
                    body.close
                    body.unlink
                  end
                end
              end
              nil
            rescue => error
              # keep other threads from uploading other parts
              mutex.synchronize { read_pipe.close_read }
              error
            end
          end
          thread.abort_on_exception = true
          thread
        end
      end
    end
  end
end

### Aws::S3::Object patches
#
# Customizations ripped from:
#
# https://github.com/aws/aws-sdk-ruby/blob/708140b8/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb
#
# Only the Aws::S3::Object#upload_stream method is backported, which is a
# wrapper method around the above. This is an attempt to reduce the monkey
# patching surface area.
#
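# For illustration only (not part of the patch), the uploader above could also
# be driven directly; in practice it is only exercised through the
# `Aws::S3::Object#upload_stream` wrapper defined below. The bucket and key
# names here are assumptions:
#
#   uploader = Aws::S3::MultipartStreamUploader.new(client: Aws::S3::Client.new)
#   uploader.upload(bucket: "some-bucket", key: "some/key") do |write_pipe|
#     write_pipe << "streamed data"
#   end
#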

module Aws
  module S3
    class Object
      # Unsure if this might have been used by the uploader above, so I kept it
      # in for now.
      alias size content_length

      # Uploads a stream in a streaming fashion to the current object in S3.
      #
      # Passed chunks are automatically split into multipart upload parts
      # and the parts are uploaded in parallel. This allows for streaming uploads
      # that never touch the disk.
      #
      # Note that this is known to have issues in JRuby until jruby-9.1.15.0,
      # so avoid using this with older versions of JRuby.
      #
      # @example Streaming chunks of data
      #   obj.upload_stream do |write_stream|
      #     10.times { write_stream << 'foo' }
      #   end
      # @example Streaming chunks of data
      #   obj.upload_stream do |write_stream|
      #     IO.copy_stream(IO.popen('ls'), write_stream)
      #   end
      # @example Streaming chunks of data
      #   obj.upload_stream do |write_stream|
      #     IO.copy_stream(STDIN, write_stream)
      #   end
      #
      # @option options [Integer] :thread_count
      #   The number of parallel multipart uploads.
      #   Default `:thread_count` is `10`.
      #
      # @option options [Boolean] :tempfile
      #   Normally read data is stored in memory when building the parts in order to complete
      #   the underlying multipart upload. By passing `:tempfile => true`, data read will be
      #   temporarily stored on disk, reducing the memory footprint vastly.
      #   Default `:tempfile` is `false`.
      #
      # @option options [Integer] :part_size
      #   Define how big each part, except the last, should be.
      #   Default `:part_size` is `5 * 1024 * 1024`.
      #
      # @raise [MultipartUploadError] If an object is being uploaded in
      #   parts, and the upload can not be completed, then the upload is
      #   aborted and this error is raised. The raised error has a `#errors`
      #   method that returns the failures that caused the upload to be
      #   aborted.
      #
      # @return [Boolean] Returns `true` when the object is uploaded
      #   without any errors.
      #
      def upload_stream(options = {}, &block)
        uploading_options = options.dup
        uploader = MultipartStreamUploader.new(
          client: client,
          thread_count: uploading_options.delete(:thread_count),
          tempfile: uploading_options.delete(:tempfile),
          part_size: uploading_options.delete(:part_size),
        )
        uploader.upload(uploading_options.merge(bucket: bucket_name, key: key), &block)
        true
      end
    end
  end
end
# rubocop:enable all
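
For reference, a minimal sketch of how a caller might handle a failed streaming upload once this patch is loaded. `obj` stands in for any `Aws::S3::Object`; the option values are illustrative, not taken from this PR.

  begin
    obj.upload_stream(tempfile: true, part_size: 10 * 1024 * 1024) do |write_stream|
      IO.copy_stream($stdin, write_stream)
    end
  rescue Aws::S3::MultipartUploadError => err
    # The upload was aborted; individual part failures are exposed via #errors
    warn "upload aborted: #{err.message}"
    err.errors.each { |part_error| warn "  #{part_error.message}" }
  end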