-
Notifications
You must be signed in to change notification settings - Fork 79
/
Copy pathmiq_file_storage.rb
254 lines (226 loc) · 8.67 KB
/
miq_file_storage.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# This class is meant to be an abstract interface for defining a file_storage
# class.
#
# A storage class is either a form of "object storage", which includes:
# * protocols like FTP
# * object stores like S3 and OpenStack's Swift
#
# or a mountable filesystem, such as:
# * NFS
# * SMB
#
# The class is meant to allow a shared interface for working with these
# different forms of file storage, while maintaining their differences in
# implementation where necessary. Connection will be handled separately by the
# subclasses, but they must conform to the top level interface.
#
class MiqFileStorage
  # Raised by MiqFileStorage.with_interface_class when a URI's scheme has no
  # registered Interface subclass.
  class InvalidSchemeError < ArgumentError
    def initialize(bad_scheme = nil)
      super(error_message(bad_scheme))
    end

    # Builds the error text, listing every scheme currently registered in
    # MiqFileStorage.storage_interface_classes.
    def error_message(bad_scheme)
      valid_schemes = ::MiqFileStorage.storage_interface_classes.keys.inspect
      "#{bad_scheme} is not a valid MiqFileStorage uri scheme. Accepted schemes are #{valid_schemes}"
    end
  end

  # Builds the Interface instance that handles the scheme of opts[:uri].
  #
  # With a block, yields that instance (nil when opts[:uri] is missing) and
  # returns the block's result; without a block, returns the instance itself.
  #
  # Raises InvalidSchemeError if the scheme is not registered.
  def self.with_interface_class(opts)
    klass = fetch_interface_class(opts)
    block_given? ? yield(klass) : klass
  end

  # Resolves opts[:uri]'s scheme to a registered Interface subclass and
  # instantiates it via `new_with_opts(opts)`. Returns nil without a :uri.
  def self.fetch_interface_class(opts)
    return nil unless opts[:uri]

    require 'uri'
    # Unsafe characters are escaped before splitting, presumably so that raw
    # paths (e.g. ones containing spaces) still parse.
    scheme, _ = URI.split(URI::DEFAULT_PARSER.escape(opts[:uri]))
    klass = storage_interface_classes[scheme]
    raise InvalidSchemeError, scheme if klass.nil?

    klass.new_with_opts(opts)
  end
  private_class_method :fetch_interface_class

  # Hash of URI scheme => Interface subclass, built from every Interface
  # descendant whose `uri_scheme` is non-nil.
  #
  # Memoized on first call: subclasses defined afterwards are not included.
  def self.storage_interface_classes
    @storage_interface_classes ||= Interface.descendants.each_with_object({}) do |klass, memo|
      memo[klass.uri_scheme] = klass if klass.uri_scheme
    end
  end

  # Base class for concrete storage implementations. Subclasses supply
  # `uri_scheme`, `new_with_opts`, `mkdir`, `upload_single` and
  # `download_single`; this class supplies argument parsing, file splitting and
  # the FIFO plumbing used by the block forms of #add and #download.
  class Interface
    # A byte count: a number (optionally fractional) with an optional,
    # case-insensitive K/M/G suffix, e.g. "200", "1.5G", "512 k".
    # Anchored with \A/\z so the WHOLE string must match; ^/$ would accept any
    # single matching line of a multi-line string.
    BYTE_HASH_MATCH = /\A(?<BYTE_NUM>\d+(\.\d+)?)\s*(?<BYTE_QUALIFIER>K|M|G)?\z/i

    # Binary multipliers for the BYTE_HASH_MATCH suffixes. These are the same
    # values as ActiveSupport's 1.kilobyte / 1.megabyte / 1.gigabyte, written
    # as literals so the class body loads without core extensions.
    BYTE_HASH = {
      "k" => 1024,
      "m" => 1024**2,
      "g" => 1024**3
    }.freeze

    attr_reader :remote_file_path, :byte_count, :source_input, :input_writer

    # Builds an instance from the options hash given to
    # MiqFileStorage.with_interface_class. Subclasses must override this.
    def self.new_with_opts(opts) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{name}.new_with_opts is not defined"
    end

    # The URI scheme this subclass handles, or nil for classes that should not
    # be registered in MiqFileStorage.storage_interface_classes.
    def self.uri_scheme
      nil
    end

    # :call-seq:
    #   add( remote_uri ) { |input_path| ... }
    #   add( remote_uri, byte_count ) { |input_path| ... }
    #
    #   add( local_io, remote_uri )
    #   add( local_io, remote_uri, byte_count )
    #
    # Add a file to the destination URI.
    #
    # In the block form, only remote_uri is required. The input is expected to
    # be generated inside the block (most likely by an external process) and
    # written to a named pipe (FIFO): this method creates the FIFO and passes
    # its path to the block.
    #
    # In the non-block form, a source must be provided as the first argument,
    # either as an IO object that can be read from or as a file path, and the
    # second argument is the remote_uri as in the block form. A file path is
    # opened (and closed again) by this method; an IO is left open.
    #
    # Both forms accept an optional trailing `byte_count` (a number, or a
    # String such as "200M"; see BYTE_HASH_MATCH). When given, the input is
    # uploaded in splits of `byte_count` bytes, named:
    #
    #   - filename.00001
    #   - filename.00002
    #   ...
    #
    # A trailing nil is treated as "no byte_count". A byte_count that parses
    # to less than one byte raises ArgumentError.
    #
    # Returns the result of `upload_single` for an unsplit upload, and nil
    # when the input was split.
    #
    # Block form:
    #
    #   nfs_session.add("path/to/file", "200M") do |input_stream|
    #     `pg_dump -f #{input_stream} vmdb_production`
    #   end
    #
    # Non-block form:
    #
    #   nfs_session.add("path/to/local_file", "path/to/remote_file")
    #   nfs_session.add("path/to/local_file", "path/to/remote_file", "200M")
    #
    def add(*upload_args, &block)
      initialize_upload_vars(*upload_args)
      mkdir(File.dirname(@remote_file_path))
      thread = handle_io_block(&block)
      result = byte_count ? upload_splits : upload_single(@remote_file_path)
      # `.join` re-raises any error from the block's thread, so do that here
      # (if a thread was started) before returning.
      thread.join if thread
      result
    ensure
      reset_vars
    end
    alias upload add

    # Creates the remote directory `dir`. #add calls it with the parent
    # directory of the remote path. Subclasses must override this.
    def mkdir(dir) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{self.class}##{__callee__} is not defined"
    end

    # :call-seq:
    #   download( local_io, remote_uri )
    #   download( nil, remote_uri ) { |input_path| ... }
    #
    # Download a file from a remote uri.
    #
    # In non-block form, the remote_uri is saved to local_io: it is passed
    # straight to `download_single` as the destination, and that call's
    # result is returned.
    #
    # In block form, local_io is ignored. A FIFO is created,
    # `download_single` writes the remote data into it, and the block receives
    # the FIFO's path so it can read the data (e.g. via an external process).
    # The return value is that of `thread.join` (the joined Thread).
    #
    # NOTE(review): the block should open the FIFO for reading promptly. If it
    # only does so after `download_single` has finished and the writer has
    # been closed, that open may block waiting for a writer. Confirm callers.
    def download(local_file, remote_file_uri, &block)
      @remote_file_path = remote_file_uri
      if block_given?
        thread = handle_io_block(&block)
        download_single(remote_file_uri, input_writer)
        # Closing our end lets the block's reader see EOF.
        input_writer.close
        thread.join
      else
        download_single(remote_file_uri, local_file)
      end
    ensure
      reset_vars
    end

    private

    # NOTE: Must be overridden in the subclass!
    #
    # Subclasses of `MiqFileStorage::Interface` implement this to handle the
    # specifics of uploading for their particular ObjectStore protocol or
    # MountSession.
    #
    # `dest_uri` is the destination of the data being uploaded right now. When
    # the file is being split, it is the filename passed to `#add` plus a
    # `.0000X` suffix, zero-padded to 5 digits.
    #
    # `#upload_single` doesn't need to work out split file names itself, but
    # any relative-path munging that is necessary should be done here (see
    # `MiqGenericMountSession#upload_single` for an example).
    #
    # `source_input` is available as an attr_reader in this method and is a
    # local IO object open for reading.
    #
    # `byte_count` is also available as an attr_reader. It is `nil` when no
    # splitting is occurring; otherwise it is an integer giving the maximum
    # number of bytes to upload for this particular `dest_uri`.
    #
    # `IO.copy_stream` can simplify this significantly: pass it
    # `source_input`, `dest_uri` and `byte_count`, and it streams the data from
    # one IO object to the other. In mount-based situations, where `dest_uri`
    # is a file path (converted to `relpath` in
    # `MiqGenericMountSession#upload_single`), it does not need to be turned
    # into a `File` first, because `IO.copy_stream` accepts a path.
    def upload_single(dest_uri) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{self.class}#upload_single is not defined"
    end

    # Calls `upload_single` once per split until `source_input` reaches EOF.
    # @position is the running byte offset used to number the splits.
    def upload_splits
      @position = 0
      until source_input.eof?
        upload_single(next_split_filename)
        @position += byte_count
      end
    end

    # Decodes #add's positional arguments into @byte_count, @remote_file_path
    # and (non-block form) @source_input.
    #
    # Raises ArgumentError when a byte_count parses to less than one byte.
    def initialize_upload_vars(*upload_args)
      # A trailing nil is an explicitly omitted byte_count (e.g. an optional
      # setting passed straight through). Without this, the nil would be taken
      # as the remote path and the other arguments would shift by one.
      upload_args.pop if upload_args.size > 1 && upload_args.last.nil?

      candidate = upload_args.last
      if (@byte_count = parse_byte_value(candidate))
        upload_args.pop
        # With a zero split size the input never advances (assuming
        # upload_single honors byte_count), so #upload_splits would not finish.
        raise ArgumentError, "byte_count must be at least 1 byte (got #{candidate.inspect})" if @byte_count < 1
      end

      @remote_file_path = upload_args.pop
      return if upload_args.empty?

      source = upload_args.pop
      if source.kind_of?(IO)
        @source_input = source # caller-owned: never closed here
      else
        @source_input = File.open(source, "r")
        opened_ios << @source_input
      end
    end

    # Converts `bytes` (anything whose #to_s matches BYTE_HASH_MATCH, e.g.
    # 200, "200", "1.5M", "2 g") to an Integer byte count, truncating any
    # fraction. Returns nil when it does not match.
    def parse_byte_value(bytes)
      match = bytes.to_s.match(BYTE_HASH_MATCH) || return
      bytes = match[:BYTE_NUM].to_f
      if match[:BYTE_QUALIFIER]
        bytes *= BYTE_HASH[match[:BYTE_QUALIFIER].downcase]
      end
      bytes.to_i
    end

    # When a block is given, creates a temporary FIFO, opens both ends
    # (@source_input for reading, @input_writer for writing), and runs the
    # block with the FIFO's path in a new Thread, which is returned. Returns
    # nil without a block.
    def handle_io_block
      return unless block_given?

      require "tmpdir"
      # Reserve a unique temp path without creating the file, so mkfifo can
      # create the FIFO there.
      @fifo_path = Pathname.new(Dir::Tmpname.create("") {})
      fifo_path = @fifo_path
      File.mkfifo(fifo_path)
      # The reader must be opened first: opening a FIFO write-only with
      # O_NONBLOCK fails (ENXIO) while no reader has it open.
      @source_input = File.open(fifo_path.to_s, IO::RDONLY | IO::NONBLOCK)
      opened_ios << @source_input
      # Keep a local reference for the thread: #reset_vars may clear
      # @input_writer while the thread is still running (error path).
      writer = @input_writer = File.open(fifo_path.to_s, IO::WRONLY | IO::NONBLOCK)
      opened_ios << writer
      Thread.new do
        begin
          yield fifo_path # send the path to the block to get executed
        ensure
          writer.close # close our end so the reader can hit EOF (for #add)
        end
      end
    end

    # IOs opened by this instance for the current operation; #reset_vars
    # closes them. Caller-supplied IOs are never added.
    def opened_ios
      @opened_ios ||= []
    end

    # Per-operation cleanup: closes the IOs this instance opened, removes the
    # temporary FIFO (if any) and clears the per-operation state.
    def reset_vars
      opened_ios.each { |io| io.close unless io.closed? }
      File.delete(@fifo_path) if @fifo_path && File.exist?(@fifo_path)
      @position, @byte_count, @remote_file_path, @source_input, @input_writer, @fifo_path = nil
      @opened_ios = nil
    end

    # Name of the split starting at @position: the remote path plus a
    # 1-based, 5-digit zero-padded index.
    def next_split_filename
      "#{remote_file_path}.#{'%05d' % (@position / byte_count + 1)}"
    end

    # NOTE: Must be overridden in the subclass! Fetches `source` (a remote
    # path) and writes it to `destination` (a local path/IO, or the FIFO
    # writer in #download's block form).
    def download_single(source, destination) # rubocop:disable Lint/UnusedMethodArgument
      raise NotImplementedError, "#{self.class}#download_single is not defined"
    end
  end
end