diff --git a/lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb b/lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb new file mode 100644 index 000000000..4ecbb30ee --- /dev/null +++ b/lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb @@ -0,0 +1,281 @@ +# This is a backport of the `Aws::S3::Object#upload_stream` method that is +# introduced in the V3 of the 'aws-sdk' +# +# To my knowledge, the changes from this patch should be completely backwards +# compatible, and the API should easily support this change. +# +# Once upgraded, this file should be removed. + +require 'aws-sdk' + +if defined? Aws::S3::GEM_VERSION # This constant doesn't exist in v2 + raise <<~ERROR + The 'aws-sdk' gem has been updated to v3!!! + + Remove this file! + + #{File.expand_path(__FILE__)} + + And delete any references to it, starting with this one: + + #{caller(1..1).first} + + This functionality was backported and should now exist normally in the + aws-sdk library, so no monkey patching is necessary. + + Horray! + ERROR +else + # make sure the client is autoloaded before we continue + Aws::S3::Client +end + +# The code below comes directly from the aws-sdk, so ignore rubocop warnings +# for the remainder of this file. 
#
# rubocop:disable all

### Aws::S3::MultipartStreamUploader
#
# Ripped from:
#
#   https://github.com/aws/aws-sdk-ruby/blob/708140b8/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb

require 'thread'
require 'set'
require 'tempfile'
require 'stringio'

module Aws
  module S3
    # Streams data written by a caller-supplied block into an S3 multipart
    # upload.  The block writes to the write end of a pipe; worker threads
    # read fixed-size parts from the read end and upload them concurrently.
    #
    # @api private
    class MultipartStreamUploader
      # Default number of bytes read from the pipe for each part.
      #
      # api private
      PART_SIZE = 5 * 1024 * 1024 # 5MB

      # Default number of uploader threads.
      #
      # api private
      THREAD_COUNT = 10

      # Basename prefix for part Tempfiles.  The spelling is kept as-is
      # because `read_to_part_body` refers to the constant by this name.
      #
      # api private
      TEMPFILE_PREIX = 'aws-sdk-s3-upload_stream'.freeze

      # Member names accepted by the `create_multipart_upload` operation;
      # used by `create_opts` to filter the caller's options.
      #
      # @api private
      CREATE_OPTIONS =
        Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names)

      # Member names accepted by the `upload_part` operation; used by
      # `upload_part_opts` to filter the caller's options.
      #
      # @api private
      UPLOAD_PART_OPTIONS =
        Set.new(Client.api.operation(:upload_part).input.shape.member_names)

      # @option options [Client] :client
      #   Client used for every request; a new `Client` is built when omitted.
      # @option options [Boolean] :tempfile
      #   When truthy, parts are buffered in Tempfiles instead of StringIO.
      # @option options [Integer] :part_size
      #   Bytes per part; falls back to PART_SIZE when nil.
      # @option options [Integer] :thread_count
      #   Number of uploader threads; falls back to THREAD_COUNT when nil.
      def initialize(options = {})
        @client = options[:client] || Client.new
        @tempfile = options[:tempfile]
        @part_size = options[:part_size] || PART_SIZE
        @thread_count = options[:thread_count] || THREAD_COUNT
      end

      # @return [Client]
      attr_reader :client

      # Runs the three phases of the multipart upload: initiate, upload the
      # parts produced by the block, then complete.
      #
      # @option options [required,String] :bucket
      # @option options [required,String] :key
      # @yield [write_pipe] the write end of the pipe the parts are read from
      # @return [void]
      def upload(options = {}, &block)
        upload_id = initiate_upload(options)
        parts = upload_parts(upload_id, options, &block)
        complete_upload(upload_id, parts, options)
      end

      private

      # Starts the multipart upload and returns its upload id.
      def initiate_upload(options)
        @client.create_multipart_upload(create_opts(options)).upload_id
      end

      # Completes the multipart upload with the given list of
      # `{etag:, part_number:}` hashes.
      def complete_upload(upload_id, parts, options)
        @client.complete_multipart_upload(
          bucket: options[:bucket],
          key: options[:key],
          upload_id: upload_id,
          multipart_upload: { parts: parts })
      end

      # Opens a pipe, starts the uploader threads on its read end, and hands
      # the write end to the caller's block.  Each thread's value is either
      # nil or the error it rescued, so `errors` holds only the failures.
      #
      # Returns the completed parts sorted by part number, or aborts the
      # upload (which raises) when any thread reported an error.
      def upload_parts(upload_id, options, &block)
        completed = Queue.new
        errors = IO.pipe do |read_pipe, write_pipe|
          threads = upload_in_threads(read_pipe, completed, upload_part_opts(options).merge(upload_id: upload_id))
          block.call(write_pipe)
          # closing the write end lets the readers hit end-of-stream
          write_pipe.close
          threads.map(&:value).compact
        end
        if errors.empty?
          Array.new(completed.size) { completed.pop }.sort_by { |part| part[:part_number] }
        else
          abort_upload(upload_id, options, errors)
        end
      end

      # Aborts the multipart upload and always raises MultipartUploadError:
      # with the part errors when the abort succeeds, or with the part errors
      # plus the abort failure when the abort request itself raises.
      def abort_upload(upload_id, options, errors)
        @client.abort_multipart_upload(
          bucket: options[:bucket],
          key: options[:key],
          upload_id: upload_id
        )
        msg = "multipart upload failed: #{errors.map(&:message).join("; ")}"
        raise MultipartUploadError.new(msg, errors)
      rescue MultipartUploadError => error
        # re-raise untouched so the generic rescue below does not wrap it
        raise error
      rescue => error
        msg = "failed to abort multipart upload: #{error.message}"
        raise MultipartUploadError.new(msg, errors + [error])
      end

      # Keeps only the options that `create_multipart_upload` accepts.
      def create_opts(options)
        CREATE_OPTIONS.inject({}) do |hash, key|
          hash[key] = options[key] if options.key?(key)
          hash
        end
      end

      # Keeps only the options that `upload_part` accepts.
      def upload_part_opts(options)
        UPLOAD_PART_OPTIONS.inject({}) do |hash, key|
          hash[key] = options[key] if options.key?(key)
          hash
        end
      end

      # Copies up to `@part_size` bytes from the pipe into a fresh buffer
      # (Tempfile or StringIO, depending on `@tempfile`).
      #
      # Returns the rewound buffer, or nil when the pipe is already closed or
      # no bytes were copied (an empty Tempfile is removed before returning).
      def read_to_part_body(read_pipe)
        return if read_pipe.closed?
        temp_io = @tempfile ? Tempfile.new(TEMPFILE_PREIX) : StringIO.new
        temp_io.binmode
        bytes_copied = IO.copy_stream(read_pipe, temp_io, @part_size)
        temp_io.rewind
        if bytes_copied == 0
          if Tempfile === temp_io
            temp_io.close
            temp_io.unlink
          end
          nil
        else
          temp_io
        end
      end

      # Starts `@thread_count` threads.  Each one loops: under the mutex it
      # reads the next part body and takes the next part number, then uploads
      # the part outside the mutex and pushes `{etag:, part_number:}` onto
      # `completed`.  A thread's value is nil on success or the rescued error.
      def upload_in_threads(read_pipe, completed, options)
        mutex = Mutex.new
        part_number = 0
        @thread_count.times.map do
          thread = Thread.new do
            begin
              loop do
                # reading and numbering happen together so part numbers
                # follow the order in which the data was read
                body, thread_part_number = mutex.synchronize do
                  [read_to_part_body(read_pipe), part_number += 1]
                end
                break unless body
                begin
                  part = options.merge(
                    body: body,
                    part_number: thread_part_number,
                  )
                  resp = @client.upload_part(part)
                  completed << {etag: resp.etag, part_number: part[:part_number]}
                ensure
                  # Tempfile-backed bodies are removed whether or not the
                  # upload succeeded
                  if Tempfile === body
                    body.close
                    body.unlink
                  end
                end
              end
              nil
            rescue => error
              # keep other threads from uploading other parts
              mutex.synchronize { read_pipe.close_read }
              error
            end
          end
          thread.abort_on_exception = true
          thread
        end
      end
    end
  end
