Skip to content

Commit

Permalink
Shutdown IO queue when file can't be opened
Browse files Browse the repository at this point in the history
If any errors happen while processing IO (including
errors opening the destination file), we should shutdown
the IO queue.

As part of this change I added an integration test.  It also
exposed an issue on Python 3, where socket.error is now an alias
of OSError, causing non-network errors to be retried.  I updated
the compat module to expose the appropriate "something went wrong
on the socket" exception based on the Python version.

Fixes #367.
  • Loading branch information
jamesls committed Feb 3, 2016
1 parent 8ce95c8 commit ac346ec
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 19 deletions.
12 changes: 12 additions & 0 deletions boto3/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@
import sys
import os
import errno
import socket

from botocore.vendored import six

if six.PY3:
    # On Python 3, socket.error is just an alias of OSError, which is
    # far too broad for retry decisions (FileNotFoundError, for example,
    # is an OSError subclass).  The network-level failures we actually
    # want to retry are grouped under ConnectionError.
    SOCKET_ERROR = ConnectionError
else:
    # On Python 2, socket.error is still its own distinct exception type.
    SOCKET_ERROR = socket.error


if sys.platform.startswith('win'):
Expand Down
33 changes: 18 additions & 15 deletions boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ def __call__(self, bytes_amount):

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()
# Exceptions that indicate a (possibly) transient network problem, so a
# request that fails with one of these is safe to retry.
# boto3.compat.SOCKET_ERROR is ConnectionError on Python 3 and
# socket.error on Python 2 (see boto3/compat.py).
S3_RETRYABLE_ERRORS = (
    socket.timeout, boto3.compat.SOCKET_ERROR,
    ReadTimeoutError, IncompleteReadError
)


def random_file_extension(num_digits=8):
Expand Down Expand Up @@ -523,8 +527,7 @@ def _download_range(self, bucket, key, filename,
self._ioqueue.put((current_index, chunk))
current_index += len(chunk)
return
except (socket.timeout, socket.error,
ReadTimeoutError, IncompleteReadError) as e:
except S3_RETRYABLE_ERRORS as e:
logger.debug("Retrying exception caught (%s), "
"retrying request, (attempt %s / %s)", e, i,
max_attempts, exc_info=True)
Expand All @@ -535,6 +538,15 @@ def _download_range(self, bucket, key, filename,
logger.debug("EXITING _download_range for part: %s", part_index)

def _perform_io_writes(self, filename):
    """Run the IO write loop, shutting down the queue on any failure.

    If anything goes wrong while writing (including failing to open
    ``filename``), the IO queue is shut down so producer threads do
    not block forever, and the exception is re-raised to the caller.
    """
    try:
        self._loop_on_io_writes(filename)
    except Exception as exc:
        logger.debug("Caught exception in IO thread: %s",
                     exc, exc_info=True)
        self._ioqueue.trigger_shutdown()
        raise

def _loop_on_io_writes(self, filename):
with self._os.open(filename, 'wb') as f:
while True:
task = self._ioqueue.get()
Expand All @@ -543,15 +555,9 @@ def _perform_io_writes(self, filename):
"shutting down IO handler.")
return
else:
try:
offset, data = task
f.seek(offset)
f.write(data)
except Exception as e:
logger.debug("Caught exception in IO thread: %s",
e, exc_info=True)
self._ioqueue.trigger_shutdown()
raise
offset, data = task
f.seek(offset)
f.write(data)


class TransferConfig(object):
Expand Down Expand Up @@ -699,10 +705,7 @@ def _get_object(self, bucket, key, filename, extra_args, callback):
try:
return self._do_get_object(bucket, key, filename,
extra_args, callback)
except (socket.timeout, socket.error,
ReadTimeoutError, IncompleteReadError) as e:
# TODO: we need a way to reset the callback if the
# download failed.
except S3_RETRYABLE_ERRORS as e:
logger.debug("Retrying exception caught (%s), "
"retrying request, (attempt %s / %s)", e, i,
max_attempts, exc_info=True)
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,32 @@ def test_download_above_threshold(self):
download_path)
assert_files_equal(filename, download_path)

