From ac346eced56019976c5dffc702e35cfafe48c580 Mon Sep 17 00:00:00 2001 From: James Saryerwinnie Date: Wed, 3 Feb 2016 10:06:58 -0800 Subject: [PATCH] Shutdown IO queue when file can't be opened If any errors happen while processing IO (including errors opening the destination file), we should shutdown the IO queue. As part of this change I added an integration test. This also demonstrated an issue with python3 retrying socket.errors because they're now aliased to OSErrors. I updated the compat module to give us the appropriate "something went wrong on the socket" exception based on python version. Fixes #367. --- boto3/compat.py | 12 ++++++++++++ boto3/s3/transfer.py | 33 ++++++++++++++++++--------------- tests/integration/test_s3.py | 26 ++++++++++++++++++++++++++ tests/unit/s3/test_transfer.py | 24 ++++++++++++++++++++---- 4 files changed, 76 insertions(+), 19 deletions(-) diff --git a/boto3/compat.py b/boto3/compat.py index 324b60a281..3a7a832f65 100644 --- a/boto3/compat.py +++ b/boto3/compat.py @@ -13,6 +13,18 @@ import sys import os import errno +import socket + +from botocore.vendored import six + +if six.PY3: + # In python3, socket.error is OSError, which is too general + # for what we want (i.e FileNotFoundError is a subclass of OSError). 
+ # In py3 all the socket related errors are in a newly created + # ConnectionError + SOCKET_ERROR = ConnectionError +else: + SOCKET_ERROR = socket.error if sys.platform.startswith('win'): diff --git a/boto3/s3/transfer.py b/boto3/s3/transfer.py index 1707dc2c2e..096c943413 100644 --- a/boto3/s3/transfer.py +++ b/boto3/s3/transfer.py @@ -148,6 +148,10 @@ def __call__(self, bytes_amount): MB = 1024 * 1024 SHUTDOWN_SENTINEL = object() +S3_RETRYABLE_ERRORS = ( + socket.timeout, boto3.compat.SOCKET_ERROR, + ReadTimeoutError, IncompleteReadError +) def random_file_extension(num_digits=8): @@ -523,8 +527,7 @@ def _download_range(self, bucket, key, filename, self._ioqueue.put((current_index, chunk)) current_index += len(chunk) return - except (socket.timeout, socket.error, - ReadTimeoutError, IncompleteReadError) as e: + except S3_RETRYABLE_ERRORS as e: logger.debug("Retrying exception caught (%s), " "retrying request, (attempt %s / %s)", e, i, max_attempts, exc_info=True) @@ -535,6 +538,15 @@ def _download_range(self, bucket, key, filename, logger.debug("EXITING _download_range for part: %s", part_index) def _perform_io_writes(self, filename): + try: + self._loop_on_io_writes(filename) + except Exception as e: + logger.debug("Caught exception in IO thread: %s", + e, exc_info=True) + self._ioqueue.trigger_shutdown() + raise + + def _loop_on_io_writes(self, filename): with self._os.open(filename, 'wb') as f: while True: task = self._ioqueue.get() @@ -543,15 +555,9 @@ def _perform_io_writes(self, filename): "shutting down IO handler.") return else: - try: - offset, data = task - f.seek(offset) - f.write(data) - except Exception as e: - logger.debug("Caught exception in IO thread: %s", - e, exc_info=True) - self._ioqueue.trigger_shutdown() - raise + offset, data = task + f.seek(offset) + f.write(data) class TransferConfig(object): @@ -699,10 +705,7 @@ def _get_object(self, bucket, key, filename, extra_args, callback): try: return self._do_get_object(bucket, key, filename, 
extra_args, callback) - except (socket.timeout, socket.error, - ReadTimeoutError, IncompleteReadError) as e: - # TODO: we need a way to reset the callback if the - # download failed. + except S3_RETRYABLE_ERRORS as e: logger.debug("Retrying exception caught (%s), " "retrying request, (attempt %s / %s)", e, i, max_attempts, exc_info=True) diff --git a/tests/integration/test_s3.py b/tests/integration/test_s3.py index 4eec2eb58b..583bfd8381 100644 --- a/tests/integration/test_s3.py +++ b/tests/integration/test_s3.py @@ -484,6 +484,32 @@ def test_download_above_threshold(self): download_path) assert_files_equal(filename, download_path) + def test_download_file_with_directory_not_exist(self): + transfer = self.create_s3_transfer() + self.client.put_object(Bucket=self.bucket_name, + Key='foo.txt', + Body=b'foo') + self.addCleanup(self.delete_object, 'foo.txt') + download_path = os.path.join(self.files.rootdir, 'a', 'b', 'c', + 'downloaded.txt') + with self.assertRaises(IOError): + transfer.download_file(self.bucket_name, 'foo.txt', download_path) + + def test_download_large_file_directory_not_exist(self): + transfer = self.create_s3_transfer() + + filename = self.files.create_file_with_size( + 'foo.txt', filesize=20 * 1024 * 1024) + with open(filename, 'rb') as f: + self.client.put_object(Bucket=self.bucket_name, + Key='foo.txt', + Body=f) + self.addCleanup(self.delete_object, 'foo.txt') + download_path = os.path.join(self.files.rootdir, 'a', 'b', 'c', + 'downloaded.txt') + with self.assertRaises(IOError): + transfer.download_file(self.bucket_name, 'foo.txt', download_path) + def test_transfer_methods_through_client(self): # This is really just a sanity check to ensure that the interface # from the clients work. 
We're not exhaustively testing through diff --git a/tests/unit/s3/test_transfer.py b/tests/unit/s3/test_transfer.py index 5a053ce1fb..f9ec75f9de 100644 --- a/tests/unit/s3/test_transfer.py +++ b/tests/unit/s3/test_transfer.py @@ -398,7 +398,7 @@ def test_retry_on_failures_from_stream_reads(self): response_body = b'foobarbaz' stream_with_errors = mock.Mock() stream_with_errors.read.side_effect = [ - socket.error("fake error"), + socket.timeout("fake error"), response_body ] client.get_object.return_value = {'Body': stream_with_errors} @@ -429,7 +429,7 @@ def test_exception_raised_on_exceeded_retries(self): client = mock.Mock() response_body = b'foobarbaz' stream_with_errors = mock.Mock() - stream_with_errors.read.side_effect = socket.error("fake error") + stream_with_errors.read.side_effect = socket.timeout("fake error") client.get_object.return_value = {'Body': stream_with_errors} config = TransferConfig(multipart_threshold=4, multipart_chunksize=4) @@ -459,6 +459,22 @@ def test_io_thread_failure_triggers_shutdown(self): downloader.download_file('bucket', 'key', 'filename', len(response_body), {}) + def test_io_thread_fails_to_open_triggers_shutdown_error(self): + client = mock.Mock() + client.get_object.return_value = { + 'Body': six.BytesIO(b'asdf') + } + os_layer = mock.Mock(spec=OSUtils) + os_layer.open.side_effect = IOError("Can't open file") + downloader = MultipartDownloader( + client, TransferConfig(), + os_layer, SequentialExecutor) + # We're verifying that the exception raised from the IO future + # propagates back up via download_file(). 
+ with self.assertRaisesRegexp(IOError, "Can't open file"): + downloader.download_file('bucket', 'key', 'filename', + len(b'asdf'), {}) + def test_download_futures_fail_triggers_shutdown(self): class FailedDownloadParts(SequentialExecutor): def __init__(self, max_workers): @@ -619,7 +635,7 @@ def test_get_object_stream_is_retried_and_succeeds(self): 'ContentLength': below_threshold} self.client.get_object.side_effect = [ # First request fails. - socket.error("fake error"), + socket.timeout("fake error"), # Second succeeds. {'Body': six.BytesIO(b'foobar')} ] @@ -636,7 +652,7 @@ def test_get_object_stream_uses_all_retries_and_errors_out(self): # Here we're raising an exception every single time, which # will exhaust our retry count and propagate a # RetriesExceededError. - self.client.get_object.side_effect = socket.error("fake error") + self.client.get_object.side_effect = socket.timeout("fake error") with self.assertRaises(RetriesExceededError): transfer.download_file('bucket', 'key', 'smallfile')