end
# This class is meant to be an abstract interface for defining a file_storage
# class.
#
# The storage class can either be of a type of "object storage", which includes:
#   * protocols like FTP
#   * document storage like s3 and OpenStack's Swift
#
# And mountable filesystems like:
#   * NFS
#   * SMB
#
# The class is meant to allow a shared interface for working with these
# different forms of file storage, while maintaining their differences in
# implementation where necessary.  Connection will be handled separately by the
# subclasses, but they must conform to the top level interface.
#
class MiqFileStorage
  # Raised when a URI's scheme has no registered Interface subclass.
  class InvalidSchemeError < ArgumentError
    # @param bad_scheme [String, nil] the scheme that could not be resolved
    def initialize(bad_scheme = nil)
      super(error_message(bad_scheme))
    end

    # Builds the message, listing every scheme that is currently registered.
    def error_message(bad_scheme)
      valid_schemes = ::MiqFileStorage.storage_interface_classes.keys.inspect
      "#{bad_scheme} is not a valid MiqFileStorage uri scheme. Accepted schemes are #{valid_schemes}"
    end
  end

  # Instantiates the storage class matching the scheme of `opts[:uri]`.
  #
  # @param opts [Hash] options; `:uri` selects the class, and the whole hash is
  #   handed to that class' `.new_with_opts`
  # @yield [instance] optional; when given, the block's result is returned
  # @return [Object, nil] the instance (or block result); nil without `:uri`
  # @raise [InvalidSchemeError] when no class handles the uri's scheme
  def self.with_interface_class(opts)
    klass = fetch_interface_class(opts)
    block_given? ? yield(klass) : klass
  end

  # Resolves `opts[:uri]` to an instance of the matching Interface subclass.
  def self.fetch_interface_class(opts)
    return nil unless opts[:uri]

    require 'uri'
    # the uri is escaped first so characters URI.split rejects do not raise
    scheme, _ = URI.split(URI::DEFAULT_PARSER.escape(opts[:uri]))
    klass = storage_interface_classes[scheme]

    raise InvalidSchemeError, scheme if klass.nil?

    klass.new_with_opts(opts)
  end
  private_class_method :fetch_interface_class

  # Memoized map of uri scheme => Interface subclass.  Subclasses whose
  # `.uri_scheme` is nil are left out.
  #
  # NOTE(review): relies on `Class#descendants`, which is not defined in this
  # file (presumably ActiveSupport) -- confirm it is loaded by the callers.
  def self.storage_interface_classes
    @storage_interface_classes ||= Interface.descendants.each_with_object({}) do |klass, memo|
      memo[klass.uri_scheme] = klass if klass.uri_scheme
    end
  end

  class Interface
    # Matches a whole byte-count argument such as "1024", "1.5k", "200M" or
    # "2 G".  The capture names are the ones `parse_byte_value` reads.
    BYTE_HASH_MATCH = /\A(?<BYTE_NUM>\d+(\.\d+)?)\s*(?<BYTE_QUALIFIER>K|M|G)?\z/i

    # Multipliers (powers of 1024) keyed by the lowercased unit suffix.
    BYTE_HASH = {
      "k" => 1024,
      "m" => 1024**2,
      "g" => 1024**3
    }.freeze

    attr_reader :remote_file_path, :byte_count, :source_input, :input_writer

    # Subclasses must build an instance from the options hash.
    def self.new_with_opts(opts) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{name}.new_with_opts is not defined"
    end

    # The uri scheme a subclass handles; nil keeps the class out of
    # `MiqFileStorage.storage_interface_classes`.
    def self.uri_scheme
      nil
    end

    # :call-seq:
    #   add( remote_uri ) { |input_writer| ... }
    #   add( remote_uri, byte_count ) { |input_writer| ... }
    #
    #   add( local_io, remote_uri )
    #   add( local_io, remote_uri, byte_count )
    #
    # Add a file to the destination URI.
    #
    # In the block form of the method, only the remote_uri is required, and it
    # is assumed the input will be generated in the executed block (most
    # likely by an external process) and written to a unix pipe.  The pipe is
    # created by this method, and its file location is passed to the block
    # (the `input_stream` in the example below).
    #
    # In the non-block form, a source must be provided as the first argument
    # either as an IO object that can be read from, or a file path, and the
    # second argument is the remote_uri as in the block form.
    #
    # An additional last argument, `byte_count`, can be included in both
    # forms.  If passed, it will be assumed that the resulting input will be
    # split, and the naming for the splits will be:
    #
    #   - filename.00001
    #   - filename.00002
    #   ...
    #
    # Block form:
    #
    #     nfs_session.add("path/to/file", "200M") do |input_stream|
    #       `pg_dump -f #{input_stream} vmdb_production`
    #     end
    #
    # Non-block form:
    #
    #     nfs_session.add("path/to/local_file", "path/to/remote_file")
    #     nfs_session.add("path/to/local_file", "path/to/remote_file", "200M")
    #
    # Returns whatever `upload_single` / `upload_splits` returned.  All upload
    # state is cleared afterwards, whether or not the upload succeeded.
    def add(*upload_args, &block)
      initialize_upload_vars(*upload_args)
      mkdir(File.dirname(@remote_file_path))
      thread = handle_io_block(&block)
      result = if byte_count
                 upload_splits
               else
                 upload_single(@remote_file_path)
               end
      # `.join` will raise any errors from the thread, so we want to do that
      # here (if a thread exists of course).
      thread.join if thread
      result
    ensure
      reset_vars
    end
    alias upload add

    # Subclasses must create `dir` (and any parents) on the remote side.
    def mkdir(dir) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{self.class}##{__callee__} is not defined"
    end

    # :call-seq:
    #   download( local_io, remote_uri )
    #   download( nil, remote_uri ) { |input_writer| ... }
    #
    # Download a file from a remote uri.
    #
    # In non-block form, the remote_uri is saved to the local_io.
    #
    # In block form, the local_io is ignored; the remote file is written to a
    # FIFO whose path is passed to the block, which is expected to read it.
    def download(local_file, remote_file_uri, &block)
      @remote_file_path = remote_file_uri
      if block_given?
        thread = handle_io_block(&block)
        download_single(remote_file_uri, input_writer)
        # closing the writer signals EOF to whatever is reading the FIFO
        input_writer.close
        thread.join
      else
        download_single(remote_file_uri, local_file)
      end
    ensure
      reset_vars
    end

    private

    # NOTE:  Needs to be overwritten in the subclass!
    #
    # Classes that inherit from `MiqFileStorage` need to make sure to create a
    # method that overwrites this one to handle the specifics of uploading for
    # their particular ObjectStore protocol or MountSession.
    #
    # `dest_uri` is the current file that will be uploaded.  If file splitting
    # is occurring, this will update the filename passed into `.add` to include
    # a `.0000X` suffix, where the suffix is padded up to 5 digits in total.
    #
    # `#upload_single` doesn't need to worry about determining the file name
    # itself for splitting, but if any relative path munging is necessary, that
    # should be done here (see `MiqGenericMountSession#upload_single` for an
    # example)
    #
    # `source_input` is available as an attr_reader in this method, and will
    # always be a local IO object that is available for reading.
    #
    # `byte_count` is also an attr_reader that is available, and will either be
    # `nil` if no file splitting is occurring, or an integer representing the
    # maximum number of bytes to upload for this particular `dest_uri`.
    #
    #
    # Ideally, making use of `IO.copy_stream` will simplify this process
    # significantly, as you can pass it `source_input`, `dest_uri`, and
    # `byte_count` respectively, and it will automatically handle streaming the
    # data from one IO object to the other.  In mount based situations, where
    # `dest_uri` is a file path (in `MiqGenericMountSession#upload_single`,
    # this is converted to `relpath`), this does not need to be converted to a
    # `File` IO object as `IO.copy_stream` will do that for you.
    def upload_single(dest_uri) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{self.class}#upload_single is not defined"
    end

    # Uploads `source_input` in consecutive chunks of `byte_count` bytes, each
    # to the next numbered split file, until the source reports EOF.
    def upload_splits
      @position = 0
      until source_input.eof?
        upload_single(next_split_filename)
        @position += byte_count
      end
    end

    # Unpacks the `add` arguments into @byte_count, @remote_file_path and
    # @source_input (see `add` for the accepted argument forms).
    #
    # Raises ArgumentError for a byte count of zero, which would otherwise
    # make `upload_splits` loop forever without consuming any input.
    def initialize_upload_vars(*upload_args)
      @close_source_input = false

      # A lone argument is always the remote path, even if it looks numeric.
      @byte_count = upload_args.size > 1 ? parse_byte_value(upload_args.last) : nil
      if @byte_count
        raise ArgumentError, "byte_count must be greater than zero" if @byte_count.zero?
        upload_args.pop
      end
      @remote_file_path = upload_args.pop

      return if upload_args.empty?

      source = upload_args.pop
      if source.kind_of?(IO)
        # the caller owns this IO, so it is never closed here
        @source_input = source
      else
        @source_input       = File.open(source, "r")
        @close_source_input = true # opened here, closed in reset_vars
      end
    end

    # Converts "200M", "1.5k", 1024, ... into a whole number of bytes.
    # Returns nil when the value is not a byte count.
    def parse_byte_value(bytes)
      match = bytes.to_s.match(BYTE_HASH_MATCH) || return

      bytes = match[:BYTE_NUM].to_f
      if match[:BYTE_QUALIFIER]
        bytes *= BYTE_HASH[match[:BYTE_QUALIFIER].downcase]
      end
      bytes.to_i
    end

    # When a block is given: creates a FIFO, opens both of its ends, and runs
    # the block (with the FIFO's path) in a new thread, which is returned.
    # Returns nil when no block is given.
    def handle_io_block
      return unless block_given?

      require "tmpdir"
      require "pathname"

      # create pathname, but don't create the file for it (next line)
      fifo_path = Pathname.new(Dir::Tmpname.create("") {})
      File.mkfifo(fifo_path)

      # For #Reasons(TM), the reader must be opened first
      @source_input       = File.open(fifo_path.to_s, IO::RDONLY | IO::NONBLOCK)
      @input_writer       = File.open(fifo_path.to_s, IO::WRONLY | IO::NONBLOCK)
      @close_source_input = true

      # Held in a local so the thread can still close it after reset_vars has
      # cleared the instance variable.
      writer = @input_writer

      Thread.new do
        begin
          yield fifo_path # send the path to the block to get executed
        ensure
          writer.close # close the file so we know we hit EOF (for #add)
        end
      end
    end

    # Releases everything `add` / `download` set up: closes and removes the
    # FIFO, closes a source file opened by this object, and clears all state.
    # It runs inside `ensure`, so every step is guarded against raising.
    def reset_vars
      if @input_writer
        fifo = @input_writer.path
        @input_writer.close unless @input_writer.closed?
        File.delete(fifo) if File.exist?(fifo)
      end
      @source_input.close if @close_source_input && @source_input && !@source_input.closed?

      @position = @byte_count = @remote_file_path = @source_input = @input_writer = nil
      @close_source_input = nil
    end

    # Name of the split currently being uploaded: the remote path plus a
    # five digit, 1-based sequence number derived from the bytes sent so far.
    def next_split_filename
      "#{remote_file_path}.#{'%05d' % (@position / byte_count + 1)}"
    end

    # Subclasses must copy the remote `source` into the local `destination`.
    def download_single(source, destination) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{self.class}#download_single is not defined"
    end
  end
