From 1703a734c46afdcf069d7f1a0521117da58b35fb Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Tue, 9 Feb 2021 17:05:25 +0900 Subject: [PATCH 01/19] wip --- smart_open/s3.py | 375 ++++++++++++---------------- smart_open/tests/test_s3.py | 75 +++--- smart_open/tests/test_smart_open.py | 81 +++--- 3 files changed, 236 insertions(+), 295 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 279803af..ef431c92 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -127,41 +127,46 @@ def _consolidate_params(uri, transport_params): """ transport_params = dict(transport_params) - session = transport_params.get('session') - if session is not None and (uri['access_id'] or uri['access_secret']): + def inject(**kwargs): + try: + client_kwargs = transport_params['client_kwargs'] + except KeyError: + client_kwargs = transport_params['client_kwargs'] = {} + + try: + ctor_kwargs = client_kwargs['S3.Client'] + except KeyError: + ctor_kwargs = client_kwargs['S3.Client'] = {} + + ctor_kwargs.update(**kwargs) + + client = transport_params.get('client') + if client is not None and (uri['access_id'] or uri['access_secret']): logger.warning( 'ignoring credentials parsed from URL because they conflict with ' - 'transport_params["session"]. Set transport_params["session"] to None ' + 'transport_params["client"]. Set transport_params["client"] to None ' 'to suppress this warning.' ) uri.update(access_id=None, access_secret=None) elif (uri['access_id'] and uri['access_secret']): - transport_params['session'] = boto3.Session( + inject( aws_access_key_id=uri['access_id'], aws_secret_access_key=uri['access_secret'], ) uri.update(access_id=None, access_secret=None) - if uri['host'] != DEFAULT_HOST: - endpoint_url = 'https://%(host)s:%(port)d' % uri - _override_endpoint_url(transport_params, endpoint_url) - - return uri, transport_params - - -def _override_endpoint_url(transport_params, url): - try: - resource_kwargs = transport_params['resource_kwargs'] - except KeyError: - resource_kwargs = transport_params['resource_kwargs'] = {} - - if resource_kwargs.get('endpoint_url'): + if client is not None and uri['host'] != DEFAULT_HOST: logger.warning( - 'ignoring endpoint_url parsed from URL because it conflicts ' - 'with transport_params["resource_kwargs"]["endpoint_url"]' + 'ignoring endpoint_url parsed from URL because they conflict with ' + 'transport_params["client"]. Set transport_params["client"] to None ' + 'to suppress this warning.' ) - else: - resource_kwargs.update(endpoint_url=url) + uri.update(host=None) + elif uri['host'] != DEFAULT_HOST: + inject(endpoint_url='https://%(host)s:%(port)d' % uri) + uri.update(host=None) + + return uri, transport_params def open_uri(uri, mode, transport_params): @@ -178,14 +183,10 @@ def open( version_id=None, buffer_size=DEFAULT_BUFFER_SIZE, min_part_size=DEFAULT_MIN_PART_SIZE, - session=None, - resource=None, - resource_kwargs=None, - multipart_upload_kwargs=None, multipart_upload=True, - singlepart_upload_kwargs=None, - object_kwargs=None, defer_seek=False, + client=None, + client_kwargs=None, ): """Open an S3 object for reading or writing. @@ -201,22 +202,6 @@ def open( The buffer size to use when performing I/O. min_part_size: int, optional The minimum part size for multipart uploads. For writing only. - session: object, optional - The S3 session to use when working with boto3. - If you don't specify this, then smart_open will create a new session for you. - resource: object, optional - The S3 resource to use when working with boto3. 
- If you don't specify this, then smart_open will create a new resource for you. - resource_kwargs: dict, optional - Keyword arguments to use when creating the S3 resource for reading or writing. - Will be ignored if you specify the resource object explicitly. - multipart_upload_kwargs: dict, optional - Additional parameters to pass to boto3's initiate_multipart_upload function. - For writing only. - singlepart_upload_kwargs: dict, optional - Additional parameters to pass to boto3's S3.Object.put function when using single - part upload. - For writing only. multipart_upload: bool, optional Default: `True` If set to `True`, will use multipart upload for writing to S3. If set @@ -226,14 +211,16 @@ def open( version_id: str, optional Version of the object, used when reading object. If None, will fetch the most recent version. - object_kwargs: dict, optional - Additional parameters to pass to boto3's object.get function. - Used during reading only. defer_seek: boolean, optional Default: `False` If set to `True` on a file opened for reading, GetObject will not be called until the first seek() or read(). Avoids redundant API queries when seeking before reading. + client: object, optional + The S3 client to use when working with boto3. + If you don't specify this, then smart_open will create a new client for you. + client_kwargs: dict, optional + Additional parameters to pass to the relevant functions of the client. """ logger.debug('%r', locals()) if mode not in constants.BINARY_MODES: @@ -248,11 +235,9 @@ def open( key_id, version_id=version_id, buffer_size=buffer_size, - session=session, - resource=resource, - resource_kwargs=resource_kwargs, - object_kwargs=object_kwargs, defer_seek=defer_seek, + client=client, + client_kwargs=client_kwargs, ) elif mode == constants.WRITE_BINARY: if multipart_upload: @@ -260,19 +245,15 @@ def open( bucket_id, key_id, min_part_size=min_part_size, - session=session, - resource=resource, - upload_kwargs=multipart_upload_kwargs, - resource_kwargs=resource_kwargs, + client=client, + client_kwargs=client_kwargs, ) else: fileobj = SinglepartWriter( bucket_id, key_id, - session=session, - resource=resource, - upload_kwargs=singlepart_upload_kwargs, - resource_kwargs=resource_kwargs, + client=client, + client_kwargs=client_kwargs, ) else: assert False, 'unexpected mode: %r' % mode @@ -281,15 +262,16 @@ def open( return fileobj -def _get(s3_object, version=None, **kwargs): +def _get(client, bucket, key, version, **get_object_kwargs): if version is not None: - kwargs['VersionId'] = version + get_object_kwargs['VersionId'] = version + try: - return s3_object.get(**kwargs) + return client.get_object(Bucket=bucket, Key=key, **get_object_kwargs) except botocore.client.ClientError as error: wrapped_error = IOError( 'unable to access bucket: %r key: %r version: %r error: %s' % ( - s3_object.bucket_name, s3_object.key, version, error + bucket, key, version, error ) ) wrapped_error.backend_error = error @@ -312,16 +294,21 @@ class _SeekableRawReader(object): def __init__( self, - s3_object, + client, + bucket, + key, version_id=None, - object_kwargs=None, + client_kwargs=None, ): - self._object = s3_object - self._content_length = None + self._client = client + self._bucket = bucket + self._key = key self._version_id = version_id + + self._content_length = None self._position = 0 self._body = None - self._object_kwargs = object_kwargs if object_kwargs else {} + self._client_kwargs = client_kwargs if client_kwargs else {} def seek(self, offset, whence=constants.WHENCE_START): 
"""Seek to the specified position. @@ -391,10 +378,12 @@ def _open_body(self, start=None, stop=None): try: # Optimistically try to fetch the requested content range. response = _get( - self._object, + self._client, + self._bucket, + self._key, version=self._version_id, Range=range_string, - **self._object_kwargs + **self._client_kwargs.get('S3.Client.get_object', {}), ) except IOError as ioe: # Handle requested content range exceeding content size. @@ -468,43 +457,20 @@ def read(self, size=-1): raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt)) def __str__(self): - return 'smart_open.s3._SeekableReader(%r, %r)' % ( - self._object.bucket_name, - self._object.key, - ) + return 'smart_open.s3._SeekableReader(%r, %r)' % (self._bucket, self._key) -def _initialize_boto3(rw, session, resource, resource_kwargs): +def _initialize_boto3(rw, client, client_kwargs): """Created the required objects for accessing S3. Ideally, they have - been already created for us and we can just reuse them. - - We only really need one thing: the resource. There are multiple ways of - getting one, in order of effort: + been already created for us and we can just reuse them.""" + if client_kwargs is None: + client_kwargs = {} + rw._client_kwargs = client_kwargs - 1) Directly from the user - 2) From the session directly specified by the user - 3) From an entirely new session - - Once we have the resource, we no longer need the session. - """ - if resource_kwargs is None: - resource_kwargs = {} - - if resource: - if session: - logger.warning('ignoring session because resource was passed explicitly') - if resource_kwargs: - logger.warning('ignoring resource_kwargs because resource was passed explicitly') - rw._session = None - rw._resource = resource - elif session: - rw._session = session - rw._resource = rw._session.resource('s3', **resource_kwargs) - rw._resource_kwargs = resource_kwargs + if client: + rw._client = client else: - rw._session = boto3.Session() - rw._resource = rw._session.resource('s3', **resource_kwargs) - rw._resource_kwargs = resource_kwargs + rw._client = boto3.client('s3', **client_kwargs.get('S3.Client', {})) class Reader(io.BufferedIOBase): @@ -519,29 +485,23 @@ def __init__( version_id=None, buffer_size=DEFAULT_BUFFER_SIZE, line_terminator=constants.BINARY_NEWLINE, - session=None, - resource=None, - resource_kwargs=None, - object_kwargs=None, defer_seek=False, + client=None, + client_kwargs=None, ): + self._bucket = bucket + self._key = key + self._version_id = version_id self._buffer_size = buffer_size - if resource_kwargs is None: - resource_kwargs = {} - if object_kwargs is None: - object_kwargs = {} - - _initialize_boto3(self, session, resource, resource_kwargs) - - self._object_kwargs = object_kwargs - self._object = self._resource.Object(bucket, key) - self._version_id = version_id + _initialize_boto3(self, client, client_kwargs) self._raw_reader = _SeekableRawReader( - self._object, + self._client, + bucket, + key, self._version_id, - self._object_kwargs, + client_kwargs=self._client_kwargs, ) self._current_pos = 0 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) @@ -562,7 +522,7 @@ def __init__( def close(self): """Flush and close this stream.""" - self._object = None + pass def readable(self): """Return True if the stream can be read from.""" @@ -668,7 +628,7 @@ def terminate(self): """Do nothing.""" pass - def to_boto3(self): + def to_boto3(self, resource=None): """Create an **independent** `boto3.s3.Object` instance that points to the 
same resource as this instance. @@ -677,13 +637,13 @@ def to_boto3(self): `boto3.s3.Object` may not necessarily affect the current instance. """ + if not resource: + resource = boto3.resource('s3') + obj = resource.Object(self._bucket, self._key) if self._version_id is not None: - return self._resource.Object( - self._object.bucket_name, - self._object.key, - ).Version(self._version_id) + return obj.Version(self._version_id) else: - return self._resource.Object(self._object.bucket_name, self._object.key) + return obj # # Internal methods. @@ -704,9 +664,7 @@ def _fill_buffer(self, size=-1): self._eof = True def __str__(self): - return "smart_open.s3.Reader(%r, %r)" % ( - self._object.bucket_name, self._object.key - ) + return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) def __repr__(self): return ( @@ -715,17 +673,13 @@ def __repr__(self): "key=%r, " "version_id=%r, " "buffer_size=%r, " - "line_terminator=%r, " - "session=%r, " - "resource_kwargs=%r)" + "line_terminator=%r)" ) % ( - self._object.bucket_name, - self._object.key, + self._bucket, + self._key, self._version_id, self._buffer_size, self._line_terminator, - self._session, - self._resource_kwargs, ) @@ -739,27 +693,27 @@ def __init__( bucket, key, min_part_size=DEFAULT_MIN_PART_SIZE, - session=None, - resource=None, - resource_kwargs=None, - upload_kwargs=None, + client=None, + client_kwargs=None, ): + self._bucket = bucket + self._key = key + if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ multipart upload may fail") + self._min_part_size = min_part_size - _initialize_boto3(self, session, resource, resource_kwargs) - - if upload_kwargs is None: - upload_kwargs = {} - - self._upload_kwargs = upload_kwargs + _initialize_boto3(self, client, client_kwargs) try: - self._object = self._resource.Object(bucket, key) - self._min_part_size = min_part_size - partial = functools.partial(self._object.initiate_multipart_upload, **self._upload_kwargs) - self._mp = _retry_if_failed(partial) + partial = functools.partial( + self._client.create_multipart_upload, + Bucket=bucket, + Key=key, + **self._client_kwargs.get('S3.Client.create_multipart_upload', {}), + ) + self._upload_id = _retry_if_failed(partial)['UploadId'] except botocore.client.ClientError as error: raise ValueError( 'the bucket %r does not exist, or is forbidden for access (%r)' % ( @@ -787,11 +741,17 @@ def close(self): if self._buf.tell(): self._upload_next_part() - if self._total_bytes and self._mp: - partial = functools.partial(self._mp.complete, MultipartUpload={'Parts': self._parts}) + if self._total_bytes and self._upload_id: + partial = functools.partial( + self._client.complete_multipart_upload, + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + MultipartUpload={'Parts': self._parts}, + ) _retry_if_failed(partial) logger.debug('%s: completed multipart upload', self) - elif self._mp: + elif self._upload_id: # # AWS complains with "The XML you provided was not well-formed or # did not validate against our published schema" when the input is @@ -799,15 +759,19 @@ def close(self): # # We work around this by creating an empty file explicitly. 
# - assert self._mp, "no multipart upload in progress" - self._mp.abort() - self._object.put(Body=b'') + assert self._upload_id, "no multipart upload in progress" + self._client.abort_multipart_upload( + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + ) + self._client.put_object(Bucket=self._bucket, Key=self._key, Body=b'') logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) - self._mp = None + self._upload_id = None @property def closed(self): - return self._mp is None + return self._upload_id is None def writable(self): """Return True if the stream supports writing.""" @@ -842,11 +806,15 @@ def write(self, b): def terminate(self): """Cancel the underlying multipart upload.""" - assert self._mp, "no multipart upload in progress" - self._mp.abort() - self._mp = None + assert self._upload_id, "no multipart upload in progress" + self._client.abort_multipart_upload( + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + ) + self._upload_id = None - def to_boto3(self): + def to_boto3(self, resource=None): """Create an **independent** `boto3.s3.Object` instance that points to the same resource as this instance. @@ -855,7 +823,9 @@ def to_boto3(self): `boto3.s3.Object` may not necessary affect the current instance. """ - return self._resource.Object(self._object.bucket_name, self._object.key) + if not resource: + resource = boto3.resource('s3') + return resource.Object(self._bucket, self._key) # # Internal methods. @@ -870,7 +840,6 @@ def _upload_next_part(self): self._total_bytes / 1024.0 ** 3, ) self._buf.seek(0) - part = self._mp.Part(part_num) # # Network problems in the middle of an upload are particularly @@ -878,7 +847,16 @@ def _upload_next_part(self): # of a temporary connection problem, so this part needs to be # especially robust. 
# - upload = _retry_if_failed(functools.partial(part.upload, Body=self._buf)) + upload = _retry_if_failed( + functools.partial( + self._client.upload_part, + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + PartNumber=part_num, + Body=self._buf, + ) + ) self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num}) logger.debug("%s: upload of part_num #%i finished", self, part_num) @@ -896,21 +874,13 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __str__(self): - return "smart_open.s3.MultipartWriter(%r, %r)" % ( - self._object.bucket_name, self._object.key, - ) + return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key) def __repr__(self): - return ( - "smart_open.s3.MultipartWriter(bucket=%r, key=%r, " - "min_part_size=%r, session=%r, resource_kwargs=%r, upload_kwargs=%r)" - ) % ( - self._object.bucket_name, - self._object.key, + return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, min_part_size=%r)" % ( + self._bucket, + self._key, self._min_part_size, - self._session, - self._resource_kwargs, - self._upload_kwargs, ) @@ -923,25 +893,19 @@ class SinglepartWriter(io.BufferedIOBase): the data be written to S3 and the buffer is released.""" def __init__( - self, - bucket, - key, - session=None, - resource=None, - resource_kwargs=None, - upload_kwargs=None, - ): - - _initialize_boto3(self, session, resource, resource_kwargs) - - if upload_kwargs is None: - upload_kwargs = {} + self, + bucket, + key, + client=None, + client_kwargs=None, + ): + self._bucket = bucket + self._key = key - self._upload_kwargs = upload_kwargs + _initialize_boto3(self, client, client_kwargs) try: - self._object = self._resource.Object(bucket, key) - self._resource.meta.client.head_bucket(Bucket=bucket) + self._object = self._client.head_bucket(Bucket=bucket) except botocore.client.ClientError as e: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e @@ -966,10 +930,15 @@ def close(self): self._buf.seek(0) try: - self._object.put(Body=self._buf, **self._upload_kwargs) + self._client.put_object( + Bucket=self._bucket, + Key=self._key, + Body=self._buf, + **self._client_kwargs.get('S3.Client.put_object', {}), + ) except botocore.client.ClientError as e: raise ValueError( - 'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) from e + 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e logger.debug("%s: direct upload finished", self) self._buf = None @@ -1023,16 +992,7 @@ def __str__(self): return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._object.bucket_name, self._object.key) def __repr__(self): - return ( - "smart_open.s3.SinglepartWriter(bucket=%r, key=%r, session=%r, " - "resource_kwargs=%r, upload_kwargs=%r)" - ) % ( - self._object.bucket_name, - self._object.key, - self._session, - self._resource_kwargs, - self._upload_kwargs, - ) + return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) def _retry_if_failed( @@ -1057,13 +1017,6 @@ def _retry_if_failed( raise IOError('Unable to connect to the endpoint after %d attempts' % attempts) -# -# For backward compatibility -# -SeekableBufferedInputBase = Reader -BufferedOutputBase = MultipartWriter - - def _accept_all(key): return True diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index fcd063d2..e9d5fe36 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -170,13 +170,12 @@ def read(self, size=-1): return the_bytes 
-class CrapObject: +class CrapClient: def __init__(self, data, modulus=2): self._datasize = len(data) self._body = CrapStream(data, modulus=modulus) - self.bucket_name, self.key = 'crap', 'object' - def get(self, *args, **kwargs): + def get_object(self, *args, **kwargs): return { 'ActualObjectSize': self._datasize, 'ContentLength': self._datasize, @@ -188,7 +187,7 @@ def get(self, *args, **kwargs): class IncrementalBackoffTest(unittest.TestCase): def test_every_read_fails(self): - reader = smart_open.s3._SeekableRawReader(CrapObject(b'hello', 1)) + reader = smart_open.s3._SeekableRawReader(CrapClient(b'hello', 1), 'bucket', 'key') with mock.patch('time.sleep') as mock_sleep: with self.assertRaises(IOError): reader.read() @@ -200,7 +199,7 @@ def test_every_read_fails(self): def test_every_second_read_fails(self): """Can we read from a stream that raises exceptions from time to time?""" - reader = smart_open.s3._SeekableRawReader(CrapObject(b'hello')) + reader = smart_open.s3._SeekableRawReader(CrapClient(b'hello'), 'bucket', 'key') with mock.patch('time.sleep') as mock_sleep: assert reader.read(1) == b'h' mock_sleep.assert_not_called() @@ -222,7 +221,7 @@ def test_every_second_read_fails(self): @moto.mock_s3 -class SeekableBufferedInputBaseTest(BaseTest): +class ReaderTest(BaseTest): def setUp(self): # lower the multipart upload size, to speed up these tests self.old_min_part_size = smart_open.s3.DEFAULT_MIN_PART_SIZE @@ -244,7 +243,7 @@ def test_iter(self): # connect to fake s3 and read from the fake key we filled above with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) output = [line.rstrip(b'\n') for line in fin] self.assertEqual(output, expected.split(b'\n')) @@ -253,7 +252,7 @@ def test_iter_context_manager(self): expected = u"hello wořld\nhow are you?".encode('utf8') put_to_bucket(contents=expected) with self.assertApiCalls(GetObject=1): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: output = [line.rstrip(b'\n') for line in fin] self.assertEqual(output, expected.split(b'\n')) @@ -264,7 +263,7 @@ def test_read(self): logger.debug('content: %r len: %r', content, len(content)) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) self.assertEqual(content[:6], fin.read(6)) self.assertEqual(content[6:14], fin.read(8)) # ř is 2 bytes self.assertEqual(content[14:], fin.read()) # read the rest @@ -275,7 +274,7 @@ def test_seek_beginning(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) self.assertEqual(content[:6], fin.read(6)) self.assertEqual(content[6:14], fin.read(8)) # ř is 2 bytes @@ -293,7 +292,7 @@ def test_seek_start(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) seek = fin.seek(6) self.assertEqual(seek, 6) self.assertEqual(fin.tell(), 6) @@ -305,7 +304,7 @@ def test_seek_current(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = 
smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) self.assertEqual(fin.read(5), b'hello') with self.assertApiCalls(GetObject=1): @@ -319,7 +318,7 @@ def test_seek_end(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) seek = fin.seek(-4, whence=smart_open.constants.WHENCE_END) self.assertEqual(seek, len(content) - 4) self.assertEqual(fin.read(), b'you?') @@ -338,7 +337,7 @@ def test_detect_eof(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) fin.read() eof = fin.tell() self.assertEqual(eof, len(content)) @@ -358,7 +357,7 @@ def test_read_gzip(self): # # Make sure we're reading things correctly. # - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: self.assertEqual(fin.read(), buf.getvalue()) # @@ -370,7 +369,7 @@ def test_read_gzip(self): logger.debug('starting actual test') with self.assertApiCalls(GetObject=1): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: with gzip.GzipFile(fileobj=fin) as zipfile: actual = zipfile.read() @@ -381,7 +380,7 @@ def test_readline(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=2): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: fin.readline() self.assertEqual(fin.tell(), content.index(b'\n')+1) @@ -397,7 +396,7 @@ def test_readline_tiny_buffer(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, buffer_size=8) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, buffer_size=8) as fin: actual = list(fin) expected = [b'englishman\n', b'in\n', b'new\n', b'york\n'] @@ -409,7 +408,7 @@ def test_read0_does_not_return_data(self): with self.assertApiCalls(): # set defer_seek to verify that read(0) doesn't trigger an unnecessary API call - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: data = fin.read(0) self.assertEqual(data, b'') @@ -420,7 +419,7 @@ def test_to_boto3(self): with self.assertApiCalls(): # set defer_seek to verify that to_boto3() doesn't trigger an unnecessary API call - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: returned_obj = fin.to_boto3() boto3_body = returned_obj.get()['Body'].read() @@ -439,12 +438,12 @@ def test_defer_seek(self): put_to_bucket(contents=content) with self.assertApiCalls(): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) with self.assertApiCalls(GetObject=1): self.assertEqual(fin.read(), content) with self.assertApiCalls(): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) with self.assertApiCalls(GetObject=1): fin.seek(10) self.assertEqual(fin.read(), content[10:]) 
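The `defer_seek` tests above pin down the reader's contract: constructing a `Reader` with `defer_seek=True` makes no API calls, and the first `seek()` or `read()` triggers exactly one GetObject. A minimal sketch of that contract from the caller's side (the bucket and key names here are hypothetical):

```python
import smart_open.s3

# No GetObject happens at construction time with defer_seek=True.
fin = smart_open.s3.Reader('some-bucket', 'some-key', defer_seek=True)

fin.seek(10)        # first API call: a GetObject with a Range starting at byte 10
data = fin.read()   # streams the remainder of that same response body
```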
@@ -543,7 +542,7 @@ def test_gzip(self): with gzip.GzipFile(fileobj=fout, mode='w') as zipfile: zipfile.write(expected) - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, WRITE_KEY_NAME) as fin: with gzip.GzipFile(fileobj=fin) as zipfile: actual = zipfile.read() @@ -939,8 +938,8 @@ def test_failure(self): @moto.mock_s3() -def test_resource_propagation_singlepart(): - """Does the resource parameter make it from the caller to Boto3?""" +def test_client_propagation_singlepart(): + """Does the client parameter make it from the caller to Boto3?""" # # Not sure why we need to create the bucket here, as setUpModule should # have done that for us by now. @@ -950,34 +949,38 @@ def test_resource_propagation_singlepart(): bucket = resource.create_bucket(Bucket=BUCKET_NAME) bucket.wait_until_exists() + client = session.client('s3') + with smart_open.s3.open( BUCKET_NAME, WRITE_KEY_NAME, mode='wb', - resource=resource, + client=client, multipart_upload=False, ) as writer: - assert writer._resource == resource - assert id(writer._resource) == id(resource) + assert writer._client == client + assert id(writer._client) == id(client) @moto.mock_s3() -def test_resource_propagation_multipart(): +def test_client_propagation_multipart(): """Does the resource parameter make it from the caller to Boto3?""" session = boto3.Session() resource = session.resource('s3') bucket = resource.create_bucket(Bucket=BUCKET_NAME) bucket.wait_until_exists() + client = session.client('s3') + with smart_open.s3.open( BUCKET_NAME, WRITE_KEY_NAME, mode='wb', - resource=resource, + client=client, multipart_upload=True, ) as writer: - assert writer._resource == resource - assert id(writer._resource) == id(resource) + assert writer._client == client + assert id(writer._client) == id(client) @moto.mock_s3() @@ -988,12 +991,14 @@ def test_resource_propagation_reader(): bucket = resource.create_bucket(Bucket=BUCKET_NAME) bucket.wait_until_exists() + client = session.client('s3') + with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='wb') as writer: writer.write(b'hello world') - with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='rb', resource=resource) as reader: - assert reader._resource == resource - assert id(reader._resource) == id(resource) + with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='rb', client=client) as reader: + assert reader._client == client + assert id(reader._client) == id(client) if __name__ == '__main__': diff --git a/smart_open/tests/test_smart_open.py b/smart_open/tests/test_smart_open.py index 72a570d6..0490b6fd 100644 --- a/smart_open/tests/test_smart_open.py +++ b/smart_open/tests/test_smart_open.py @@ -1162,11 +1162,14 @@ def test_no_kwargs(self, mock_session): smart_open.open('s3://mybucket/mykey', transport_params=dict(defer_seek=True)) mock_session.return_value.resource.assert_called_with('s3') - @mock.patch('boto3.Session') - def test_credentials(self, mock_session): + @mock.patch('boto3.client') + def test_credentials(self, mock_client): smart_open.open('s3://access_id:access_secret@mybucket/mykey', transport_params=dict(defer_seek=True)) - mock_session.assert_called_with(aws_access_key_id='access_id', aws_secret_access_key='access_secret') - mock_session.return_value.resource.assert_called_with('s3') + mock_client.assert_called_with( + 's3', + aws_access_key_id='access_id', + aws_secret_access_key='access_secret', + ) @mock.patch('boto3.Session') def test_host(self, mock_session): @@ -1181,48 +1184,24 @@ def 
test_host(self, mock_session): endpoint_url='http://aa.domain.com', ) - @mock.patch('boto3.Session') - def test_s3_upload(self, mock_session): - smart_open.open( - "s3://bucket/key", 'wb', transport_params={ - 'multipart_upload_kwargs': { + @mock.patch('boto3.client') + def test_s3_upload(self, mock_client): + tp = { + 'client_kwargs': { + 'S3.Client.create_multipart_upload': { 'ServerSideEncryption': 'AES256', 'ContentType': 'application/json', } } - ) - - # Locate the s3.Object instance (mock) - s3_resource = mock_session.return_value.resource.return_value - s3_object = s3_resource.Object.return_value - - # Check that `initiate_multipart_upload` was called - # with the desired args - s3_object.initiate_multipart_upload.assert_called_with( + } + smart_open.open("s3://bucket/key", 'wb', transport_params=tp) + mock_client.return_value.create_multipart_upload.assert_called_with( + Bucket='bucket', + Key='key', ServerSideEncryption='AES256', - ContentType='application/json' + ContentType='application/json', ) - def test_session_read_mode(self): - """ - Read stream should use a custom boto3.Session - """ - session = boto3.Session() - session.resource = mock.MagicMock() - - smart_open.open('s3://bucket/key', transport_params={'session': session, 'defer_seek': True}) - session.resource.assert_called_with('s3') - - def test_session_write_mode(self): - """ - Write stream should use a custom boto3.Session - """ - session = boto3.Session() - session.resource = mock.MagicMock() - - smart_open.open('s3://bucket/key', 'wb', transport_params={'session': session}) - session.resource.assert_called_with('s3') - class SmartOpenTest(unittest.TestCase): """ @@ -1789,15 +1768,19 @@ def test_write_text_gzip(self): @mock.patch('smart_open.s3.Reader') def test_transport_params_is_not_mutable(self, mock_open): smart_open.open('s3://access_key:secret_key@host@bucket/key') - smart_open.open('s3://bucket/key') + actual = mock_open.call_args_list[0][1]['client_kwargs'] + expected = { + 'S3.Client': { + 'aws_access_key_id': 'access_key', + 'aws_secret_access_key': 'secret_key', + 'endpoint_url': 'https://host:443', + } + } + assert actual == expected - # - # The first call should have a non-null session, because the session - # keys were explicitly specified in the URL. The second call should - # _not_ have a session. 
- # - self.assertIsNone(mock_open.call_args_list[1][1]['session']) - self.assertIsNotNone(mock_open.call_args_list[0][1]['session']) + smart_open.open('s3://bucket/key') + actual = mock_open.call_args_list[1][1].get('client_kwargs') + assert actual is None @mock.patch('smart_open.s3.Reader') def test_respects_endpoint_url_read(self, mock_open): @@ -1805,7 +1788,7 @@ def test_respects_endpoint_url_read(self, mock_open): smart_open.open(url) expected = {'endpoint_url': 'https://play.min.io:9000'} - self.assertEqual(mock_open.call_args[1]['resource_kwargs'], expected) + self.assertEqual(mock_open.call_args[1]['client_kwargs']['S3.Client'], expected) @mock.patch('smart_open.s3.MultipartWriter') def test_respects_endpoint_url_write(self, mock_open): @@ -1813,7 +1796,7 @@ def test_respects_endpoint_url_write(self, mock_open): smart_open.open(url, 'wb') expected = {'endpoint_url': 'https://play.min.io:9000'} - self.assertEqual(mock_open.call_args[1]['resource_kwargs'], expected) + self.assertEqual(mock_open.call_args[1]['client_kwargs']['S3.Client'], expected) def function(a, b, c, foo='bar', baz='boz'): From bf42a0bc42d039cd54d46222ce747e04768c0d43 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 10 Feb 2021 10:43:51 +0900 Subject: [PATCH 02/19] wip --- smart_open/s3.py | 109 +++++++++++++++++++++------- smart_open/tests/test_smart_open.py | 67 ++++++++++------- 2 files changed, 123 insertions(+), 53 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index ef431c92..80accdf1 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -52,6 +52,26 @@ _OUT_OF_RANGE = 'InvalidRange' +def _inject(client, method_name, client_kwargs, **kwargs): + """Inject the appropriate client keyword arguments for the method. + + :param client: The client object to retrieve the method from. + :param method_name: The name of the method. + :param client_kwargs: A dictionary keyed by fully qualified method names. + :param kwargs: Keyword arguments to pass to the method directly. + + The function names are as documented here: + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client + + For example, S3.Client.create_multipart_upload. + + Returns a callable. 
+ """ + method = getattr(client, method_name) + kwargs.update(client_kwargs.get('S3.Client.%s' % method_name, {})) + return functools.partial(method, **kwargs) + + def parse_uri(uri_as_string): # # Restrictions on bucket names and labels: @@ -134,11 +154,11 @@ def inject(**kwargs): client_kwargs = transport_params['client_kwargs'] = {} try: - ctor_kwargs = client_kwargs['S3.Client'] + init_kwargs = client_kwargs['S3.Client'] except KeyError: - ctor_kwargs = client_kwargs['S3.Client'] = {} + init_kwargs = client_kwargs['S3.Client'] = {} - ctor_kwargs.update(**kwargs) + init_kwargs.update(**kwargs) client = transport_params.get('client') if client is not None and (uri['access_id'] or uri['access_secret']): @@ -262,12 +282,20 @@ def open( return fileobj -def _get(client, bucket, key, version, **get_object_kwargs): - if version is not None: - get_object_kwargs['VersionId'] = version - +def _get(client, client_kwargs, bucket, key, version, range_string): + get_object = _inject( + client, + 'get_object', + client_kwargs, + Bucket=bucket, + Key=key, + Range=range_string, + ) try: - return client.get_object(Bucket=bucket, Key=key, **get_object_kwargs) + if version: + return get_object(VersionId=version) + else: + return get_object() except botocore.client.ClientError as error: wrapped_error = IOError( 'unable to access bucket: %r key: %r version: %r error: %s' % ( @@ -379,11 +407,11 @@ def _open_body(self, start=None, stop=None): # Optimistically try to fetch the requested content range. response = _get( self._client, + self._client_kwargs, self._bucket, self._key, - version=self._version_id, - Range=range_string, - **self._client_kwargs.get('S3.Client.get_object', {}), + self._version_id, + range_string, ) except IOError as ioe: # Handle requested content range exceeding content size. @@ -707,11 +735,12 @@ def __init__( _initialize_boto3(self, client, client_kwargs) try: - partial = functools.partial( - self._client.create_multipart_upload, + partial = _inject( + self._client, + 'create_multipart_upload', + self._client_kwargs, Bucket=bucket, Key=key, - **self._client_kwargs.get('S3.Client.create_multipart_upload', {}), ) self._upload_id = _retry_if_failed(partial)['UploadId'] except botocore.client.ClientError as error: @@ -742,8 +771,10 @@ def close(self): self._upload_next_part() if self._total_bytes and self._upload_id: - partial = functools.partial( - self._client.complete_multipart_upload, + partial = _inject( + self._client, + 'complete_multipart_upload', + self._client_kwargs, Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, @@ -760,12 +791,24 @@ def close(self): # We work around this by creating an empty file explicitly. 
# assert self._upload_id, "no multipart upload in progress" - self._client.abort_multipart_upload( + abort = _inject( + self._client, + 'abort_multipart_upload', + self._client_kwargs, Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, ) - self._client.put_object(Bucket=self._bucket, Key=self._key, Body=b'') + abort() + put = _inject( + self._client, + 'put_object', + self._client_kwargs, + Bucket=self._bucket, + Key=self._key, + Body=b'', + ) + put() logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) self._upload_id = None @@ -807,11 +850,15 @@ def write(self, b): def terminate(self): """Cancel the underlying multipart upload.""" assert self._upload_id, "no multipart upload in progress" - self._client.abort_multipart_upload( + abort = _inject( + self._client, + 'abort_multipart_upload', + self._client_kwargs, Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, ) + abort() self._upload_id = None def to_boto3(self, resource=None): @@ -848,8 +895,10 @@ def _upload_next_part(self): # especially robust. # upload = _retry_if_failed( - functools.partial( - self._client.upload_part, + _inject( + self._client, + 'upload_part', + self._client_kwargs, Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, @@ -904,8 +953,9 @@ def __init__( _initialize_boto3(self, client, client_kwargs) + head = _inject(self._client, 'head_bucket', self._client_kwargs, Bucket=bucket) try: - self._object = self._client.head_bucket(Bucket=bucket) + head() except botocore.client.ClientError as e: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e @@ -929,13 +979,16 @@ def close(self): self._buf.seek(0) + put = _inject( + self._client, + 'put_object', + self._client_kwargs, + Bucket=self._bucket, + Key=self._key, + Body=self._buf, + ) try: - self._client.put_object( - Bucket=self._bucket, - Key=self._key, - Body=self._buf, - **self._client_kwargs.get('S3.Client.put_object', {}), - ) + put() except botocore.client.ClientError as e: raise ValueError( 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e diff --git a/smart_open/tests/test_smart_open.py b/smart_open/tests/test_smart_open.py index 0490b6fd..ff880ecc 100644 --- a/smart_open/tests/test_smart_open.py +++ b/smart_open/tests/test_smart_open.py @@ -21,9 +21,9 @@ import boto3 import mock from moto import mock_s3 -import responses import parameterizedtestcase import pytest +import responses import smart_open from smart_open import smart_open_lib @@ -1157,10 +1157,10 @@ def test_s3_seek_moto(self): class SmartOpenS3KwargsTest(unittest.TestCase): - @mock.patch('boto3.Session') - def test_no_kwargs(self, mock_session): + @mock.patch('boto3.client') + def test_no_kwargs(self, mock_client): smart_open.open('s3://mybucket/mykey', transport_params=dict(defer_seek=True)) - mock_session.return_value.resource.assert_called_with('s3') + mock_client.assert_called_with('s3') @mock.patch('boto3.client') def test_credentials(self, mock_client): @@ -1171,16 +1171,19 @@ def test_credentials(self, mock_client): aws_secret_access_key='access_secret', ) - @mock.patch('boto3.Session') - def test_host(self, mock_session): - transport_params = {'resource_kwargs': {'endpoint_url': 'http://aa.domain.com'}, 'defer_seek': True} - smart_open.open("s3://access_id:access_secret@mybucket/mykey", transport_params=transport_params) - mock_session.assert_called_with( + @mock.patch('boto3.client') + def test_host(self, mock_client): + tp = { + 'client_kwargs': { + 'S3.Client': 
{'endpoint_url': 'http://aa.domain.com'}, + }, + 'defer_seek': True, + } + smart_open.open("s3://access_id:access_secret@mybucket/mykey", transport_params=tp) + mock_client.assert_called_with( + 's3', aws_access_key_id='access_id', aws_secret_access_key='access_secret', - ) - mock_session.return_value.resource.assert_called_with( - 's3', endpoint_url='http://aa.domain.com', ) @@ -1303,16 +1306,18 @@ def test_newline_csv(self): assert content == expected - @mock.patch('boto3.Session') - def test_s3_mode_mock(self, mock_session): + @mock.patch('boto3.client') + def test_s3_mode_mock(self, mock_client): """Are s3:// open modes passed correctly?""" # correct write mode, correct s3 URI - transport_params = {'resource_kwargs': {'endpoint_url': 'http://s3.amazonaws.com'}} + transport_params = { + 'client_kwargs': { + 'S3.Client': {'endpoint_url': 'http://s3.amazonaws.com'}, + } + } smart_open.open("s3://mybucket/mykey", "w", transport_params=transport_params) - mock_session.return_value.resource.assert_called_with( - 's3', endpoint_url='http://s3.amazonaws.com' - ) + mock_client.assert_called_with('s3', endpoint_url='http://s3.amazonaws.com') @mock.patch('smart_open.hdfs.subprocess') def test_hdfs(self, mock_subprocess): @@ -1362,15 +1367,19 @@ def test_s3_metadata_write(self): s3 = boto3.resource('s3') s3.create_bucket(Bucket='mybucket') - # Write data, with multipart_upload options - write_stream = smart_open.open( - 's3://mybucket/crime-and-punishment.txt.gz', 'wb', - transport_params={ - 'multipart_upload_kwargs': { + tp = { + 'client_kwargs': { + 'S3.Client.create_multipart_upload': { 'ContentType': 'text/plain', 'ContentEncoding': 'gzip', } } + } + + # Write data, with multipart_upload options + write_stream = smart_open.open( + 's3://mybucket/crime-and-punishment.txt.gz', 'wb', + transport_params=tp, ) with write_stream as fout: fout.write(data) @@ -1787,7 +1796,11 @@ def test_respects_endpoint_url_read(self, mock_open): url = 's3://key_id:secret_key@play.min.io:9000@smart-open-test/README.rst' smart_open.open(url) - expected = {'endpoint_url': 'https://play.min.io:9000'} + expected = { + 'aws_access_key_id': 'key_id', + 'aws_secret_access_key': 'secret_key', + 'endpoint_url': 'https://play.min.io:9000', + } self.assertEqual(mock_open.call_args[1]['client_kwargs']['S3.Client'], expected) @mock.patch('smart_open.s3.MultipartWriter') @@ -1795,7 +1808,11 @@ def test_respects_endpoint_url_write(self, mock_open): url = 's3://key_id:secret_key@play.min.io:9000@smart-open-test/README.rst' smart_open.open(url, 'wb') - expected = {'endpoint_url': 'https://play.min.io:9000'} + expected = { + 'aws_access_key_id': 'key_id', + 'aws_secret_access_key': 'secret_key', + 'endpoint_url': 'https://play.min.io:9000', + } self.assertEqual(mock_open.call_args[1]['client_kwargs']['S3.Client'], expected) From a916e2c4605e1446da23d40740a0eea754ee5054 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 10 Feb 2021 11:01:58 +0900 Subject: [PATCH 03/19] all tests passing --- smart_open/s3.py | 110 +++++++++++++----------------------- smart_open/tests/test_s3.py | 12 ++-- 2 files changed, 44 insertions(+), 78 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 80accdf1..dd79d3d3 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -52,24 +52,24 @@ _OUT_OF_RANGE = 'InvalidRange' -def _inject(client, method_name, client_kwargs, **kwargs): - """Inject the appropriate client keyword arguments for the method. - - :param client: The client object to retrieve the method from. 
- :param method_name: The name of the method. - :param client_kwargs: A dictionary keyed by fully qualified method names. - :param kwargs: Keyword arguments to pass to the method directly. - - The function names are as documented here: - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client +class _ClientWrapper: + """Wraps a client to inject the appropriate keyword args into each method call. + The keyword args are a dictionary keyed by the fully qualified method name. For example, S3.Client.create_multipart_upload. - Returns a callable. + See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client + + This wrapper behaves identically to the client otherwise. """ - method = getattr(client, method_name) - kwargs.update(client_kwargs.get('S3.Client.%s' % method_name, {})) - return functools.partial(method, **kwargs) + def __init__(self, client, kwargs): + self.client = client + self.kwargs = kwargs + + def __getattr__(self, method_name): + method = getattr(self.client, method_name) + kwargs = self.kwargs.get('S3.Client.%s' % method_name, {}) + return functools.partial(method, **kwargs) def parse_uri(uri_as_string): @@ -282,20 +282,12 @@ def open( return fileobj -def _get(client, client_kwargs, bucket, key, version, range_string): - get_object = _inject( - client, - 'get_object', - client_kwargs, - Bucket=bucket, - Key=key, - Range=range_string, - ) +def _get(client, bucket, key, version, range_string): try: if version: - return get_object(VersionId=version) + return client.get_object(Bucket=bucket, Key=key, VersionId=version, Range=range_string) else: - return get_object() + return client.get_object(Bucket=bucket, Key=key, Range=range_string) except botocore.client.ClientError as error: wrapped_error = IOError( 'unable to access bucket: %r key: %r version: %r error: %s' % ( @@ -326,7 +318,6 @@ def __init__( bucket, key, version_id=None, - client_kwargs=None, ): self._client = client self._bucket = bucket @@ -336,7 +327,6 @@ def __init__( self._content_length = None self._position = 0 self._body = None - self._client_kwargs = client_kwargs if client_kwargs else {} def seek(self, offset, whence=constants.WHENCE_START): """Seek to the specified position. @@ -407,7 +397,6 @@ def _open_body(self, start=None, stop=None): # Optimistically try to fetch the requested content range. 
response = _get( self._client, - self._client_kwargs, self._bucket, self._key, self._version_id, @@ -493,12 +482,13 @@ def _initialize_boto3(rw, client, client_kwargs): been already created for us and we can just reuse them.""" if client_kwargs is None: client_kwargs = {} - rw._client_kwargs = client_kwargs - if client: - rw._client = client - else: - rw._client = boto3.client('s3', **client_kwargs.get('S3.Client', {})) + if client is None: + init_kwargs = client_kwargs.get('S3.Client', {}) + client = boto3.client('s3', **init_kwargs) + assert client + + rw._client = _ClientWrapper(client, client_kwargs) class Reader(io.BufferedIOBase): @@ -529,7 +519,6 @@ def __init__( bucket, key, self._version_id, - client_kwargs=self._client_kwargs, ) self._current_pos = 0 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) @@ -735,10 +724,8 @@ def __init__( _initialize_boto3(self, client, client_kwargs) try: - partial = _inject( - self._client, - 'create_multipart_upload', - self._client_kwargs, + partial = functools.partial( + self._client.create_multipart_upload, Bucket=bucket, Key=key, ) @@ -771,10 +758,8 @@ def close(self): self._upload_next_part() if self._total_bytes and self._upload_id: - partial = _inject( - self._client, - 'complete_multipart_upload', - self._client_kwargs, + partial = functools.partial( + self._client.complete_multipart_upload, Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, @@ -791,24 +776,16 @@ def close(self): # We work around this by creating an empty file explicitly. # assert self._upload_id, "no multipart upload in progress" - abort = _inject( - self._client, - 'abort_multipart_upload', - self._client_kwargs, + self._client.abort_multipart_upload( Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, ) - abort() - put = _inject( - self._client, - 'put_object', - self._client_kwargs, + self._client.put_object( Bucket=self._bucket, Key=self._key, Body=b'', ) - put() logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) self._upload_id = None @@ -850,15 +827,11 @@ def write(self, b): def terminate(self): """Cancel the underlying multipart upload.""" assert self._upload_id, "no multipart upload in progress" - abort = _inject( - self._client, - 'abort_multipart_upload', - self._client_kwargs, + self._client.abort_multipart_upload( Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, ) - abort() self._upload_id = None def to_boto3(self, resource=None): @@ -895,10 +868,8 @@ def _upload_next_part(self): # especially robust. 
# upload = _retry_if_failed( - _inject( - self._client, - 'upload_part', - self._client_kwargs, + functools.partial( + self._client.upload_part, Bucket=self._bucket, Key=self._key, UploadId=self._upload_id, @@ -953,9 +924,8 @@ def __init__( _initialize_boto3(self, client, client_kwargs) - head = _inject(self._client, 'head_bucket', self._client_kwargs, Bucket=bucket) try: - head() + self._client.head_bucket(Bucket=bucket) except botocore.client.ClientError as e: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e @@ -979,16 +949,12 @@ def close(self): self._buf.seek(0) - put = _inject( - self._client, - 'put_object', - self._client_kwargs, - Bucket=self._bucket, - Key=self._key, - Body=self._buf, - ) try: - put() + self._client.put_object( + Bucket=self._bucket, + Key=self._key, + Body=self._buf, + ) except botocore.client.ClientError as e: raise ValueError( 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index e9d5fe36..56e73af4 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -958,8 +958,8 @@ def test_client_propagation_singlepart(): client=client, multipart_upload=False, ) as writer: - assert writer._client == client - assert id(writer._client) == id(client) + assert writer._client.client == client + assert id(writer._client.client) == id(client) @moto.mock_s3() @@ -979,8 +979,8 @@ def test_client_propagation_multipart(): client=client, multipart_upload=True, ) as writer: - assert writer._client == client - assert id(writer._client) == id(client) + assert writer._client.client == client + assert id(writer._client.client) == id(client) @moto.mock_s3() @@ -997,8 +997,8 @@ def test_resource_propagation_reader(): writer.write(b'hello world') with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='rb', client=client) as reader: - assert reader._client == client - assert id(reader._client) == id(client) + assert reader._client.client == client + assert id(reader._client.client) == id(client) if __name__ == '__main__': From 685c349615dcd99b4bce6bf308779647f670a423 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 10 Feb 2021 11:14:02 +0900 Subject: [PATCH 04/19] update documentation --- README.rst | 19 ++++++++----------- howto.md | 44 +++++++++++++------------------------------- 2 files changed, 21 insertions(+), 42 deletions(-) diff --git a/README.rst b/README.rst index 2b267fa1..cb999920 100644 --- a/README.rst +++ b/README.rst @@ -154,7 +154,7 @@ For the sake of simplicity, the examples below assume you have all the dependenc ... aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'], ... ) >>> url = 's3://smart-open-py37-benchmark-results/test.txt' - >>> with open(url, 'wb', transport_params={'session': session}) as fout: + >>> with open(url, 'wb', transport_params={'client': session.client('s3')}) as fout: ... bytes_written = fout.write(b'hello world!') ... 
print(bytes_written) 12 @@ -182,12 +182,9 @@ For the sake of simplicity, the examples below assume you have all the dependenc print(line) # Stream to Digital Ocean Spaces bucket providing credentials from boto3 profile - transport_params = { - 'session': boto3.Session(profile_name='digitalocean'), - 'resource_kwargs': { - 'endpoint_url': 'https://ams3.digitaloceanspaces.com', - } - } + session = boto3.Session(profile_name='digitalocean') + client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com') + transport_params = {'client': client} with open('s3://bucket/key.txt', 'wb', transport_params=transport_params) as fout: fout.write(b'here we stand') @@ -202,7 +199,7 @@ For the sake of simplicity, the examples below assume you have all the dependenc # stream from Azure Blob Storage connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING'] transport_params = { - client: azure.storage.blob.BlobServiceClient.from_connection_string(connect_str) + 'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str), } for line in open('azure://mycontainer/myfile.txt', transport_params=transport_params): print(line) @@ -210,7 +207,7 @@ For the sake of simplicity, the examples below assume you have all the dependenc # stream content *into* Azure Blob Storage (write mode): connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING'] transport_params = { - client: azure.storage.blob.BlobServiceClient.from_connection_string(connect_str) + 'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str), } with open('azure://mycontainer/my_file.txt', 'wb', transport_params=transport_params) as fout: fout.write(b'hello world') @@ -264,7 +261,7 @@ Here are some examples of using this parameter: .. code-block:: python >>> import boto3 - >>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(session=boto3.Session())) + >>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(client=boto3.client('s3'))) >>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(buffer_size=1024)) For the full list of keyword arguments supported by each transport option, see the documentation: @@ -292,7 +289,7 @@ You can customize the credentials when constructing the session. aws_secret_access_key=SECRET_KEY, aws_session_token=SESSION_TOKEN, ) - fin = open('s3://bucket/key', transport_params=dict(session=session), ...) + fin = open('s3://bucket/key', transport_params=dict(client=session.client('s3')), ...) Your second option is to specify the credentials within the S3 URL itself: diff --git a/howto.md b/howto.md index b96b9238..64e5de24 100644 --- a/howto.md +++ b/howto.md @@ -73,11 +73,12 @@ The `boto3` library that `smart_open` uses for accessing S3 signs each request u If you'd like to access S3 without using an S3 account, then you need disable this signing mechanism. ```python +>>> import boto3 >>> import botocore >>> import botocore.client >>> from smart_open import open >>> config = botocore.client.Config(signature_version=botocore.UNSIGNED) ->>> params = {'resource_kwargs': {'config': config}} +>>> params = {'client': boto3.client('s3', config=config)} >>> with open('s3://commoncrawl/robots.txt', transport_params=params) as fin: ... 
fin.readline()
'User-Agent: *\n'

@@ -175,15 +176,15 @@ s3.ObjectVersion(bucket_name='smart-open-versioned', object_key='demo.txt', id='
 
 ## How to Read from S3 Efficiently
 
-Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
-By default, calling `smart_open.open` with an S3 URL will create its own boto3 session and resource.
+Under the covers, `smart_open` uses the [boto3 client API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client) to read from S3.
+By default, calling `smart_open.open` with an S3 URL will create its own boto3 client.
 These are expensive operations: they require both CPU time to construct the objects from a low-level API definition, and memory to store the objects once they have been created.
 It is possible to save both CPU time and memory by sharing the same resource across multiple `smart_open.open` calls, for example:
 
 ```python
 >>> import boto3
 >>> from smart_open import open
->>> tp = {'resource': boto3.resource('s3')}
+>>> tp = {'client': boto3.client('s3')}
 >>> for month in (1, 2, 3):
 ...     url = 's3://nyc-tlc/trip data/yellow_tripdata_2020-%02d.csv' % month
 ...     with open(url, transport_params=tp) as fin:
 ...
 ```
 
-The above sharing is safe because it is all happening in the same thread and subprocess (see below for details).
-
-## How to Work in a Parallelized Environment
-
-Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
-This API is not thread-safe or multiprocess-safe.
-Do not share the same `smart_open` objects across different threads or subprocesses.
-`smart_open` will create its own session and resource objects for each individual `open` call, so you don't have to worry about managing boto3 objects.
-This comes at a price: each session and resource requires CPU time to create and memory to store, so be wary of keeping hundreds of threads or subprocesses reading/writing from/to S3.
+Clients are thread-safe and multiprocess-safe, so you may share them between threads and subprocesses.
 
 ## How to Specify the Request Payer (S3 only)
 
 To access such buckets, you need to pass some special transport parameters:
 
 ```python
 >>> from smart_open import open
->>> params = {'object_kwargs': {'RequestPayer': 'requester'}}
+>>> params = {'client_kwargs': {'S3.Client.get_object': {'RequestPayer': 'requester'}}}
 >>> with open('s3://arxiv/pdf/arXiv_pdf_manifest.xml', transport_params=params) as fin:
 ...    print(fin.readline())
 
 This works only when reading and writing via S3.
 
 Boto3 has a [built-in mechanism](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) for retrying after a recoverable error.
 You can fine-tune it using several ways:
 
-### Pre-configuring a boto3 resource and then passing the resource to smart_open
+### Pre-configuring a boto3 client and then passing the client to smart_open
 
 ```python
 >>> import boto3
 >>> import botocore.config
 >>> import smart_open
 >>> config = botocore.config.Config(retries={'mode': 'standard'})
->>> resource = boto3.resource('s3', config=config)
->>> tp = {'resource': resource}
+>>> client = boto3.client('s3', config=config)
+>>> tp = {'client': client}
 >>> with smart_open.open('s3://commoncrawl/robots.txt', transport_params=tp) as fin:
...
print(fin.readline()) -User-Agent: * -``` - -### Directly passing configuration as transport parameters to smart_open - -```python ->>> import boto3 ->>> import botocore.config ->>> import smart_open ->>> config = botocore.config.Config(retries={'mode': 'standard'}) ->>> tp = {'resource_kwargs': {'config': config}} +>>> client = boto3.client('s3', config=config) +>>> tp = {'client': client} >>> with smart_open.open('s3://commoncrawl/robots.txt', transport_params=tp) as fin: ... print(fin.readline()) User-Agent: * @@ -286,8 +266,10 @@ where `http://localhost:4566` is the default host/port that localstack uses to l You can now read/write to the bucket the same way you would to a real S3 bucket: ```python +>>> import boto3 >>> from smart_open import open ->>> tparams = {'resource_kwargs': {'endpoint_url': 'http://localhost:4566'}} +>>> client = boto3.client('s3', endpoint_url='http://localhost:4566') +>>> tparams = {'client': client} >>> with open('s3://mybucket/hello.txt', 'wt', transport_params=tparams) as fout: ... fout.write('hello world!') >>> with open('s3://mybucket/hello.txt', 'rt', transport_params=tparams) as fin: From c93e810a7ef983f5f43d8b3633543301fcb5fb56 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 10 Feb 2021 11:24:42 +0900 Subject: [PATCH 05/19] fix test --- smart_open/tests/test_s3.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 56e73af4..662cde63 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -136,20 +136,19 @@ def mock_make_request(self, operation_model, *args, **kwargs): 'The test case needs a Moto server running on the local 5000 port.' ) class SeekableRawReaderTest(unittest.TestCase): - def setUp(self): self._body = b'123456' self._local_resource = boto3.resource('s3', endpoint_url='http://localhost:5000') self._local_resource.Bucket(BUCKET_NAME).create() self._local_resource.Object(BUCKET_NAME, KEY_NAME).put(Body=self._body) + self._local_client('s3', endpoint_url='http://localhost:5000') def tearDown(self): self._local_resource.Object(BUCKET_NAME, KEY_NAME).delete() self._local_resource.Bucket(BUCKET_NAME).delete() def test_read_from_a_closed_body(self): - obj = self._local_resource.Object(BUCKET_NAME, KEY_NAME) - reader = smart_open.s3._SeekableRawReader(obj) + reader = smart_open.s3._SeekableRawReader(self._local_client, BUCKET_NAME, KEY_NAME) self.assertEqual(reader.read(1), b'1') reader._body.close() self.assertEqual(reader.read(2), b'23') From 07dd26ed8457a58c2ed1ed90d91ba0d71f5ef42d Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 10 Feb 2021 11:58:02 +0900 Subject: [PATCH 06/19] fixup --- smart_open/tests/test_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 662cde63..91c400cc 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -141,7 +141,7 @@ def setUp(self): self._local_resource = boto3.resource('s3', endpoint_url='http://localhost:5000') self._local_resource.Bucket(BUCKET_NAME).create() self._local_resource.Object(BUCKET_NAME, KEY_NAME).put(Body=self._body) - self._local_client('s3', endpoint_url='http://localhost:5000') + self._local_client = boto3.client('s3', endpoint_url='http://localhost:5000') def tearDown(self): self._local_resource.Object(BUCKET_NAME, KEY_NAME).delete() From f3b6962d31e21b6695df2be0280c8728ae8bd7db Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 10 Feb 2021 15:03:10 +0900 
Subject: [PATCH 07/19] update documentation

---
 README.rst       |  9 ++++----
 help.txt         | 25 ++++++++--------------
 howto.md         | 54 ++++++++++++++++++++++++++++++++++++++++++++++++
 smart_open/s3.py |  2 ++
 4 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/README.rst b/README.rst
index cb999920..c10aa690 100644
--- a/README.rst
+++ b/README.rst
@@ -278,8 +278,8 @@ S3 Credentials
 By default, ``smart_open`` will defer to ``boto3`` and let the latter take care of the credentials.
 There are several ways to override this behavior.
 
-The first is to pass a ``boto3.Session`` object as a transport parameter to the ``open`` function.
-You can customize the credentials when constructing the session.
+The first is to pass a pre-configured ``boto3`` S3 client as a transport parameter to the ``open`` function.
+You can customize the credentials when constructing the session used to create the client.
 ``smart_open`` will then use the session when talking to S3.
 
 .. code-block:: python
@@ -289,7 +289,8 @@ You can customize the credentials when constructing the session.
         aws_secret_access_key=SECRET_KEY,
         aws_session_token=SESSION_TOKEN,
     )
-    fin = open('s3://bucket/key', transport_params=dict(client=session.client('s3')), ...)
+    client = session.client('s3', endpoint_url=..., config=...)
+    fin = open('s3://bucket/key', transport_params=dict(client=client))
 
 Your second option is to specify the credentials within the S3 URL itself:
 
@@ -297,7 +298,7 @@ Your second option is to specify the credentials within the S3 URL itself:
 
     fin = open('s3://aws_access_key_id:aws_secret_access_key@bucket/key', ...)
 
-*Important*: The two methods above are **mutually exclusive**. If you pass an AWS session *and* the URL contains credentials, ``smart_open`` will ignore the latter.
+*Important*: The two methods above are **mutually exclusive**. If you pass an AWS client *and* the URL contains credentials, ``smart_open`` will ignore the latter.
 
 *Important*: ``smart_open`` ignores configuration files from the older ``boto`` library. Port your old ``boto`` settings to ``boto3`` in order to use them with ``smart_open``.
 
diff --git a/help.txt b/help.txt
index 653c096a..82fe17e1 100644
--- a/help.txt
+++ b/help.txt
@@ -137,17 +137,6 @@ FUNCTIONS
         The buffer size to use when performing I/O.
     min_part_size: int, optional
         The minimum part size for multipart uploads.  For writing only.
-    session: object, optional
-        The S3 session to use when working with boto3.
-    resource_kwargs: dict, optional
-        Keyword arguments to use when accessing the S3 resource for reading or writing.
-    multipart_upload_kwargs: dict, optional
-        Additional parameters to pass to boto3's initiate_multipart_upload function.
-        For writing only.
-    singlepart_upload_kwargs: dict, optional
-        Additional parameters to pass to boto3's S3.Object.put function when using single
-        part upload.
-        For writing only.
     multipart_upload: bool, optional
         Default: `True`
         If set to `True`, will use multipart upload for writing to S3. If set
@@ -157,14 +146,18 @@ FUNCTIONS
     version_id: str, optional
         Version of the object, used when reading object.
         If None, will fetch the most recent version.
-    object_kwargs: dict, optional
-        Additional parameters to pass to boto3's object.get function.
-        Used during reading only.
     defer_seek: boolean, optional
         Default: `False`
         If set to `True` on a file opened for reading, GetObject will not be
         called until the first seek() or read().
         Avoids redundant API queries when seeking before reading.
+    client: object, optional
+        The S3 client to use when working with boto3.
+        If you don't specify this, then smart_open will create a new client for you.
+    client_kwargs: dict, optional
+        Additional parameters to pass to the relevant functions of the client.
+        The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
+        The values are kwargs to pass to that method each time it is called.
 
     scp (smart_open/ssh.py)
     ~~~~~~~~~~~~~~~~~~~~~~~
@@ -318,13 +311,13 @@ FUNCTIONS
     s3_iter_bucket(bucket_name, prefix='', accept_key=None, key_limit=None, workers=16, retries=3, **session_kwargs)
         Deprecated.  Use smart_open.s3.iter_bucket instead.
 
-    smart_open(uri, mode='rb', **kw)
+    smart_open(uri, mode='rb', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None, ignore_extension=False, **kwargs)
 
 DATA
     __all__ = ['open', 'parse_uri', 'register_compressor', 's3_iter_bucket...
 
 VERSION
-    2.1.1
+    4.1.2.dev0
 
 FILE
     /home/misha/git/smart_open/smart_open/__init__.py
 
diff --git a/howto.md b/howto.md
index 64e5de24..8ef95d2a 100644
--- a/howto.md
+++ b/howto.md
@@ -245,6 +245,60 @@ logging.getLogger('smart_open.s3').setLevel(logging.DEBUG)
 and check the log output of your code.
 
+## How to Pass Additional Parameters to boto3
+
+`boto3` is a highly configurable library, and each function call accepts many optional parameters.
+`smart_open` does not attempt to replicate this behavior, since most of these parameters do not influence the behavior of `smart_open` itself.
+Instead, `smart_open` allows the caller to pass additional parameters as necessary:
+
+```python
+>>> from smart_open import open
+>>> params = {'client_kwargs': {'S3.Client.get_object': {'RequestPayer': 'requester'}}}
+>>> with open('s3://arxiv/pdf/arXiv_pdf_manifest.xml', transport_params=params) as fin:
+...    pass
+```
+
+The above example influences how the [S3.Client.get_object function](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object) gets called by `smart_open` when reading the specified URL.
+More specifically, the `RequestPayer` parameter will be set to `requester` **for each call**.
+Influential functions include:
+
+- S3.Client (the initializer function; see the sketch below)
+- S3.Client.abort_multipart_upload
+- S3.Client.complete_multipart_upload
+- S3.Client.create_multipart_upload
+- S3.Client.get_object
+- S3.Client.head_bucket
+- S3.Client.put_object
+- S3.Client.upload_part
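+
+The `S3.Client` key is the odd one out: its kwargs go to the client constructor itself, rather than to a single method.
+Here's a sketch; the endpoint URL below is a placeholder, so substitute your own S3-compatible service:
+
+```python
+>>> from smart_open import open
+>>> params = {'client_kwargs': {'S3.Client': {'endpoint_url': 'https://ams3.digitaloceanspaces.com'}}}
+>>> with open('s3://bucket/key.txt', 'rb', transport_params=params) as fin:
+...    pass
+```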
+
+If you choose to pass additional parameters, keep the following in mind:
+
+1. Study the [boto3 client API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client) and ensure the function and parameters are valid.
+2. Study the [code for the smart_open.s3 submodule](smart_open/s3.py) and ensure `smart_open` actually calls the function you're passing additional parameters for.
+
+Finally, in some cases, it's possible to work directly with `boto3` without going through `smart_open`.
+For example, setting the ACL for an object is possible after the object is created (with `boto3`), as opposed to at creation time (with `smart_open`).
+More specifically, here's the direct method:
+
+```python
+import boto3
+import smart_open
+with smart_open.open('s3://bucket/key', 'wb') as fout:
+    fout.write(b'hello world!')
+client = boto3.client('s3').put_object_acl(ACL=acl_as_string)
+```
+
+Here's the same code that passes the above parameter via `smart_open`:
+
+```python
+import smart_open
+tp = {'client_kwargs': {'S3.Client.create_multipart_upload': {'ACL': acl_as_string}}}
+with smart_open.open('s3://bucket/key', 'wb', transport_params=tp) as fout:
+    fout.write(b'hello world!')
+```
+
+If passing everything via `smart_open` feels awkward, try passing part of the parameters directly to `boto3`.
+
 ## How to Read/Write from localstack
 
 [localstack](https://github.com/localstack/localstack) is a convenient test framework for developing cloud apps.
diff --git a/smart_open/s3.py b/smart_open/s3.py
index dd79d3d3..faf7952d 100644
--- a/smart_open/s3.py
+++ b/smart_open/s3.py
@@ -241,6 +241,8 @@ def open(
         If you don't specify this, then smart_open will create a new client for you.
     client_kwargs: dict, optional
         Additional parameters to pass to the relevant functions of the client.
+        The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
+        The values are kwargs to pass to that method each time it is called.
     """
     logger.debug('%r', locals())
     if mode not in constants.BINARY_MODES:

From 9b72b8ebb4f90b3416c4485969b3d0c24db23ff1 Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Wed, 10 Feb 2021 15:40:58 +0900
Subject: [PATCH 08/19] fixup in doc

---
 howto.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/howto.md b/howto.md
index 8ef95d2a..feecdc9b 100644
--- a/howto.md
+++ b/howto.md
@@ -285,7 +285,8 @@ import boto3
 import smart_open
 with smart_open.open('s3://bucket/key', 'wb') as fout:
     fout.write(b'hello world!')
-client = boto3.client('s3').put_object_acl(ACL=acl_as_string)
+client = boto3.client('s3')
+client.put_object_acl(ACL=acl_as_string, Bucket='bucket', Key='key')

From 9c009ad3a7e8279376d433187ccf624695bdc07d Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Mon, 22 Feb 2021 21:55:03 +0900
Subject: [PATCH 09/19] Update smart_open/s3.py
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Radim Řehůřek

---
 smart_open/s3.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/smart_open/s3.py b/smart_open/s3.py
index 3e70bf11..17b64eb3 100644
--- a/smart_open/s3.py
+++ b/smart_open/s3.py
@@ -666,7 +666,7 @@ def to_boto3(self, resource=None):
         `boto3.s3.Object` may not necessarily affect the current instance.
 
         """
-        if not resource:
+        if resource is None:
             resource = boto3.resource('s3')
         obj = resource.Object(self._bucket, self._key)
         if self._version_id is not None:

From e8a9c5094874ac21f9ad3cc64ac2bd37cb958ca5 Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Mon, 1 Mar 2021 13:01:19 +0900
Subject: [PATCH 10/19] update migration guide

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 72 +++++++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 98354f2c..f115672a 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -1,3 +1,75 @@
+Migrating to the new client-based S3 API
+========================================
+
+Versions of smart_open prior to 5.0.0 used the boto3 `resource API_` for communicating with S3.
+This API was easy to integrate for smart_open developers, but this came at a cost: it was not thread- or multiprocess-safe. +Furthermore, as smart_open supported more and more options, the transport parameter list grew, making it less maintainable. +Starting with version 5.0.0, smart_open uses the `client API`_ instead of the resource API. +Functionally, the little changes for the smart_open user. +The only difference is in passing transport parameters to the S3 backend. + +More specifically, the following S3 transport parameters are no longer supported: + +- `multipart_upload_kwargs` +- `object_kwargs` +- `resource` +- `resource_kwargs` +- `session` +- `singlepart_upload_kwargs` + +If you weren't using the above parameters, nothing changes for you. +However, if you were using any of the above, then you need to adjust your code. +Here are some quick recipes below. + +If you were previously passing `session`, then construct an S3 client from the session and pass that instead. +For example, before:: + + smart_open.open('s3://bucket/key', transport_params={'session': session}) + +After:: + + smart_open.open('s3://bucket/key', transport_params={'client': session.client('s3')}) + + +If you were passing `resource`, then replace the resource with a client, and pass that instead. +For example, before:: + + resource = session.resource('s3', **resource_kwargs) + smart_open.open('s3://bucket/key', transport_params={'resource': resource}) + +After:: + + client = session.client('s3') + smart_open.open('s3://bucket/key', transport_params={'client': client}) + +If you were passing any of the `*_kwargs` parameters, you will need to include them in `client_kwargs`, keeping in mind the following transformations. + +========================== ====================================== ========================== +Parameter name Resource API method Client API function +========================== ====================================== ========================== +`multipart_upload_kwargs` `s3.Object.initiate_multipart_upload`_ `create_multipart_upload`_ +`object_kwargs` `s3.Object.get`_ `get_object`_ +`resource_kwargs` ??? ??? +`singlepart_upload_kwargs` `s3.Object.put`_ `put_object`_ +========================== ====================================== ========================== + +The `client_kwargs` dict can thus contain the following members: + +- `s3.Client`: initializer parameters, e.g. those to pass directly to the `boto3.client` function +- `s3.Client.create_multipart_upload` +- `s3.Client.get_object` +- `s3.Client.put_object` + +.. _resource_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource +.. _s3.Object.initiate_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.initiate_multipart_upload +.. _s3.Object.get: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.get +.. _s3.Object.put: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.put + +.. _client_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client +.. _create_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload +.. _get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object +.. 
_get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object + Migrating to the new dependency management subsystem ==================================================== From 23333519deaea89fdb81aece0e737eb519187514 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Mon, 1 Mar 2021 13:37:58 +0900 Subject: [PATCH 11/19] fixup --- MIGRATING_FROM_OLDER_VERSIONS.rst | 34 +++++++++++++++++++------------ 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst index f115672a..1156f4a8 100644 --- a/MIGRATING_FROM_OLDER_VERSIONS.rst +++ b/MIGRATING_FROM_OLDER_VERSIONS.rst @@ -22,23 +22,26 @@ However, if you were using any of the above, then you need to adjust your code. Here are some quick recipes below. If you were previously passing `session`, then construct an S3 client from the session and pass that instead. -For example, before:: +For example, before: +.. code-block:: python smart_open.open('s3://bucket/key', transport_params={'session': session}) -After:: +After: +.. code-block:: python smart_open.open('s3://bucket/key', transport_params={'client': session.client('s3')}) - If you were passing `resource`, then replace the resource with a client, and pass that instead. -For example, before:: +For example, before: +.. code-block:: python resource = session.resource('s3', **resource_kwargs) smart_open.open('s3://bucket/key', transport_params={'resource': resource}) -After:: +After: +.. code-block:: python client = session.client('s3') smart_open.open('s3://bucket/key', transport_params={'client': client}) @@ -47,28 +50,33 @@ If you were passing any of the `*_kwargs` parameters, you will need to include t ========================== ====================================== ========================== Parameter name Resource API method Client API function ========================== ====================================== ========================== -`multipart_upload_kwargs` `s3.Object.initiate_multipart_upload`_ `create_multipart_upload`_ -`object_kwargs` `s3.Object.get`_ `get_object`_ -`resource_kwargs` ??? ??? -`singlepart_upload_kwargs` `s3.Object.put`_ `put_object`_ +`multipart_upload_kwargs` `s3.Object.initiate_multipart_upload`_ `s3.Client.create_multipart_upload`_ +`object_kwargs` `s3.Object.get`_ `s3.Client.get_object`_ +`resource_kwargs` s3.resource `s3.Client` +`singlepart_upload_kwargs` `s3.Object.put`_ `s3.Client.put_object`_ ========================== ====================================== ========================== +Most of the above is self-explanatory, with the exception of `resource_kwargs`. +These were previously used mostly for passing a custom endpoint URL. + The `client_kwargs` dict can thus contain the following members: -- `s3.Client`: initializer parameters, e.g. those to pass directly to the `boto3.client` function +- `s3.Client`: initializer parameters, e.g. those to pass directly to the `boto3.client` function, such as `endpoint_url`. - `s3.Client.create_multipart_upload` - `s3.Client.get_object` - `s3.Client.put_object` +See `README `_ and `HOWTO `_ for more examples. + .. _resource_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource .. _s3.Object.initiate_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.initiate_multipart_upload .. 
_s3.Object.get: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.get .. _s3.Object.put: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.put .. _client_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client -.. _create_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload -.. _get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object -.. _get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object +.. _s3.Client.create_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload +.. _s3.Client.get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object +.. _s3.Client.put_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object Migrating to the new dependency management subsystem ==================================================== From 491dacc3293f0d6af7d6e14c291ded2a97aeafd7 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Mon, 1 Mar 2021 13:38:55 +0900 Subject: [PATCH 12/19] fixup --- MIGRATING_FROM_OLDER_VERSIONS.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst index 1156f4a8..106dd142 100644 --- a/MIGRATING_FROM_OLDER_VERSIONS.rst +++ b/MIGRATING_FROM_OLDER_VERSIONS.rst @@ -74,6 +74,7 @@ See `README `_ and `HOWTO `_ for more examples. .. _s3.Object.put: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.put .. _client_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client +.. _s3.Client: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client .. _s3.Client.create_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload .. _s3.Client.get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object .. _s3.Client.put_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object From 467ccdd296ac5e1be55b7094139211cd2364ae3a Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Mon, 1 Mar 2021 13:40:20 +0900 Subject: [PATCH 13/19] fixup --- MIGRATING_FROM_OLDER_VERSIONS.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst index 106dd142..5844cd0d 100644 --- a/MIGRATING_FROM_OLDER_VERSIONS.rst +++ b/MIGRATING_FROM_OLDER_VERSIONS.rst @@ -68,12 +68,12 @@ The `client_kwargs` dict can thus contain the following members: See `README `_ and `HOWTO `_ for more examples. -.. _resource_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource +.. _resource API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource .. _s3.Object.initiate_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.initiate_multipart_upload .. 
_s3.Object.get: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.get
 .. _s3.Object.put: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary.put
 
-.. _client_API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
+.. _client API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
 .. _s3.Client: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
 .. _s3.Client.create_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload
 .. _s3.Client.get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
 .. _s3.Client.put_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object

From 9cc714393386e5a2e83c3619c2ccb4c4e1b58ea1 Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Mon, 1 Mar 2021 13:41:10 +0900
Subject: [PATCH 14/19] fixup

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 5844cd0d..40430019 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -1,7 +1,7 @@
 Migrating to the new client-based S3 API
 ========================================
 
-Versions of smart_open prior to 5.0.0 used the boto3 `resource API_` for communicating with S3.
+Versions of smart_open prior to 5.0.0 used the boto3 `resource API`_ for communicating with S3.
 This API was easy to integrate for smart_open developers, but this came at a cost: it was not thread- or multiprocess-safe.
 Furthermore, as smart_open supported more and more options, the transport parameter list grew, making it less maintainable.
 Starting with version 5.0.0, smart_open uses the `client API`_ instead of the resource API.

From bc1876ee1ca9bd9e5121f744a762a203732d792e Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Mon, 1 Mar 2021 13:42:08 +0900
Subject: [PATCH 15/19] fixup

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 40430019..293266f7 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -5,7 +5,7 @@ Versions of smart_open prior to 5.0.0 used the boto3 `resource API`_ for communic
 This API was easy to integrate for smart_open developers, but this came at a cost: it was not thread- or multiprocess-safe.
 Furthermore, as smart_open supported more and more options, the transport parameter list grew, making it less maintainable.
 Starting with version 5.0.0, smart_open uses the `client API`_ instead of the resource API.
-Functionally, the little changes for the smart_open user.
+Functionally, very little changes for the smart_open user.
 The only difference is in passing transport parameters to the S3 backend.
 
@@ -25,23 +25,27 @@ If you were previously passing `session`, then construct an S3 client from the s
 For example, before:
 
 .. code-block:: python
+
     smart_open.open('s3://bucket/key', transport_params={'session': session})
 
 After:
 
 .. code-block:: python
+
     smart_open.open('s3://bucket/key', transport_params={'client': session.client('s3')})
 
 If you were passing `resource`, then replace the resource with a client, and pass that instead.
 For example, before:
 
 .. code-block:: python
+
     resource = session.resource('s3', **resource_kwargs)
     smart_open.open('s3://bucket/key', transport_params={'resource': resource})
 
 After:
 
 .. code-block:: python
+
     client = session.client('s3')
     smart_open.open('s3://bucket/key', transport_params={'client': client})

From 3b2fa86bb3e85600396b7478b0ef32fbd0edb4d9 Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Mon, 1 Mar 2021 13:43:01 +0900
Subject: [PATCH 16/19] fixup

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 293266f7..cb057aaf 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -56,7 +56,7 @@ Parameter name             Resource API method                    Client API fun
 ========================== ====================================== ==========================
 `multipart_upload_kwargs`  `s3.Object.initiate_multipart_upload`_ `s3.Client.create_multipart_upload`_
 `object_kwargs`            `s3.Object.get`_                       `s3.Client.get_object`_
-`resource_kwargs`          s3.resource                            `s3.Client`
+`resource_kwargs`          s3.resource                            `s3.client`_
 `singlepart_upload_kwargs` `s3.Object.put`_                       `s3.Client.put_object`_
 ========================== ====================================== ==========================

From 0dcfe1827cc9cf19a3e8f99ed28edb23cf4e0b90 Mon Sep 17 00:00:00 2001
From: Michael Penkov
Date: Mon, 1 Mar 2021 19:30:44 +0900
Subject: [PATCH 17/19] Update MIGRATING_FROM_OLDER_VERSIONS.rst
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Radim Řehůřek

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index cb057aaf..874c36c7 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -17,7 +17,8 @@ More specifically, the following S3 transport parameters are no longer supported
 - `session`
 - `singlepart_upload_kwargs`
 
-If you weren't using the above parameters, nothing changes for you.
+**If you weren't using the above parameters, nothing changes for you.**
+
 However, if you were using any of the above, then you need to adjust your code.
@@ -196,4 +197,3 @@ or view the help online `here
Date: Mon, 1 Mar 2021 21:34:37 +0900
Subject: [PATCH 18/19] more examples

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 874c36c7..23d74a47 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -71,6 +71,24 @@ The `client_kwargs` dict can thus contain the following members:
 - `s3.Client.get_object`
 - `s3.Client.put_object`
 
+Here's a before-and-after example for connecting to a custom endpoint. Before:
+
+.. code-block:: python
+
+    session = boto3.Session(profile_name='digitalocean')
+    resource_kwargs = {'endpoint_url': 'https://ams3.digitaloceanspaces.com'}
+    with open('s3://bucket/key.txt', 'wb', transport_params={'session': session, 'resource_kwargs': resource_kwargs}) as fout:
+        fout.write(b'here we stand')
+
+After:
+
+.. code-block:: python
+
+    session = boto3.Session(profile_name='digitalocean')
+    client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com')
+    with open('s3://bucket/key.txt', 'wb', transport_params={'client': client}) as fout:
+        fout.write(b'here we stand')
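+
+And here's an analogous sketch for `multipart_upload_kwargs`, reusing the `acl_as_string` placeholder from the HOWTO. Before:
+
+.. code-block:: python
+
+    tp = {'multipart_upload_kwargs': {'ACL': acl_as_string}}
+    with open('s3://bucket/key.txt', 'wb', transport_params=tp) as fout:
+        fout.write(b'here we stand')
+
+After:
+
+.. code-block:: python
+
+    tp = {'client_kwargs': {'S3.Client.create_multipart_upload': {'ACL': acl_as_string}}}
+    with open('s3://bucket/key.txt', 'wb', transport_params=tp) as fout:
+        fout.write(b'here we stand')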
+
 See `README `_ and `HOWTO `_ for more examples.

From 27e4f46597b5c4cecb780c0dd3bace4dea7c12e1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Radim=20=C5=98eh=C5=AF=C5=99ek?=
Date: Mon, 1 Mar 2021 13:42:29 +0100
Subject: [PATCH 19/19] Update MIGRATING_FROM_OLDER_VERSIONS.rst

---
 MIGRATING_FROM_OLDER_VERSIONS.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 23d74a47..6d6261ff 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -4,6 +4,7 @@ Migrating to the new client-based S3 API
 Versions of smart_open prior to 5.0.0 used the boto3 `resource API`_ for communicating with S3.
 This API was easy to integrate for smart_open developers, but this came at a cost: it was not thread- or multiprocess-safe.
 Furthermore, as smart_open supported more and more options, the transport parameter list grew, making it less maintainable.
+
 Starting with version 5.0.0, smart_open uses the `client API`_ instead of the resource API.
 Functionally, very little changes for the smart_open user.
 The only difference is in passing transport parameters to the S3 backend.