From 1c5c8c575cd8f13e25548d3ed44fc8675efb4094 Mon Sep 17 00:00:00 2001 From: Nick LaMuro Date: Fri, 3 Aug 2018 18:55:02 -0500 Subject: [PATCH 1/5] Add MiqFileStorage and MiqFileStorage::Interface `MiqFileStorage` is meant to be an top level class for fetching classes that interface with `MiqFileStorage::Interface`. `MiqFileStorage::Interface` is meant to be the super class for all of the class to inherit from, to make sure that they conform to a spec that `MiqFileStorage` can work with. --- lib/gems/pending/util/miq_file_storage.rb | 254 +++++++++++ .../util/mount/miq_generic_mount_session.rb | 9 +- .../util/mount/miq_glusterfs_session.rb | 4 + .../pending/util/mount/miq_nfs_session.rb | 4 + .../pending/util/mount/miq_smb_session.rb | 4 + spec/util/miq_file_storage_spec.rb | 399 ++++++++++++++++++ 6 files changed, 672 insertions(+), 2 deletions(-) create mode 100644 lib/gems/pending/util/miq_file_storage.rb create mode 100644 spec/util/miq_file_storage_spec.rb diff --git a/lib/gems/pending/util/miq_file_storage.rb b/lib/gems/pending/util/miq_file_storage.rb new file mode 100644 index 000000000..4b5a7b4e8 --- /dev/null +++ b/lib/gems/pending/util/miq_file_storage.rb @@ -0,0 +1,254 @@ +# This class is meant to be a abstract interface for defining a file_storage +# class. +# +# The storage class can either be of a type of "object storage", which includes: +# * protocols like FTP +# * document storage like s3 and OpenStack's Swift +# +# And mountable filesystems like: +# * NFS +# * SMB +# +# The class is meant to allow a shared interface for working with these +# different forms of file storage, while maintaining their differences in +# implementation where necessary. Connection will be handled separately by the +# subclasses, but they must conform to the top level interface. 
+# +class MiqFileStorage + class InvalidSchemeError < ArgumentError + def initialize(bad_scheme = nil) + super(error_message(bad_scheme)) + end + + def error_message(bad_scheme) + valid_schemes = ::MiqFileStorage.storage_interface_classes.keys.inspect + "#{bad_scheme} is not a valid MiqFileStorage uri scheme. Accepted schemes are #{valid_schemes}" + end + end + + def self.with_interface_class(opts) + klass = fetch_interface_class(opts) + block_given? ? yield(klass) : klass + end + + def self.fetch_interface_class(opts) + return nil unless opts[:uri] + + require 'uri' + scheme, _ = URI.split(URI::DEFAULT_PARSER.escape(opts[:uri])) + klass = storage_interface_classes[scheme] + + raise InvalidSchemeError, scheme if klass.nil? + + klass.new_with_opts(opts) + end + private_class_method :fetch_interface_class + + def self.storage_interface_classes + @storage_interface_classes ||= Interface.descendants.each_with_object({}) do |klass, memo| + memo[klass.uri_scheme] = klass if klass.uri_scheme + end + end + + class Interface + BYTE_HASH_MATCH = /^(?\d+(\.\d+)?)\s*(?K|M|G)?$/i + BYTE_HASH = { + "k" => 1.kilobyte, + "m" => 1.megabyte, + "g" => 1.gigabyte + }.freeze + + attr_reader :remote_file_path, :byte_count, :source_input, :input_writer + + def self.new_with_opts(opts) # rubocop:disable Lint/UnusedMethodArgument + raise NotImplementedError, "#{name}.new_with_opts is not defined" + end + + def self.uri_scheme + nil + end + + # :call-seq: + # add( remote_uri ) { |input_writer| ... } + # add( remote_uri, byte_count ) { |input_writer| ... } + # + # add( local_io, remote_uri ) + # add( local_io, remote_uri, byte_count ) + # + # Add a file to the destination URI. + # + # In the block form of the method, only the remote_uri is required, and it + # is assumed the input will be a generated in the executed block (most + # likely an external process) to a unix pipe that can be written to. 
The + # pipe generated by this method and passed in to the block as a file + # location to the `input_stream`). + # + # In the non-block form, a source must be provided as the first argument + # either as an IO object that can be read from, or a file path, and the + # second argument is the remote_uri as in the block form. + # + # An additional argument in both forms as the last argument is `byte_count` + # can also be included. If passed, it will be assumed that the resulting + # input will be split, and the naming for the splits will be: + # + # - filename.00001 + # - filename.00002 + # ... + # + # Block form: + # + # nfs_session.add("path/to/file", "200M") do |input_stream| + # `pg_dump -f #{input_stream} vmdb_production` + # end + # + # Non-block form: + # + # nfs_session.add("path/to/local_file", "path/to/remote_file") + # nfs_session.add("path/to/local_file", "path/to/remote_file", "200M") + # + def add(*upload_args, &block) + initialize_upload_vars(*upload_args) + mkdir(File.dirname(@remote_file_path)) + thread = handle_io_block(&block) + result = if byte_count + upload_splits + else + upload_single(@remote_file_path) + end + # `.join` will raise any errors from the thread, so we want to do that + # here (if a thread exists of course). + thread.join if thread + result + ensure + reset_vars + end + alias upload add + + def mkdir(dir) # rubocop:disable Lint/UnusedMethodArgument + raise NotImplementedError, "#{self.class}##{__callee__} is not defined" + end + + # :call-seq: + # download( local_io, remote_uri ) + # download( nil, remote_uri ) { |input_writer| ... } + # + # Download a file from a remote uri. + # + # In non-block form, the remote_uri is saved to the local_io. + # + # In block form, the local_io is omitted, and it is set to a PTY writer + # path that will assumed to be read by the block provided. + def download(local_file, remote_file_uri, &block) + @remote_file_path = remote_file_uri + if block_given? 
+ thread = handle_io_block(&block) + download_single(remote_file_uri, input_writer) + input_writer.close + thread.join + else + download_single(remote_file_uri, local_file) + end + ensure + reset_vars + end + + private + + # NOTE: Needs to be overwritten in the subclass! + # + # Classes that inherit from `MiqFileStorage` need to make sure to create a + # method that overwrites this one to handle the specifics of uploading for + # their particular ObjectStore protocol or MountSession. + # + # `dest_uri` is the current file that will be uploaded. If file splitting + # is occurring, this will update the filename passed into `.add` to include + # a `.0000X` suffix, where the suffix is padded up to 5 digits in total. + # + # `#upload_single` doesn't need to worry about determining the file name + # itself for splitting, but if any relative path munging is necessary, that + # should be done here (see `MiqGenericMountSession#upload_single` for an + # example) + # + # `source_input` available as an attr_reader in this method, and will + # always be a local IO object that is available for reading. + # + # `byte_count` is also an attr_reader that is available, and will either be + # `nil` if no file splitting is occurring, or a integer representing the + # maximum number of bytes to uploaded for this particular `dest_uri`. + # + # + # Ideally, making use of `IO.copy_stream` will simplify this process + # significantly, as you can pass it `source_input`, `dest_uri`, and + # `byte_count` respectively, and it will automatically handle streaming the + # data from one IO object to the other. In mount based situations, where + # `dest_uri` is a file path (in `MiqGenericMountSession#upload_single`, + # this is converted to `relpath`), this does not need to be converted to a + # `File` IO object as `IO.copy_stream` will do that for you. 
+ def upload_single(dest_uri) # rubocop:disable Lint/UnusedMethodArgument + raise NotImplementedError, "#{self.class}#upload_single is not defined" + end + + def upload_splits + @position = 0 + until source_input.eof? + upload_single(next_split_filename) + @position += byte_count + end + end + + def initialize_upload_vars(*upload_args) + upload_args.pop if (@byte_count = parse_byte_value(upload_args.last)) + @remote_file_path = upload_args.pop + + unless upload_args.empty? + source = upload_args.pop + @source_input = source.kind_of?(IO) ? source : File.open(source, "r") + end + end + + def parse_byte_value(bytes) + match = bytes.to_s.match(BYTE_HASH_MATCH) || return + + bytes = match[:BYTE_NUM].to_f + if match[:BYTE_QUALIFIER] + bytes *= BYTE_HASH[match[:BYTE_QUALIFIER].downcase] + end + bytes.to_i + end + + def handle_io_block + if block_given? + require "tmpdir" + + # create pathname, but don't create the file for it (next line) + fifo_path = Pathname.new(Dir::Tmpname.create("") {}) + File.mkfifo(fifo_path) + + # For #Reasons(TM), the reader must be opened first + @source_input = File.open(fifo_path.to_s, IO::RDONLY | IO::NONBLOCK) + @input_writer = File.open(fifo_path.to_s, IO::WRONLY | IO::NONBLOCK) + + Thread.new do + begin + yield fifo_path # send the path to the block to get executed + ensure + @input_writer.close # close the file so we know we hit EOF (for #add) + end + end + end + end + + def reset_vars + File.delete(@input_writer.path) if @input_writer + @position, @byte_count, @remote_file_path, @source_input, @input_writer = nil + end + + def next_split_filename + "#{remote_file_path}.#{'%05d' % (@position / byte_count + 1)}" + end + + def download_single(source, destination) # rubocop:disable Lint/UnusedMethodArgument + raise NotImplementedError, "#{self.class}#download_single is not defined" + end + end +end diff --git a/lib/gems/pending/util/mount/miq_generic_mount_session.rb b/lib/gems/pending/util/mount/miq_generic_mount_session.rb index 
bb36a3bb4..6151a754f 100644 --- a/lib/gems/pending/util/mount/miq_generic_mount_session.rb +++ b/lib/gems/pending/util/mount/miq_generic_mount_session.rb @@ -6,8 +6,9 @@ require 'util/miq-exception' require 'util/miq-uuid' +require 'util/miq_file_storage' -class MiqGenericMountSession +class MiqGenericMountSession < MiqFileStorage::Interface require 'util/mount/miq_nfs_session' require 'util/mount/miq_s3_session' require 'util/mount/miq_smb_session' @@ -54,11 +55,15 @@ def self.in_depot_session(opts, &_block) def self.new_session(opts) klass = uri_scheme_to_class(opts[:uri]) - session = klass.new(opts.slice(:uri, :username, :password, :region)) + session = klass.new_with_opts(opts) session.connect session end + def self.new_with_opts(opts) + new(opts.slice(:uri, :username, :password, :region)) + end + def self.uri_scheme_to_class(uri) require 'uri' scheme, userinfo, host, port, registry, share, opaque, query, fragment = URI.split(URI.encode(uri)) diff --git a/lib/gems/pending/util/mount/miq_glusterfs_session.rb b/lib/gems/pending/util/mount/miq_glusterfs_session.rb index 6c1882535..d550963a8 100644 --- a/lib/gems/pending/util/mount/miq_glusterfs_session.rb +++ b/lib/gems/pending/util/mount/miq_glusterfs_session.rb @@ -3,6 +3,10 @@ class MiqGlusterfsSession < MiqGenericMountSession PORTS = [2049, 111].freeze + def self.uri_scheme + "glusterfs".freeze + end + def initialize(log_settings) super(log_settings.merge(:ports => PORTS)) end diff --git a/lib/gems/pending/util/mount/miq_nfs_session.rb b/lib/gems/pending/util/mount/miq_nfs_session.rb index eb52a4c3f..920366575 100644 --- a/lib/gems/pending/util/mount/miq_nfs_session.rb +++ b/lib/gems/pending/util/mount/miq_nfs_session.rb @@ -3,6 +3,10 @@ class MiqNfsSession < MiqGenericMountSession PORTS = [2049, 111] + def self.uri_scheme + "nfs".freeze + end + def initialize(log_settings) super(log_settings.merge(:ports => PORTS)) end diff --git a/lib/gems/pending/util/mount/miq_smb_session.rb 
b/lib/gems/pending/util/mount/miq_smb_session.rb index e6c42124c..535c94adc 100644 --- a/lib/gems/pending/util/mount/miq_smb_session.rb +++ b/lib/gems/pending/util/mount/miq_smb_session.rb @@ -3,6 +3,10 @@ class MiqSmbSession < MiqGenericMountSession PORTS = [445, 139] + def self.uri_scheme + "smb".freeze + end + def initialize(log_settings) super(log_settings.merge(:ports => PORTS)) raise "username is a required value!" if @settings[:username].nil? diff --git a/spec/util/miq_file_storage_spec.rb b/spec/util/miq_file_storage_spec.rb new file mode 100644 index 000000000..add48b824 --- /dev/null +++ b/spec/util/miq_file_storage_spec.rb @@ -0,0 +1,399 @@ +require "util/mount/miq_generic_mount_session" + +describe MiqFileStorage do + def opts_for_nfs + opts[:uri] = "nfs://example.com/share/path/to/file.txt" + end + + def opts_for_smb + opts[:uri] = "smb://example.com/share/path/to/file.txt" + opts[:username] = "user" + opts[:password] = "pass" + end + + def opts_for_glusterfs + opts[:uri] = "glusterfs://example.com/share/path/to/file.txt" + end + + def opts_for_fakefs + opts[:uri] = "foo://example.com/share/path/to/file.txt" + end + + describe ".with_interface_class" do + let(:opts) { {} } + + shared_examples ".with_interface_class implementation" do |class_name| + let(:klass) { Object.const_get(class_name) } + + it "instanciates as #{class_name}" do + interface_instance = described_class.with_interface_class(opts) + expect(interface_instance.class).to eq(klass) + end + + it "with a block, passes the instance, and returns the result" do + instance_double = double(class_name.to_s) + interface_block = ->(instance) { instance.add } + + expect(klass).to receive(:new).and_return(instance_double) + expect(instance_double).to receive(:add).and_return(:foo) + + expect(described_class.with_interface_class(opts, &interface_block)).to eq(:foo) + end + end + + context "with a nil uri" do + it "returns nil" do + expect(described_class.with_interface_class(opts)).to eq(nil) + end + 
end + + context "with an nfs:// uri" do + before { opts_for_nfs } + + include_examples ".with_interface_class implementation", "MiqNfsSession" + end + + context "with an smb:// uri" do + before { opts_for_smb } + + include_examples ".with_interface_class implementation", "MiqSmbSession" + end + + context "with an glusterfs:// uri" do + before { opts_for_glusterfs } + + include_examples ".with_interface_class implementation", "MiqGlusterfsSession" + end + + context "with an unknown uri scheme" do + before { opts_for_fakefs } + + it "raises an MiqFileStorage::InvalidSchemeError" do + valid_schemes = MiqFileStorage.storage_interface_classes.keys + error_class = MiqFileStorage::InvalidSchemeError + error_message = "foo is not a valid MiqFileStorage uri scheme. Accepted schemes are #{valid_schemes}" + + expect { described_class.with_interface_class(opts) }.to raise_error(error_class).with_message(error_message) + end + end + end + + ##### Interface Methods ##### + + describe MiqFileStorage::Interface do + shared_examples "an interface method" do |method_str, *args| + subject { method_str[0] == "#" ? 
described_class.new : described_class } + let(:method) { method_str[1..-1] } + + it "raises NotImplementedError" do + expected_error_message = "MiqFileStorage::Interface#{method_str} is not defined" + expect { subject.send(method, *args) }.to raise_error(NotImplementedError, expected_error_message) + end + end + + shared_examples "upload functionality" do |method| + let(:local_io) { IO.pipe.first } + let(:remote_file_path) { "baz/bar/foo" } + let(:byte_count) { 1234 } + let(:args) { [local_io, remote_file_path] } + + before do + subject.instance_variable_set(:@position, 0) + expect(subject).to receive(:initialize_upload_vars).with(*args).and_call_original + expect(subject).to receive(:handle_io_block).with(no_args) + expect(subject).to receive(:mkdir).with("baz/bar") + end + + it "resets all vars" do + subject.instance_variable_set(:@position, 10) + subject.instance_variable_set(:@byte_count, 10) + allow(subject).to receive(:upload_single) + allow(subject).to receive(:upload_splits) + + subject.send(method, *args) + + expect(subject.instance_variable_get(:@position)).to be nil + expect(subject.byte_count).to be nil + expect(subject.remote_file_path).to be nil + expect(subject.source_input).to be nil + expect(subject.input_writer).to be nil + end + + context "without a byte_count" do + it "calls #upload_single" do + expect(subject).to receive(:upload_single).with(remote_file_path).once + expect(subject).to receive(:upload_splits).never + subject.send(method, *args) + end + end + + context "with a byte_count" do + let(:args) { [local_io, remote_file_path, byte_count] } + + it "calls #upload_splits" do + expect(subject).to receive(:upload_splits).once + expect(subject).to receive(:upload_single).never + subject.send(method, *args) + end + end + end + + describe "#add" do + include_examples "upload functionality", :add + end + + describe "#upload" do + include_examples "upload functionality", :upload + end + + describe "#mkdir" do + it_behaves_like "an interface 
method", "#mkdir", "foo/bar/baz" + end + + describe "#upload_single" do + it_behaves_like "an interface method", "#upload_single", "path/to/file" + end + + describe "#download_single" do + it_behaves_like "an interface method", "#download_single", "nfs://1.2.3.4/foo", "foo" + end + + describe ".new_with_opts" do + it_behaves_like "an interface method", ".new_with_opts", {} + end + + describe ".uri_scheme" do + it "returns nil by default" do + expect(described_class.uri_scheme).to eq(nil) + end + end + + describe "#upload_splits" do + let(:file_name) { "path/to/file" } + + it "uploads multiple files of the byte count size" do + subject.instance_variable_set(:@position, 0) + subject.instance_variable_set(:@byte_count, 10) + subject.instance_variable_set(:@remote_file_path, file_name) + + source_input_stub = double('@source_input') + allow(subject).to receive(:source_input).and_return(source_input_stub) + allow(source_input_stub).to receive(:eof?).and_return(false, false, true) + + expect(subject).to receive(:upload_single).with("#{file_name}.00001") + expect(subject).to receive(:upload_single).with("#{file_name}.00002") + + subject.send(:upload_splits) + end + end + + describe "#initialize_upload_vars (private)" do + let(:local_io) { File.open(local_io_str) } + let(:local_io_str) { Tempfile.new.path } + let(:remote_path) { "/path/to/remote_file" } + let(:byte_count_int) { 1024 } + let(:byte_count_str) { "5M" } + let(:upload_args) { [] } + let(:pty_master) { double("pty_master") } + let(:pty_slave) { double("pty_slave") } + + before do + subject.send(:initialize_upload_vars, *upload_args) + end + after { FileUtils.rm_rf local_io_str } + + context "with byte_count passed" do + let(:upload_args) { [remote_path, byte_count_int] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(1024) + end + + it "assigns @remote_file_path" do + expect(subject.remote_file_path).to eq("/path/to/remote_file") + end + + it "assigns @source_input nil (set 
in #handle_io_block)" do + expect(subject.source_input).to eq(nil) + end + + it "assigns @input_writer nil (set in #handle_io_block)" do + expect(subject.input_writer).to eq(nil) + end + + context "with local_io as an IO object passed" do + let(:upload_args) { [local_io, remote_path, byte_count_str] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(5.megabytes) + end + + it "assigns @source_input to the passed value" do + expect(subject.source_input).to eq(local_io) + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + + context "with local_io passed" do + let(:upload_args) { [local_io_str, remote_path, byte_count_str] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(5.megabytes) + end + + it "assigns @source_input to the passed value" do + expect(File.identical?(subject.source_input, local_io_str)).to be true + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + end + + context "without byte_count passed" do + let(:upload_args) { [remote_path] } + + it "@byte_count is nil" do + expect(subject.byte_count).to eq(nil) + end + + it "assigns @remote_file_path" do + expect(subject.remote_file_path).to eq("/path/to/remote_file") + end + + it "assigns @source_input nil (set in #handle_io_block)" do + expect(subject.source_input).to eq(nil) + end + + it "assigns @input_writer nil (set in #handle_io_block)" do + expect(subject.input_writer).to eq(nil) + end + + context "with local_io passed" do + let(:upload_args) { [local_io, remote_path] } + + it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(nil) + end + + it "assigns @source_input to the passed value" do + expect(subject.source_input).to eq(local_io) + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + + context "with local_io passed" do + let(:upload_args) { [local_io_str, remote_path] } + 
+ it "assigns @byte_count to the parse value" do + expect(subject.byte_count).to eq(nil) + end + + it "assigns @source_input to the passed value" do + expect(File.identical?(subject.source_input, local_io_str)).to be true + end + + it "@input_writer is nil" do + expect(subject.input_writer).to eq(nil) + end + end + end + end + + describe "#parse_byte_value (private)" do + it "returns 2 for '2'" do + expect(subject.send(:parse_byte_value, "2")).to eq(2) + end + + it "returns 2048 for '2k'" do + expect(subject.send(:parse_byte_value, "2k")).to eq(2048) + end + + it "returns 1536 for '1.5K'" do + expect(subject.send(:parse_byte_value, "1.5K")).to eq(1536) + end + + it "returns 3145728 for '3M'" do + expect(subject.send(:parse_byte_value, "3M")).to eq(3.megabytes) + end + + it "returns 1073741824 for '1g'" do + expect(subject.send(:parse_byte_value, "1g")).to eq(1.gigabyte) + end + + it "returns nil for nil" do + expect(subject.send(:parse_byte_value, nil)).to eq(nil) + end + + it "returns 100 for 100 (integer)" do + expect(subject.send(:parse_byte_value, 100)).to eq(100) + end + end + + describe "#handle_io_block" do + let(:input_writer) { Tempfile.new } + let(:source_input) { Tempfile.new } + + after do + input_writer.unlink + source_input.unlink + end + + context "with a block" do + let(:block) { ->(_input_writer) { sleep 0.1 } } + + before do + expect(File).to receive(:mkfifo) + expect(File).to receive(:open).and_return(source_input, input_writer) + end + + it "creates a thread for handling the input IO" do + thread_count = Thread.list.count + thread = subject.send(:handle_io_block, &block) + expect(Thread.list.count).to eq(thread_count + 1) + thread.join + end + + it "closes input_writer" do + expect(input_writer.closed?).to eq(false) + thread = subject.send(:handle_io_block, &block) + thread.join + expect(input_writer.closed?).to eq(true) + end + end + + context "without a block" do + it "doesn't create a new thread for IO generation" do + thread_count = 
Thread.list.count + nil_result = subject.send(:handle_io_block) + + expect(nil_result).to be(nil) + expect(Thread.list.count).to eq(thread_count) + end + end + + context "with a block that causes an error" do + let(:err_block) { ->(_input_writer) { raise "err-mah-gerd" } } + + before do + expect(File).to receive(:mkfifo) + expect(File).to receive(:open).and_return(source_input, input_writer) + end + + it "does not hang the process and closes the writer" do + expect(input_writer.closed?).to eq(false) + thread = subject.send(:handle_io_block, &err_block) + expect { thread.join }.to raise_error StandardError + expect(input_writer.closed?).to eq(true) + end + end + end + end +end From e1b467becb8504a9989f1f24db61b55120dff043 Mon Sep 17 00:00:00 2001 From: Nick LaMuro Date: Fri, 10 Aug 2018 19:28:19 -0500 Subject: [PATCH 2/5] Update MiqGenericMountSession to MiqFileStorage::Interface --- .../util/mount/miq_generic_mount_session.rb | 54 +++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/lib/gems/pending/util/mount/miq_generic_mount_session.rb b/lib/gems/pending/util/mount/miq_generic_mount_session.rb index 6151a754f..73bbf7f37 100644 --- a/lib/gems/pending/util/mount/miq_generic_mount_session.rb +++ b/lib/gems/pending/util/mount/miq_generic_mount_session.rb @@ -251,55 +251,59 @@ def verify res end - def add(source, dest_uri) + def add(*upload_args) + dest_uri = nil log_header = "MIQ(#{self.class.name}-add)" - logger.info("#{log_header} Source: [#{source}], Destination: [#{dest_uri}]...") + # Don't think this log line is possible when using MiqFileStorage::Interface + # + # logger.info("#{log_header} Source: [#{source}], Destination: [#{dest_uri}]...") begin reconnect! 
- relpath = File.join(@mnt_point, relative_to_mount(dest_uri)) - if File.exist?(relpath) - logger.info("#{log_header} Skipping add since URI: [#{dest_uri}] already exists") - return dest_uri - end - - logger.info("#{log_header} Building relative path: [#{relpath}]...") - FileUtils.mkdir_p(File.dirname(relpath)) - logger.info("#{log_header} Building relative path: [#{relpath}]...complete") - logger.info("#{log_header} Copying file [#{source}] to [#{relpath}]...") - FileUtils.cp(source, relpath) - logger.info("#{log_header} Copying file [#{source}] to [#{relpath}] complete") + dest_uri = super rescue => err - msg = "Adding [#{source}] to [#{dest_uri}], failed due to error: '#{err.message}'" + msg = "Adding [#{source_for_log}] to [#{remote_file_path}], failed due to error: '#{err.message}'" logger.error("#{log_header} #{msg}") raise ensure disconnect end - logger.info("#{log_header} File URI added: [#{dest_uri}] complete") + logger.info("#{log_header} File URI added: [#{remote_file_path}] complete") dest_uri end - alias_method :upload, :add + def upload_single(dest_uri) + log_header = "MIQ(#{self.class.name}-upload_single)" + relpath = uri_to_local_path(dest_uri) + if File.exist?(relpath) + logger.info("#{log_header} Skipping add since URI: [#{dest_uri}] already exists") + return dest_uri + end + + logger.info("#{log_header} Copying file [#{source_for_log}] to [#{dest_uri}]...") + IO.copy_stream(source_input, relpath, byte_count) + logger.info("#{log_header} Copying file [#{source_for_log}] to [#{dest_uri}] complete") + dest_uri + end - def download(local_file, remote_file) + def download_single(remote_file, local_file) log_header = "MIQ(#{self.class.name}-download)" logger.info("#{log_header} Target: [#{local_file}], Remote file: [#{remote_file}]...") begin reconnect! 
- relpath = File.join(@mnt_point, relative_to_mount(remote_file)) + relpath = File.join(mnt_point, relative_to_mount(remote_file)) unless File.exist?(relpath) logger.warn("#{log_header} Remote file: [#{remote_file}] does not exist!") return end logger.info("#{log_header} Copying file [#{relpath}] to [#{local_file}]...") - FileUtils.cp(relpath, local_file) + IO.copy_stream(relpath, local_file) logger.info("#{log_header} Copying file [#{relpath}] to [#{local_file}] complete") rescue => err msg = "Downloading [#{remote_file}] to [#{local_file}], failed due to error: '#{err.message}'" @@ -392,7 +396,11 @@ def glob(pattern) def mkdir(path) with_mounted_exception_handling do - FileUtils.mkdir_p("#{mount_root}/#{path}") + log_header = "MIQ(#{self.class.name}-mkdir)" + new_path = uri_to_local_path(path) + logger.info("#{log_header} Building relative path: [#{new_path}]...") + FileUtils.mkdir_p(new_path) + logger.info("#{log_header} Building relative path: [#{new_path}]...complete") end end @@ -482,4 +490,8 @@ def settings_mount_point return nil if @settings[:mount_point].blank? # Check if settings contains the mount_point to use FileUtils.mkdir_p(@settings[:mount_point]).first end + + def source_for_log + @input_writer ? "" : @source_input.path + end end From 1bd3433c5227f00e60111cfe125ff9b8b4808761 Mon Sep 17 00:00:00 2001 From: Nick LaMuro Date: Fri, 10 Aug 2018 21:14:20 -0500 Subject: [PATCH 3/5] Add MiqLocalMountSession This mount session is intended as being a pass through for when local files are used. The same API's for `#add` from MiqGenericMountSession are still usable for this class, and it will just write to the local file system instead. This also allows us to test the file splitting code without needing a mount session in place to do so. 
--- .../util/mount/miq_generic_mount_session.rb | 3 +- .../util/mount/miq_local_mount_session.rb | 28 +++++ .../mount/miq_local_mount_session_spec.rb | 111 ++++++++++++++++++ 3 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 lib/gems/pending/util/mount/miq_local_mount_session.rb create mode 100644 spec/util/mount/miq_local_mount_session_spec.rb diff --git a/lib/gems/pending/util/mount/miq_generic_mount_session.rb b/lib/gems/pending/util/mount/miq_generic_mount_session.rb index 73bbf7f37..e840242d0 100644 --- a/lib/gems/pending/util/mount/miq_generic_mount_session.rb +++ b/lib/gems/pending/util/mount/miq_generic_mount_session.rb @@ -9,6 +9,7 @@ require 'util/miq_file_storage' class MiqGenericMountSession < MiqFileStorage::Interface + require 'util/mount/miq_local_mount_session' require 'util/mount/miq_nfs_session' require 'util/mount/miq_s3_session' require 'util/mount/miq_smb_session' @@ -296,7 +297,7 @@ def download_single(remote_file, local_file) begin reconnect! - relpath = File.join(mnt_point, relative_to_mount(remote_file)) + relpath = uri_to_local_path(remote_file) unless File.exist?(relpath) logger.warn("#{log_header} Remote file: [#{remote_file}] does not exist!") return diff --git a/lib/gems/pending/util/mount/miq_local_mount_session.rb b/lib/gems/pending/util/mount/miq_local_mount_session.rb new file mode 100644 index 000000000..0395e7c20 --- /dev/null +++ b/lib/gems/pending/util/mount/miq_local_mount_session.rb @@ -0,0 +1,28 @@ +require 'util/mount/miq_generic_mount_session' + +# MiqLocalMountSession is meant to be a representation of the local file system +# that conforms to the same interface as MiqLocalMountSession (and by proxy, +# MiqFileSystem::Interface). +# +# See MiqGenericMountSession for info on methods available. 
+class MiqLocalMountSession < MiqGenericMountSession + def self.uri_scheme + "file".freeze + end + + # no-op these since they are not relavent to the local file system + # + # rubocop:disable Style/SingleLineMethods, Layout/EmptyLineBetweenDefs + def connect; end # :nodoc: + def disconnect; end # :nodoc: + def mount_share; end # :nodoc: + # rubocop:enable Style/SingleLineMethods, Layout/EmptyLineBetweenDefs + + def relative_to_mount(remote_file) # :nodoc: + remote_file + end + + def uri_to_local_path(remote_file) # :nodoc: + File.expand_path(remote_file) + end +end diff --git a/spec/util/mount/miq_local_mount_session_spec.rb b/spec/util/mount/miq_local_mount_session_spec.rb new file mode 100644 index 000000000..ed9892f43 --- /dev/null +++ b/spec/util/mount/miq_local_mount_session_spec.rb @@ -0,0 +1,111 @@ +require 'util/mount/miq_local_mount_session' +require 'tempfile' + +describe MiqLocalMountSession do + shared_context "generated tmp files" do + let!(:tmpfile_size) { 10.megabytes } + let!(:source_path) { Pathname.new(source_file.path) } + let!(:source_file) do + Tempfile.new("source_file").tap do |file| + file.write("0" * tmpfile_size) + file.close + end + end + let!(:dest_path) do + Pathname.new(Dir::Tmpname.create("") {}) + end + + after do + source_file.unlink + Dir["#{source_path.expand_path}.*"].each do |file| + File.delete(file) + end + end + end + + subject { described_class.new(:uri => "file://") } + + describe "#add" do + include_context "generated tmp files" + + it "copies single files" do + expect(subject.add(source_path.to_s, dest_path.to_s)).to eq(dest_path.to_s) + expect(File.exist?(dest_path)).to be true + expect(Pathname.new(dest_path).lstat.size).to eq(10.megabytes) + end + + it "copies file to splits" do + expected_splitfiles = (1..5).map do |suffix| + source_path.dirname.join("#{dest_path.basename}.0000#{suffix}") + end + + File.open(source_path) do |f| # with an IO object this time + subject.add(f, dest_path.to_s, "2M") + end + + 
expected_splitfiles.each do |filename| + expect(File.exist?(filename)).to be true + expect(Pathname.new(filename).lstat.size).to eq(2.megabytes) + end + end + + it "can take input from a command" do + expected_splitfiles = (1..5).map do |suffix| + source_path.dirname.join("#{dest_path.basename}.0000#{suffix}") + end + + subject.add(dest_path.to_s, "2M") do |input_writer| + `#{Gem.ruby} -e "File.write('#{input_writer}', '0' * #{tmpfile_size})"` + end + + expected_splitfiles.each do |filename| + expect(File.exist?(filename)).to be true + expect(Pathname.new(filename).lstat.size).to eq(2.megabytes) + end + end + + context "with a slightly smaller input file than 10MB" do + let(:tmpfile_size) { 10.megabytes - 1.kilobyte } + + it "properly chunks the file" do + expected_splitfiles = (1..10).map do |suffix| + name = "#{dest_path.basename}.%05d" % {:suffix => suffix} + source_path.dirname.join(name) + end + + # using pathnames this time + subject.add(source_path, dest_path.to_s, 1.megabyte) + + expected_splitfiles[0, 9].each do |filename| + expect(File.exist?(filename)).to be true + expect(Pathname.new(filename).lstat.size).to eq(1.megabyte) + end + + last_split = expected_splitfiles.last + expect(File.exist?(last_split)).to be true + expect(Pathname.new(last_split).lstat.size).to eq(1.megabyte - 1.kilobyte) + end + end + end + + describe "#download" do + include_context "generated tmp files" + + it "downloads the file" do + subject.download(dest_path.to_s, source_path.to_s) + expect(File.exist?(dest_path)).to be true + expect(Pathname.new(dest_path).lstat.size).to eq(10.megabytes) + end + + it "can take input from a command" do + source_data = nil + subject.download(nil, source_path) do |input_writer| + source_data = `#{Gem.ruby} -e "print File.read('#{input_writer}')"` + end + + expect(File.exist?(dest_path)).to be false + expect(source_data.size).to eq(10.megabytes) + expect(source_data).to eq(File.read(source_path)) + end + end +end From 
d79deabbfb80a6b5c3f0bec614aa223427d266d2 Mon Sep 17 00:00:00 2001 From: Nick LaMuro Date: Mon, 20 Aug 2018 16:42:21 -0500 Subject: [PATCH 4/5] Add MiqObjectStorage This is the top level class for Object Storage based file storages, which will include types like s3 and FTP. Doesn't do much, but shared methods between all of the Object Storage classes will be put here. --- lib/gems/pending/util/miq_object_storage.rb | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 lib/gems/pending/util/miq_object_storage.rb diff --git a/lib/gems/pending/util/miq_object_storage.rb b/lib/gems/pending/util/miq_object_storage.rb new file mode 100644 index 000000000..845a0bbfd --- /dev/null +++ b/lib/gems/pending/util/miq_object_storage.rb @@ -0,0 +1,19 @@ +require 'util/miq_file_storage' + +class MiqObjectStorage < MiqFileStorage::Interface + attr_accessor :settings + attr_writer :logger + + def self.new_with_opts(opts) + new(opts.slice(:uri, :username, :password)) + end + + def initialize(settings) + raise "URI missing" unless settings.key?(:uri) + @settings = settings.dup + end + + def logger + @logger ||= $log.nil? ? :: Logger.new(STDOUT) : $log + end +end From f3e3e9c54f1e33f9ff046ab5cdc668c0012210d2 Mon Sep 17 00:00:00 2001 From: Nick LaMuro Date: Wed, 22 Aug 2018 00:50:54 -0500 Subject: [PATCH 5/5] Convert MiqS3Session to MiqS3Storage Converts MiqS3Session to an MiqObjectStorage subclass, MiqS3Storage. A decent amount here didn't change, but it updates the `#add` and `#download` methods to use the MiqFileStorage::Interface and instead override `#upload_single` and `#download_single`. Many of the tests didn't make it over since a lot of it was in regards to "mounting", which ObjectStorage classes don't do and is now obsolete, as well as cleaned up MiqGenericMountSession from any references that were s3 specific. Also dried up the code a bit from what was implemented in MiqS3Session. Regarding the elephant in the room... 
------------------------------------- Yes, I backported Aws::S3::MultipartStreamUploader from the v3 portion of the aws-sdk as part of this commit. Some form of "upload streaming" was necessary for file splitting to work. I could have implemented something myself with probably a little less code, but what was already part of the v3 API, is probably tested extensively, and works with the v2 Seahorse client and lib made more sense to implement via this way instead of trying to roll my own. It is a lot of extra code, but I have a flag in place for when we do upgrade to aws-sdk v3 in the future, an error should be raised in the test suite to have this patch removed. --- .../aws-sdk/s3_upload_stream_patch.rb | 281 ++++++++++++++++++ lib/gems/pending/util/miq_object_storage.rb | 2 + .../util/mount/miq_generic_mount_session.rb | 5 +- lib/gems/pending/util/mount/miq_s3_session.rb | 89 ------ .../util/object_storage/miq_s3_storage.rb | 101 +++++++ spec/util/mount/miq_s3_session_spec.rb | 51 ---- .../object_storage/miq_s3_storage_spec.rb | 13 + 7 files changed, 398 insertions(+), 144 deletions(-) create mode 100644 lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb delete mode 100644 lib/gems/pending/util/mount/miq_s3_session.rb create mode 100644 lib/gems/pending/util/object_storage/miq_s3_storage.rb delete mode 100644 spec/util/mount/miq_s3_session_spec.rb create mode 100644 spec/util/object_storage/miq_s3_storage_spec.rb diff --git a/lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb b/lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb new file mode 100644 index 000000000..4ecbb30ee --- /dev/null +++ b/lib/gems/pending/util/extensions/aws-sdk/s3_upload_stream_patch.rb @@ -0,0 +1,281 @@ +# This is a backport of the `Aws::S3::Object#upload_stream` method that is +# introduced in the V3 of the 'aws-sdk' +# +# To my knowledge, the changes from this patch should be completely backwards +# compatible, and the API should easily 
support this change. +# +# Once upgraded, this file should be removed. + +require 'aws-sdk' + +if defined? Aws::S3::GEM_VERSION # This constant doesn't exist in v2 + raise <<~ERROR + The 'aws-sdk' gem has been updated to v3!!! + + Remove this file! + + #{File.expand_path(__FILE__)} + + And delete any references to it, starting with this one: + + #{caller(1..1).first} + + This functionality was backported and should now exist normally in the + aws-sdk library, so no monkey patching is necessary. + + Hooray! + ERROR +else + # make sure the client is autoloaded before we continue + Aws::S3::Client +end + +# The code below comes directly from the aws-sdk, so ignore rubocop warnings +# for the remainder of this file. +# +# rubocop:disable all + +### Aws::S3::MultipartStreamUploader +# +# Ripped from: +# +# https://github.com/aws/aws-sdk-ruby/blob/708140b8/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb + +require 'thread' +require 'set' +require 'tempfile' +require 'stringio' + +module Aws + module S3 + # @api private + class MultipartStreamUploader + # api private + PART_SIZE = 5 * 1024 * 1024 # 5MB + + # api private + THREAD_COUNT = 10 + + # api private + TEMPFILE_PREIX = 'aws-sdk-s3-upload_stream'.freeze + + # @api private + CREATE_OPTIONS = + Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names) + + # @api private + UPLOAD_PART_OPTIONS = + Set.new(Client.api.operation(:upload_part).input.shape.member_names) + + # @option options [Client] :client + def initialize(options = {}) + @client = options[:client] || Client.new + @tempfile = options[:tempfile] + @part_size = options[:part_size] || PART_SIZE + @thread_count = options[:thread_count] || THREAD_COUNT + end + + # @return [Client] + attr_reader :client + + # @option options [required,String] :bucket + # @option options [required,String] :key + # @return [void] + def upload(options = {}, &block) + upload_id = initiate_upload(options) + parts = upload_parts(upload_id, 
options, &block) + complete_upload(upload_id, parts, options) + end + + private + + def initiate_upload(options) + @client.create_multipart_upload(create_opts(options)).upload_id + end + + def complete_upload(upload_id, parts, options) + @client.complete_multipart_upload( + bucket: options[:bucket], + key: options[:key], + upload_id: upload_id, + multipart_upload: { parts: parts }) + end + + def upload_parts(upload_id, options, &block) + completed = Queue.new + errors = IO.pipe do |read_pipe, write_pipe| + threads = upload_in_threads(read_pipe, completed, upload_part_opts(options).merge(upload_id: upload_id)) + block.call(write_pipe) + write_pipe.close + threads.map(&:value).compact + end + if errors.empty? + Array.new(completed.size) { completed.pop }.sort_by { |part| part[:part_number] } + else + abort_upload(upload_id, options, errors) + end + end + + def abort_upload(upload_id, options, errors) + @client.abort_multipart_upload( + bucket: options[:bucket], + key: options[:key], + upload_id: upload_id + ) + msg = "multipart upload failed: #{errors.map(&:message).join("; ")}" + raise MultipartUploadError.new(msg, errors) + rescue MultipartUploadError => error + raise error + rescue => error + msg = "failed to abort multipart upload: #{error.message}" + raise MultipartUploadError.new(msg, errors + [error]) + end + + def create_opts(options) + CREATE_OPTIONS.inject({}) do |hash, key| + hash[key] = options[key] if options.key?(key) + hash + end + end + + def upload_part_opts(options) + UPLOAD_PART_OPTIONS.inject({}) do |hash, key| + hash[key] = options[key] if options.key?(key) + hash + end + end + + def read_to_part_body(read_pipe) + return if read_pipe.closed? + temp_io = @tempfile ? 
Tempfile.new(TEMPFILE_PREIX) : StringIO.new + temp_io.binmode + bytes_copied = IO.copy_stream(read_pipe, temp_io, @part_size) + temp_io.rewind + if bytes_copied == 0 + if Tempfile === temp_io + temp_io.close + temp_io.unlink + end + nil + else + temp_io + end + end + + def upload_in_threads(read_pipe, completed, options) + mutex = Mutex.new + part_number = 0 + @thread_count.times.map do + thread = Thread.new do + begin + loop do + body, thread_part_number = mutex.synchronize do + [read_to_part_body(read_pipe), part_number += 1] + end + break unless body + begin + part = options.merge( + body: body, + part_number: thread_part_number, + ) + resp = @client.upload_part(part) + completed << {etag: resp.etag, part_number: part[:part_number]} + ensure + if Tempfile === body + body.close + body.unlink + end + end + end + nil + rescue => error + # keep other threads from uploading other parts + mutex.synchronize { read_pipe.close_read } + error + end + end + thread.abort_on_exception = true + thread + end + end + end + end +end + +### Aws::S3::Object patches +# +# Customizations ripped from: +# +# https://github.com/aws/aws-sdk-ruby/blob/708140b8/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb +# +# Only the Aws::S3::Object#upload_stream method is backported, which is a +# wrapper method around the above. This is an attempt to reduce the monkey +# patching surface area. +# + +module Aws + module S3 + class Object + # Unsure if this might have been used by the uploaded above, so I kept it + # in for now. + alias size content_length + + # Uploads a stream in a streaming fashion to the current object in S3. + # + # # Passed chunks automatically split into multipart upload parts + # # and the parts are uploaded in parallel. This allows for streaming uploads + # # that never touch the disk. + # + # Note that this is known to have issues in JRuby until jruby-9.1.15.0, so avoid using this with older versions of JRuby. 
+ # + # @example Streaming chunks of data + # obj.upload_stream do |write_stream| + # 10.times { write_stream << 'foo' } + # end + # @example Streaming chunks of data + # obj.upload_stream do |write_stream| + # IO.copy_stream(IO.popen('ls'), write_stream) + # end + # @example Streaming chunks of data + # obj.upload_stream do |write_stream| + # IO.copy_stream(STDIN, write_stream) + # end + # + # @option options [Integer] :thread_count + # The number of parallel multipart uploads + # Default `:thread_count` is `10`. + # + # @option options [Boolean] :tempfile + # Normally read data is stored in memory when building the parts in order to complete + # the underlying multipart upload. By passing `:tempfile => true` data read will be + # temporarily stored on disk reducing the memory footprint vastly. + # Default `:tempfile` is `false`. + # + # @option options [Integer] :part_size + # Define how big each part size but the last should be. + # Default `:part_size` is `5 * 1024 * 1024`. + # + # @raise [MultipartUploadError] If an object is being uploaded in + # parts, and the upload can not be completed, then the upload is + # aborted and this error is raised. The raised error has a `#errors` + # method that returns the failures that caused the upload to be + # aborted. + # + # @return [Boolean] Returns `true` when the object is uploaded + # without any errors. 
+ # + def upload_stream(options = {}, &block) + uploading_options = options.dup + uploader = MultipartStreamUploader.new( + client: client, + thread_count: uploading_options.delete(:thread_count), + tempfile: uploading_options.delete(:tempfile), + part_size: uploading_options.delete(:part_size), + ) + uploader.upload(uploading_options.merge(bucket: bucket_name, key: key), &block) + true + end + end + end +end +# rubocop:enable all diff --git a/lib/gems/pending/util/miq_object_storage.rb b/lib/gems/pending/util/miq_object_storage.rb index 845a0bbfd..b7ea0f2eb 100644 --- a/lib/gems/pending/util/miq_object_storage.rb +++ b/lib/gems/pending/util/miq_object_storage.rb @@ -1,6 +1,8 @@ require 'util/miq_file_storage' class MiqObjectStorage < MiqFileStorage::Interface + require 'util/object_storage/miq_s3_storage' + attr_accessor :settings attr_writer :logger diff --git a/lib/gems/pending/util/mount/miq_generic_mount_session.rb b/lib/gems/pending/util/mount/miq_generic_mount_session.rb index e840242d0..b5bb8f81b 100644 --- a/lib/gems/pending/util/mount/miq_generic_mount_session.rb +++ b/lib/gems/pending/util/mount/miq_generic_mount_session.rb @@ -11,7 +11,6 @@ class MiqGenericMountSession < MiqFileStorage::Interface require 'util/mount/miq_local_mount_session' require 'util/mount/miq_nfs_session' - require 'util/mount/miq_s3_session' require 'util/mount/miq_smb_session' require 'util/mount/miq_glusterfs_session' @@ -62,15 +61,13 @@ def self.new_session(opts) end def self.new_with_opts(opts) - new(opts.slice(:uri, :username, :password, :region)) + new(opts.slice(:uri, :username, :password)) end def self.uri_scheme_to_class(uri) require 'uri' scheme, userinfo, host, port, registry, share, opaque, query, fragment = URI.split(URI.encode(uri)) case scheme - when 's3' - MiqS3Session when 'smb' MiqSmbSession when 'nfs' diff --git a/lib/gems/pending/util/mount/miq_s3_session.rb b/lib/gems/pending/util/mount/miq_s3_session.rb deleted file mode 100644 index 3f31f27ce..000000000 --- 
a/lib/gems/pending/util/mount/miq_s3_session.rb +++ /dev/null @@ -1,89 +0,0 @@ -require 'util/mount/miq_generic_mount_session' - -class MiqS3Session < MiqGenericMountSession - def initialize(log_settings) - super(log_settings) - # NOTE: This line to be removed once manageiq-ui-class region change implemented. - @settings[:region] = "us-east-1" if @settings[:region].nil? - raise "username, password, and region are required values!" if @settings[:username].nil? || @settings[:password].nil? || @settings[:region].nil? - @host = URI(@settings[:uri]).host - @mount_path = URI(@settings[:uri]).path - end - - def self.raw_disconnect(mnt_point) - return if mnt_point.nil? - FileUtils.rm_rf(mnt_point) if File.exist?(mnt_point) - end - - def uri_to_local_path(remote_file) - # Strip off the leading "s3:/" from the URI" - File.join(@mnt_point, URI(remote_file).host, URI(remote_file).path) - end - - def uri_to_object_path(remote_file) - # Strip off the leading "s3://" and the bucket name from the URI" - # Also remove the leading delimiter. - URI(remote_file).path[1..-1] - end - - def add(local_file, uri) - require 'aws-sdk' - bucket_name = URI(uri).host - if (dump_bucket = s3.bucket(bucket_name)).exists? - logger.debug("Found bucket #{bucket_name}") - else - logger.debug("Bucket #{bucket_name} does not exist, creating.") - dump_bucket.create - end - object_file = uri_to_object_path(uri) - # write dump file to s3 - logger.debug("Writing [#{local_file}] to Bucket [#{bucket_name}] using object file name [#{object_file}]") - begin - dump_bucket.object(object_file).upload_file(local_file) - rescue Aws::S3::Errors::AccessDenied, Aws::S3::Errors::Forbidden => err - disconnect - logger.error("Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}") - msg = "Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}" - raise err, msg, err.backtrace - rescue => err - disconnect - logger.error("Error uploading #{local_file} to S3 bucket #{bucket_name}. 
#{err}") - msg = "Error uploading #{local_file} to S3 bucket #{bucket_name}. #{err}" - raise err, msg, err.backtrace - end - end - - def download(local_file, remote_file) - require 'aws-sdk' - bucket_name = URI(remote_file).host - if (dump_bucket = s3.bucket(bucket_name)).exists? - logger.debug("Found bucket #{bucket_name}") - else - logger.error("Bucket #{bucket_name} does not exist, unable to download [#{remote_file}].") - raise "Bucket #{bucket_name} does not exist, unable to download [#{remote_file}]." - end - object_file = uri_to_object_path(remote_file) - local_file = File.join(@mnt_point, File.basename(local_file)) - logger.debug("Downloading [#{object_file}] from bucket [#{bucket_name}] to local file [#{local_file}]") - begin - dump_bucket.object(object_file).download_file(local_file) - rescue Aws::S3::Errors::AccessDenied, Aws::S3::Errors::Forbidden => err - disconnect - logger.error("Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}") - msg = "Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}" - raise err, msg, err.backtrace - rescue => err - disconnect - logger.error("Error downloading #{remote_file} from S3. #{err}") - msg = "Error downloading #{remote_file} from S3. 
#{err}" - raise err, msg, err.backtrace - end - local_file - end - - private - - def s3 - @s3 ||= Aws::S3::Resource.new(:region => @settings[:region], :access_key_id => @settings[:username], :secret_access_key => @settings[:password]) - end -end diff --git a/lib/gems/pending/util/object_storage/miq_s3_storage.rb b/lib/gems/pending/util/object_storage/miq_s3_storage.rb new file mode 100644 index 000000000..06cb66511 --- /dev/null +++ b/lib/gems/pending/util/object_storage/miq_s3_storage.rb @@ -0,0 +1,101 @@ +require 'util/miq_object_storage' + +class MiqS3Storage < MiqObjectStorage + attr_reader :bucket_name + + def self.uri_scheme + "s3".freeze + end + + def self.new_with_opts(opts) + new(opts.slice(:uri, :username, :password, :region)) + end + + def initialize(settings) + super(settings) + + # NOTE: This line to be removed once manageiq-ui-class region change implemented. + @settings[:region] ||= "us-east-1" + @bucket_name = URI(@settings[:uri]).host + + raise "username, password, and region are required values!" if @settings[:username].nil? || @settings[:password].nil? || @settings[:region].nil? 
+ end + + # Extract the path from the URI, so strip off the "s3://" scheme, the bucket + # hostname, leaving only the path minus the leading '/' + def uri_to_object_key(remote_file) + # `path` is `[5]` in the returned result of URI.split + URI.split(remote_file)[5][1..-1] + end + + def upload_single(dest_uri) + object_key = uri_to_object_key(dest_uri) + logger.debug("Writing [#{source_input}] to => Bucket [#{bucket_name}] Key [#{dest_uri}]") + + with_standard_s3_error_handling("uploading", source_input) do + bucket.object(object_key).upload_stream do |write_stream| + IO.copy_stream(source_input, write_stream, byte_count) + end + end + end + + def download_single(source, destination) + object_key = uri_to_object_key(source) + logger.debug("Downloading [#{source}] from bucket [#{bucket_name}] to local file [#{destination}]") + + with_standard_s3_error_handling("downloading", source) do + if destination.kind_of?(IO) + s3.client.get_object(:bucket => bucket_name, :key => object_key) do |chunk| + destination.write(chunk) + end + else # assume file path + bucket.object(object_key).download_file(destination) + end + end + destination + end + + # no-op mostly + # + # dirs don't need to be created ahead of time in s3, unlike mounted file + # systems. + # + # For convenience though, calling bucket, which will initialize and create + # (if needed) the s3 bucket to be used for this instance. + def mkdir(_dir) + bucket + end + + def bucket + @bucket ||= s3.bucket(bucket_name).tap do |bucket| + if bucket.exists? 
+ logger.debug("Found bucket #{bucket_name}") + else + logger.debug("Bucket #{bucket_name} does not exist, creating.") + bucket.create + end + end + end + + private + + def s3 + require 'aws-sdk' + require 'util/extensions/aws-sdk/s3_upload_stream_patch' + @s3 ||= Aws::S3::Resource.new(:region => @settings[:region], + :access_key_id => @settings[:username], + :secret_access_key => @settings[:password]) + end + + def with_standard_s3_error_handling(action, object) + yield + rescue Aws::S3::Errors::AccessDenied, Aws::S3::Errors::Forbidden => err + logger.error("Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}") + msg = "Access to S3 bucket #{bucket_name} restricted. Try a different name. #{err}" + raise err, msg, err.backtrace + rescue => err + logger.error("Error #{action} #{object} from S3. #{err}") + msg = "Error #{action} #{object} from S3. #{err}" + raise err, msg, err.backtrace + end +end diff --git a/spec/util/mount/miq_s3_session_spec.rb b/spec/util/mount/miq_s3_session_spec.rb deleted file mode 100644 index fc1112f14..000000000 --- a/spec/util/mount/miq_s3_session_spec.rb +++ /dev/null @@ -1,51 +0,0 @@ -require "util/mount/miq_s3_session" - -describe MiqS3Session do - before(:each) do - @uri = "s3://tmp/abc/def" - @session = described_class.new(:uri => @uri, :username => 'user', :password => 'pass', :region => 'region') - @session.connect - end - - after(:each) do - @session.disconnect - end - - it "#connect returns a string pointing to the mount point" do - allow(described_class).to receive(:raw_disconnect) - @session.logger = Logger.new("/dev/null") - @session.disconnect - - result = @session.connect - expect(result).to be_kind_of(String) - expect(result).to_not be_blank - end - - it "#mount_share is unique" do - expect(@session.mount_share).to_not eq(described_class.new(:uri => @uri, :username => 'user', :password => 'pass', :region => 'region').mount_share) - end - - it ".runcmd will retry with sudo if needed" do - cmd = "mount 
X Y" - expect(described_class).to receive(:`).once.with("#{cmd} 2>&1") - expect(described_class).to receive(:`).with("sudo #{cmd} 2>&1") - expect($CHILD_STATUS).to receive(:exitstatus).once.and_return(1) - - described_class.runcmd(cmd) - end - - it "#@mnt_point starts with '/tmp/miq_'" do - result = @session.mnt_point - expect(result).to start_with("/tmp/miq_") - end - - it "#uri_to_local_path returns a new local path" do - result = @session.uri_to_local_path(@uri) - expect(result).to match(/^\/tmp\/miq_.*\/tmp\/abc\/def$/) - end - - it "#uri_to_object_path returns a new object path" do - result = @session.uri_to_object_path(@uri) - expect(result).to eq("abc/def") - end -end diff --git a/spec/util/object_storage/miq_s3_storage_spec.rb b/spec/util/object_storage/miq_s3_storage_spec.rb new file mode 100644 index 000000000..1d62c998a --- /dev/null +++ b/spec/util/object_storage/miq_s3_storage_spec.rb @@ -0,0 +1,13 @@ +require "util/object_storage/miq_s3_storage" + +describe MiqS3Storage do + before(:each) do + @uri = "s3://tmp/abc/def" + @session = described_class.new(:uri => @uri, :username => 'user', :password => 'pass', :region => 'region') + end + + it "#uri_to_object_path returns a new object path" do + result = @session.uri_to_object_key(@uri) + expect(result).to eq("abc/def") + end +end