end
# Uploads to the mounted share: reconnects, delegates the upload itself to
# MiqFileStorage::Interface#add, and always disconnects afterwards.
#
# Accepts the same arguments (and optional block) as Interface#add.
# Returns whatever Interface#add returned.  Errors are logged and re-raised.
def add(*upload_args)
  dest_uri   = nil
  log_header = "MIQ(#{self.class.name}-add)"

  # Interface#add clears remote_file_path / source_input in its own `ensure`,
  # so they are already nil once `super` has returned or raised.  Work out
  # what to log from the arguments instead (same layout Interface#add uses:
  # [source,] remote_uri [, byte_count]).
  log_args = upload_args.dup
  log_args.pop if log_args.size > 1 && parse_byte_value(log_args.last)
  log_dest   = log_args.pop
  log_source = log_args.pop # nil in block form, logged as an empty string
  log_source = log_source.path if log_source.respond_to?(:path)

  begin
    reconnect!

    dest_uri = super
  rescue => err
    msg = "Adding [#{log_source}] to [#{log_dest}], failed due to error: '#{err.message}'"
    logger.error("#{log_header} #{msg}")
    raise
  ensure
    disconnect
  end

  logger.info("#{log_header} File URI added: [#{log_dest}] complete")
  dest_uri
end
# Describes the current upload source for log messages.
#
# Returns an empty string when the input is streamed through the FIFO
# (@input_writer is set) or when no source is set, the source's path when it
# has one, and otherwise its `inspect` output.  It never raises, so it is
# safe to call from a `rescue` block.
def source_for_log
  return "" if @input_writer || @source_input.nil?

  # not every readable source has a path (e.g. the read end of IO.pipe)
  path = @source_input.path if @source_input.respond_to?(:path)
  path || @source_input.inspect