def test_download_file_with_directory_not_exist(self):
    # Downloading to a path whose parent directories do not exist
    # should surface an IOError instead of hanging the transfer.
    transfer = self.create_s3_transfer()
    self.client.put_object(Bucket=self.bucket_name,
                           Key='foo.txt',
                           Body=b'foo')
    self.addCleanup(self.delete_object, 'foo.txt')
    missing_dir_path = os.path.join(
        self.files.rootdir, 'a', 'b', 'c', 'downloaded.txt')
    with self.assertRaises(IOError):
        transfer.download_file(self.bucket_name, 'foo.txt',
                               missing_dir_path)

def test_download_large_file_directory_not_exist(self):
    # A multipart (above-threshold) download into a nonexistent
    # directory should also raise IOError rather than deadlock.
    transfer = self.create_s3_transfer()
    filename = self.files.create_file_with_size(
        'foo.txt', filesize=20 * 1024 * 1024)
    with open(filename, 'rb') as body:
        self.client.put_object(Bucket=self.bucket_name,
                               Key='foo.txt',
                               Body=body)
    self.addCleanup(self.delete_object, 'foo.txt')
    missing_dir_path = os.path.join(
        self.files.rootdir, 'a', 'b', 'c', 'downloaded.txt')
    with self.assertRaises(IOError):
        transfer.download_file(self.bucket_name, 'foo.txt',
                               missing_dir_path)

def test_transfer_methods_through_client(self):
# This is really just a sanity check to ensure that the interface
# from the clients work. We're not exhaustively testing through
Expand Down
24 changes: 20 additions & 4 deletions tests/unit/s3/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def test_retry_on_failures_from_stream_reads(self):
response_body = b'foobarbaz'
stream_with_errors = mock.Mock()
stream_with_errors.read.side_effect = [
socket.error("fake error"),
socket.timeout("fake error"),
response_body
]
client.get_object.return_value = {'Body': stream_with_errors}
Expand Down Expand Up @@ -429,7 +429,7 @@ def test_exception_raised_on_exceeded_retries(self):
client = mock.Mock()
response_body = b'foobarbaz'
stream_with_errors = mock.Mock()
stream_with_errors.read.side_effect = socket.error("fake error")
stream_with_errors.read.side_effect = socket.timeout("fake error")
client.get_object.return_value = {'Body': stream_with_errors}
config = TransferConfig(multipart_threshold=4,
multipart_chunksize=4)
Expand Down Expand Up @@ -459,6 +459,22 @@ def test_io_thread_failure_triggers_shutdown(self):
downloader.download_file('bucket', 'key', 'filename',
len(response_body), {})

def test_io_thread_fails_to_open_triggers_shutdown_error(self):
    payload = b'asdf'
    client = mock.Mock()
    client.get_object.return_value = {
        'Body': six.BytesIO(payload)
    }
    os_layer = mock.Mock(spec=OSUtils)
    os_layer.open.side_effect = IOError("Can't open file")
    downloader = MultipartDownloader(
        client, TransferConfig(),
        os_layer, SequentialExecutor)
    # The IOError raised when the IO thread fails to open the
    # destination file should propagate back up via download_file().
    with self.assertRaisesRegexp(IOError, "Can't open file"):
        downloader.download_file('bucket', 'key', 'filename',
                                 len(payload), {})

def test_download_futures_fail_triggers_shutdown(self):
class FailedDownloadParts(SequentialExecutor):
def __init__(self, max_workers):
Expand Down Expand Up @@ -619,7 +635,7 @@ def test_get_object_stream_is_retried_and_succeeds(self):
'ContentLength': below_threshold}
self.client.get_object.side_effect = [
# First request fails.
socket.error("fake error"),
socket.timeout("fake error"),
# Second succeeds.
{'Body': six.BytesIO(b'foobar')}
]
Expand All @@ -636,7 +652,7 @@ def test_get_object_stream_uses_all_retries_and_errors_out(self):
# Here we're raising an exception every single time, which
# will exhaust our retry count and propogate a
# RetriesExceededError.
self.client.get_object.side_effect = socket.error("fake error")
self.client.get_object.side_effect = socket.timeout("fake error")
with self.assertRaises(RetriesExceededError):
transfer.download_file('bucket', 'key', 'smallfile')

Expand Down

0 comments on commit ac346ec

Please sign in to comment.