end
# Mount session for the local file system: nothing is mounted, so the
# connection hooks are no-ops and paths are used as given.
class MiqLocalMountSession < MiqGenericMountSession
  # Scheme this class is registered under in
  # MiqFileStorage.storage_interface_classes.
  def self.uri_scheme
    "file".freeze
  end

  # no-op these since they are not relevant to the local file system
  #
  # rubocop:disable Style/SingleLineMethods, Layout/EmptyLineBetweenDefs
  def connect;     end # :nodoc:
  def disconnect;  end # :nodoc:
  def mount_share; end # :nodoc:
  # rubocop:enable Style/SingleLineMethods, Layout/EmptyLineBetweenDefs

  # There is no mount point to strip, so the path is returned unchanged.
  def relative_to_mount(remote_file) # :nodoc:
    remote_file
  end

  # Returns the absolute form of the given path.
  #
  # NOTE(review): the argument is expanded as a plain path; a "file://..."
  # prefix would not be stripped -- confirm callers pass bare paths.
  def uri_to_local_path(remote_file) # :nodoc:
    File.expand_path(remote_file)
  end
end
- FileUtils.rm_rf(mnt_point) if File.exist?(mnt_point) - end - - def uri_to_local_path(remote_file) - # Strip off the leading "s3:/" from the URI" - File.join(@mnt_point, URI(remote_file).host, URI(remote_file).path) - end - - def uri_to_object_path(remote_file) - # Strip off the leading "s3://" and the bucket name from the URI" - # Also remove the leading delimiter. - URI(remote_file).path[1..-1] - end - - def add(local_file, uri) - require 'aws-sdk' - bucket_name = URI(uri).host - if (dump_bucket = s3.bucket(bucket_name)).exists? - logger.debug("Found bucket #{bucket_name}") - else - logger.debug("Bucket #{bucket_name} does not exist, creating.") - dump_bucket.create - end - object_file = uri_to_object_path(uri) - # write dump file to s3 - logger.debug("Writing [#{local_file}] to Bucket [#{bucket_name}] using object file name [#{object_file}]") - begin - dump_bucket.object(object_file).upload_file(local_file) - rescue Aws::S3::Errors::AccessDenied, Aws::S3::Errors::Forbidden => err - disconnect - logger.error("Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}") - msg = "Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}" - raise err, msg, err.backtrace - rescue => err - disconnect - logger.error("Error uploading #{local_file} to S3 bucket #{bucket_name}. #{err}") - msg = "Error uploading #{local_file} to S3 bucket #{bucket_name}. #{err}" - raise err, msg, err.backtrace - end - end - - def download(local_file, remote_file) - require 'aws-sdk' - bucket_name = URI(remote_file).host - if (dump_bucket = s3.bucket(bucket_name)).exists? - logger.debug("Found bucket #{bucket_name}") - else - logger.error("Bucket #{bucket_name} does not exist, unable to download [#{remote_file}].") - raise "Bucket #{bucket_name} does not exist, unable to download [#{remote_file}]." 
- end - object_file = uri_to_object_path(remote_file) - local_file = File.join(@mnt_point, File.basename(local_file)) - logger.debug("Downloading [#{object_file}] from bucket [#{bucket_name}] to local file [#{local_file}]") - begin - dump_bucket.object(object_file).download_file(local_file) - rescue Aws::S3::Errors::AccessDenied, Aws::S3::Errors::Forbidden => err - disconnect - logger.error("Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}") - msg = "Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}" - raise err, msg, err.backtrace - rescue => err - disconnect - logger.error("Error downloading #{remote_file} from S3. #{err}") - msg = "Error downloading #{remote_file} from S3. #{err}" - raise err, msg, err.backtrace - end - local_file - end - - private - - def s3 - @s3 ||= Aws::S3::Resource.new(:region => @settings[:region], :access_key_id => @settings[:username], :secret_access_key => @settings[:password]) - end -end diff --git a/lib/gems/pending/util/mount/miq_smb_session.rb b/lib/gems/pending/util/mount/miq_smb_session.rb index e6c42124c..535c94adc 100644 --- a/lib/gems/pending/util/mount/miq_smb_session.rb +++ b/lib/gems/pending/util/mount/miq_smb_session.rb @@ -3,6 +3,10 @@ class MiqSmbSession < MiqGenericMountSession PORTS = [445, 139] + def self.uri_scheme + "smb".freeze + end + def initialize(log_settings) super(log_settings.merge(:ports => PORTS)) raise "username is a required value!" if @settings[:username].nil? 
require 'uri'
require 'util/miq_object_storage'

# Object storage backend for "s3://bucket/key" URIs.  The bucket name is the
# host part of the URI given in the settings; object keys are the URI paths.
class MiqS3Storage < MiqObjectStorage
  attr_reader :bucket_name

  def self.uri_scheme
    "s3".freeze
  end

  # Builds an instance from the storage-related subset of `opts`.
  def self.new_with_opts(opts)
    new(opts.slice(:uri, :username, :password, :region))
  end

  # @param settings [Hash] requires :uri, :username and :password; :region
  #   defaults to "us-east-1"
  # @raise [RuntimeError] when the uri or the credentials are missing
  def initialize(settings)
    super(settings)

    # NOTE: This line to be removed once manageiq-ui-class region change implemented.
    @settings[:region] ||= "us-east-1"
    @bucket_name = URI(@settings[:uri]).host

    raise "username, password, and region are required values!" if @settings[:username].nil? || @settings[:password].nil? || @settings[:region].nil?
  end

  # Extract the path from the URI, so strip off the "s3://" scheme, the bucket
  # hostname, leaving only the path minus the leading '/'
  def uri_to_object_key(remote_file)
    # `path` is `[5]` in the returned result of URI.split
    URI.split(remote_file)[5][1..-1]
  end

  # Streams `source_input` (at most `byte_count` bytes when splitting) into
  # the object named by the path of `dest_uri`.
  def upload_single(dest_uri)
    object_key = uri_to_object_key(dest_uri)
    logger.debug("Writing [#{source_input}] to => Bucket [#{bucket_name}] Key [#{object_key}]")

    with_standard_s3_error_handling("uploading", source_input) do
      bucket.object(object_key).upload_stream do |write_stream|
        IO.copy_stream(source_input, write_stream, byte_count)
      end
    end
  end

  # Downloads the object named by the path of the `source` uri.
  #
  # @param source [String] "s3://bucket/key" uri of the object
  # @param destination [IO, String] an IO to write the chunks to, or a local
  #   file path
  # @return [IO, String] the `destination` that was passed in
  def download_single(source, destination)
    object_key = uri_to_object_key(source)
    logger.debug("Downloading [#{source}] from bucket [#{bucket_name}] to local file [#{destination}]")

    with_standard_s3_error_handling("downloading", source) do
      if destination.kind_of?(IO)
        s3.client.get_object(:bucket => bucket_name, :key => object_key) do |chunk|
          destination.write(chunk)
        end
      else # assume file path
        # the object is addressed by its key, not by the full s3:// uri
        bucket.object(object_key).download_file(destination)
      end
    end
    destination
  end

  # no-op mostly
  #
  # dirs don't need to be created ahead of time in s3, unlike mounted file
  # systems.
  #
  # For convenience though, calling bucket, which will initialize and create
  # (if needed) the s3 bucket to be used for this instance.
  def mkdir(_dir)
    bucket
  end

  # The bucket resource for `bucket_name`, created on first use if it does
  # not exist yet.  Memoized.
  def bucket
    @bucket ||= s3.bucket(bucket_name).tap do |bucket|
      if bucket.exists?
        logger.debug("Found bucket #{bucket_name}")
      else
        logger.debug("Bucket #{bucket_name} does not exist, creating.")
        bucket.create
      end
    end
  end

  private

  # Lazily loads the aws-sdk (plus the upload_stream backport) and memoizes
  # the S3 resource built from the configured region and credentials.
  def s3
    require 'aws-sdk'
    require 'util/extensions/aws-sdk/s3_upload_stream_patch'
    @s3 ||= Aws::S3::Resource.new(:region            => @settings[:region],
                                  :access_key_id     => @settings[:username],
                                  :secret_access_key => @settings[:password])
  end

  # Runs the block, logging any error and re-raising it with a more
  # descriptive message and the original backtrace.
  def with_standard_s3_error_handling(action, object)
    yield
  rescue Aws::S3::Errors::AccessDenied, Aws::S3::Errors::Forbidden => err
    logger.error("Access to S3 bucket #{bucket_name} restricted.  Try a different name. #{err}")
    msg = "Access to S3 bucket #{bucket_name} restricted.  Try a different name. #{err}"
    raise err, msg, err.backtrace
  rescue => err
    logger.error("Error #{action} #{object} from S3. #{err}")
    msg = "Error #{action} #{object} from S3. #{err}"
    raise err, msg, err.backtrace
  end
end
".with_interface_class implementation", "MiqGlusterfsSession" + end + + context "with an unknown uri scheme" do + before { opts_for_fakefs } + + it "raises an MiqFileStorage::InvalidSchemeError" do + valid_schemes = MiqFileStorage.storage_interface_classes.keys + error_class = MiqFileStorage::InvalidSchemeError + error_message = "foo is not a valid MiqFileStorage uri scheme. Accepted schemes are #{valid_schemes}" + + expect { described_class.with_interface_class(opts) }.to raise_error(error_class).with_message(error_message) + end + end + end + + ##### Interface Methods ##### + + describe MiqFileStorage::Interface do + shared_examples "an interface method" do |method_str, *args| + subject { method_str[0] == "#" ? described_class.new : described_class } + let(:method) { method_str[1..-1] } + + it "raises NotImplementedError" do + expected_error_message = "MiqFileStorage::Interface#{method_str} is not defined" + expect { subject.send(method, *args) }.to raise_error(NotImplementedError, expected_error_message) + end + end + + shared_examples "upload functionality" do |method| + let(:local_io) { IO.pipe.first } + let(:remote_file_path) { "baz/bar/foo" } + let(:byte_count) { 1234 } + let(:args) { [local_io, remote_file_path] } + + before do + subject.instance_variable_set(:@position, 0) + expect(subject).to receive(:initialize_upload_vars).with(*args).and_call_original + expect(subject).to receive(:handle_io_block).with(no_args) + expect(subject).to receive(:mkdir).with("baz/bar") + end + + it "resets all vars" do + subject.instance_variable_set(:@position, 10) + subject.instance_variable_set(:@byte_count, 10) + allow(subject).to receive(:upload_single) + allow(subject).to receive(:upload_splits) + + subject.send(method, *args) + + expect(subject.instance_variable_get(:@position)).to be nil + expect(subject.byte_count).to be nil + expect(subject.remote_file_path).to be nil + expect(subject.source_input).to be nil + expect(subject.input_writer).to be nil + end + + 
context "without a byte_count" do + it "calls #upload_single" do + expect(subject).to receive(:upload_single).with(remote_file_path).once + expect(subject).to receive(:upload_splits).never + subject.send(method, *args) + end + end + + context "with a byte_count" do + let(:args) { [local_io, remote_file_path, byte_count] } + + it "calls #upload_splits" do + expect(subject).to receive(:upload_splits).once + expect(subject).to receive(:upload_single).never + subject.send(method, *args) + end + end + end + + describe "#add" do + include_examples "upload functionality", :add + end + + describe "#upload" do + include_examples "upload functionality", :upload + end + + describe "#mkdir" do + it_behaves_like "an interface method", "#mkdir", "foo/bar/baz" + end + + describe "#upload_single" do + it_behaves_like "an interface method", "#upload_single", "path/to/file" + end + + describe "#download_single" do + it_behaves_like "an interface method", "#download_single", "nfs://1.2.3.4/foo", "foo" + end + + describe ".new_with_opts" do + it_behaves_like "an interface method", ".new_with_opts", {} + end + + describe ".uri_scheme" do + it "returns nil by default" do + expect(described_class.uri_scheme).to eq(nil) + end + end + + describe "#upload_splits" do + let(:file_name) { "path/to/file" } + + it "uploads multiple files of the byte count size" do + subject.instance_variable_set(:@position, 0) + subject.instance_variable_set(:@byte_count, 10) + subject.instance_variable_set(:@remote_file_path, file_name) + + source_input_stub = double('@source_input') + allow(subject).to receive(:source_input).and_return(source_input_stub) + allow(source_input_stub).to receive(:eof?).and_return(false, false, true) + + expect(subject).to receive(:upload_single).with("#{file_name}.00001") + expect(subject).to receive(:upload_single).with("#{file_name}.00002") + + subject.send(:upload_splits) + end + end + + describe "#initialize_upload_vars (private)" do + let(:local_io) { File.open(local_io_str) 
} + let(:local_io_str) { Tempfile.new.path } + let(:remote_path) { "/path/to/remote_file" } + let(:byte_count_int) { 1024 } + let(:byte_count_str) { "5M" } + let(:upload_args) { [] } + let(:pty_master) { double("pty_master") } + let(:pty_slave) { double("pty_slave") } + + before do + subject.send(:initialize_upload_vars, *upload_args) + end + after { FileUtils.rm_rf local_io_str } + + context "with byte_count passed" do + let(:upload_args) { [remote_path, byte_count_int] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(1024) + end + + it "assigns @remote_file_path" do + expect(subject.remote_file_path).to eq("/path/to/remote_file") + end + + it "assigns @source_input nil (set in #handle_io_block)" do + expect(subject.source_input).to eq(nil) + end + + it "assigns @input_writer nil (set in #handle_io_block)" do + expect(subject.input_writer).to eq(nil) + end + + context "with local_io as an IO object passed" do + let(:upload_args) { [local_io, remote_path, byte_count_str] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(5.megabytes) + end + + it "assigns @source_input to the passed value" do + expect(subject.source_input).to eq(local_io) + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + + context "with local_io passed" do + let(:upload_args) { [local_io_str, remote_path, byte_count_str] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(5.megabytes) + end + + it "assigns @source_input to the passed value" do + expect(File.identical?(subject.source_input, local_io_str)).to be true + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + end + + context "without byte_count passed" do + let(:upload_args) { [remote_path] } + + it "@byte_count is nil" do + expect(subject.byte_count).to eq(nil) + end + + it "assigns @remote_file_path" do + expect(subject.remote_file_path).to 
eq("/path/to/remote_file") + end + + it "assigns @source_input nil (set in #handle_io_block)" do + expect(subject.source_input).to eq(nil) + end + + it "assigns @input_writer nil (set in #handle_io_block)" do + expect(subject.input_writer).to eq(nil) + end + + context "with local_io passed" do + let(:upload_args) { [local_io, remote_path] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(nil) + end + + it "assigns @source_input to the passed value" do + expect(subject.source_input).to eq(local_io) + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + + context "with local_io passed" do + let(:upload_args) { [local_io_str, remote_path] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(nil) + end + + it "assigns @source_input to the passed value" do + expect(File.identical?(subject.source_input, local_io_str)).to be true + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + end + end + + describe "#parse_byte_value (private)" do + it "returns 2 for '2'" do + expect(subject.send(:parse_byte_value, "2")).to eq(2) + end + + it "returns 2048 for '2k'" do + expect(subject.send(:parse_byte_value, "2k")).to eq(2048) + end + + it "returns 1536 for '1.5K'" do + expect(subject.send(:parse_byte_value, "1.5K")).to eq(1536) + end + + it "returns 3145728 for '3M'" do + expect(subject.send(:parse_byte_value, "3M")).to eq(3.megabytes) + end + + it "returns 1073741824 for '1g'" do + expect(subject.send(:parse_byte_value, "1g")).to eq(1.gigabyte) + end + + it "returns nil for nil" do + expect(subject.send(:parse_byte_value, nil)).to eq(nil) + end + + it "returns 100 for 100 (integer)" do + expect(subject.send(:parse_byte_value, 100)).to eq(100) + end + end + + describe "#handle_io_block" do + let(:input_writer) { Tempfile.new } + let(:source_input) { Tempfile.new } + + after do + input_writer.unlink + source_input.unlink + end + 
+ context "with a block" do + let(:block) { ->(_input_writer) { sleep 0.1 } } + + before do + expect(File).to receive(:mkfifo) + expect(File).to receive(:open).and_return(source_input, input_writer) + end + + it "creates a thread for handling the input IO" do + thread_count = Thread.list.count + thread = subject.send(:handle_io_block, &block) + expect(Thread.list.count).to eq(thread_count + 1) + thread.join + end + + it "closes input_writer" do + expect(input_writer.closed?).to eq(false) + thread = subject.send(:handle_io_block, &block) + thread.join + expect(input_writer.closed?).to eq(true) + end + end + + context "without a block" do + it "doesn't create a new thread for IO generation" do + thread_count = Thread.list.count + nil_result = subject.send(:handle_io_block) + + expect(nil_result).to be(nil) + expect(Thread.list.count).to eq(thread_count) + end + end + + context "with a block that causes an error" do + let(:err_block) { ->(_input_writer) { raise "err-mah-gerd" } } + + before do + expect(File).to receive(:mkfifo) + expect(File).to receive(:open).and_return(source_input, input_writer) + end + + it "does not hang the process and closes the writer" do + expect(input_writer.closed?).to eq(false) + thread = subject.send(:handle_io_block, &err_block) + expect { thread.join }.to raise_error StandardError + expect(input_writer.closed?).to eq(true) + end + end + end + end +end diff --git a/spec/util/mount/miq_local_mount_session_spec.rb b/spec/util/mount/miq_local_mount_session_spec.rb new file mode 100644 index 000000000..ed9892f43 --- /dev/null +++ b/spec/util/mount/miq_local_mount_session_spec.rb @@ -0,0 +1,111 @@ +require 'util/mount/miq_local_mount_session' +require 'tempfile' + +describe MiqLocalMountSession do + shared_context "generated tmp files" do + let!(:tmpfile_size) { 10.megabytes } + let!(:source_path) { Pathname.new(source_file.path) } + let!(:source_file) do + Tempfile.new("source_file").tap do |file| + file.write("0" * tmpfile_size) + 
file.close + end + end + let!(:dest_path) do + Pathname.new(Dir::Tmpname.create("") {}) + end + + after do + source_file.unlink + Dir["#{source_path.expand_path}.*"].each do |file| + File.delete(file) + end + end + end + + subject { described_class.new(:uri => "file://") } + + describe "#add" do + include_context "generated tmp files" + + it "copies single files" do + expect(subject.add(source_path.to_s, dest_path.to_s)).to eq(dest_path.to_s) + expect(File.exist?(dest_path)).to be true + expect(Pathname.new(dest_path).lstat.size).to eq(10.megabytes) + end + + it "copies file to splits" do + expected_splitfiles = (1..5).map do |suffix| + source_path.dirname.join("#{dest_path.basename}.0000#{suffix}") + end + + File.open(source_path) do |f| # with an IO object this time + subject.add(f, dest_path.to_s, "2M") + end + + expected_splitfiles.each do |filename| + expect(File.exist?(filename)).to be true + expect(Pathname.new(filename).lstat.size).to eq(2.megabytes) + end + end + + it "can take input from a command" do + expected_splitfiles = (1..5).map do |suffix| + source_path.dirname.join("#{dest_path.basename}.0000#{suffix}") + end + + subject.add(dest_path.to_s, "2M") do |input_writer| + `#{Gem.ruby} -e "File.write('#{input_writer}', '0' * #{tmpfile_size})"` + end + + expected_splitfiles.each do |filename| + expect(File.exist?(filename)).to be true + expect(Pathname.new(filename).lstat.size).to eq(2.megabytes) + end + end + + context "with a slightly smaller input file than 10MB" do + let(:tmpfile_size) { 10.megabytes - 1.kilobyte } + + it "properly chunks the file" do + expected_splitfiles = (1..10).map do |suffix| + name = "#{dest_path.basename}.%05d" % {:suffix => suffix} + source_path.dirname.join(name) + end + + # using pathnames this time + subject.add(source_path, dest_path.to_s, 1.megabyte) + + expected_splitfiles[0, 9].each do |filename| + expect(File.exist?(filename)).to be true + expect(Pathname.new(filename).lstat.size).to eq(1.megabyte) + end + + 
last_split = expected_splitfiles.last + expect(File.exist?(last_split)).to be true + expect(Pathname.new(last_split).lstat.size).to eq(1.megabyte - 1.kilobyte) + end + end + end + + describe "#download" do + include_context "generated tmp files" + + it "downloads the file" do + subject.download(dest_path.to_s, source_path.to_s) + expect(File.exist?(dest_path)).to be true + expect(Pathname.new(dest_path).lstat.size).to eq(10.megabytes) + end + + it "can take input from a command" do + source_data = nil + subject.download(nil, source_path) do |input_writer| + source_data = `#{Gem.ruby} -e "print File.read('#{input_writer}')"` + end + + expect(File.exist?(dest_path)).to be false + expect(source_data.size).to eq(10.megabytes) + expect(source_data).to eq(File.read(source_path)) + end + end +end diff --git a/spec/util/mount/miq_s3_session_spec.rb b/spec/util/mount/miq_s3_session_spec.rb deleted file mode 100644 index fc1112f14..000000000 --- a/spec/util/mount/miq_s3_session_spec.rb +++ /dev/null @@ -1,51 +0,0 @@ -require "util/mount/miq_s3_session" - -describe MiqS3Session do - before(:each) do - @uri = "s3://tmp/abc/def" - @session = described_class.new(:uri => @uri, :username => 'user', :password => 'pass', :region => 'region') - @session.connect - end - - after(:each) do - @session.disconnect - end - - it "#connect returns a string pointing to the mount point" do - allow(described_class).to receive(:raw_disconnect) - @session.logger = Logger.new("/dev/null") - @session.disconnect - - result = @session.connect - expect(result).to be_kind_of(String) - expect(result).to_not be_blank - end - - it "#mount_share is unique" do - expect(@session.mount_share).to_not eq(described_class.new(:uri => @uri, :username => 'user', :password => 'pass', :region => 'region').mount_share) - end - - it ".runcmd will retry with sudo if needed" do - cmd = "mount X Y" - expect(described_class).to receive(:`).once.with("#{cmd} 2>&1") - expect(described_class).to receive(:`).with("sudo #{cmd} 
2>&1") - expect($CHILD_STATUS).to receive(:exitstatus).once.and_return(1) - - described_class.runcmd(cmd) - end - - it "#@mnt_point starts with '/tmp/miq_'" do - result = @session.mnt_point - expect(result).to start_with("/tmp/miq_") - end - - it "#uri_to_local_path returns a new local path" do - result = @session.uri_to_local_path(@uri) - expect(result).to match(/^\/tmp\/miq_.*\/tmp\/abc\/def$/) - end - - it "#uri_to_object_path returns a new object path" do - result = @session.uri_to_object_path(@uri) - expect(result).to eq("abc/def") - end -end diff --git a/spec/util/object_storage/miq_s3_storage_spec.rb b/spec/util/object_storage/miq_s3_storage_spec.rb new file mode 100644 index 000000000..1d62c998a --- /dev/null +++ b/spec/util/object_storage/miq_s3_storage_spec.rb @@ -0,0 +1,13 @@ +require "util/object_storage/miq_s3_storage" + +describe MiqS3Storage do + before(:each) do + @uri = "s3://tmp/abc/def" + @session = described_class.new(:uri => @uri, :username => 'user', :password => 'pass', :region => 'region') + end + + it "#uri_to_object_path returns a new object path" do + result = @session.uri_to_object_key(@uri) + expect(result).to eq("abc/def") + end +end