Fixes "the connection was closed by the remote peer" error #389

Merged
merged 8 commits into from
Nov 24, 2019

Conversation

Gapex
Contributor

@Gapex Gapex commented Nov 13, 2019

SeekableRawReader reopens a connection after getting an IncompleteReadError

Motivation

The current SeekableRawReader sends an HTTP request in the seek() method and keeps the connection to the remote peer open. But if the SeekableRawReader does not read from the connection immediately after seek(), the remote peer may close the connection due to a timeout. The SeekableRawReader then attempts to read from a StreamingBody whose underlying connection has been closed and gets a botocore.exceptions.IncompleteReadError. The following example shows the case:

from boto3 import resource

endpoints = [
    'http://xxxxxxxxxxxxx', # real s3
    'http://localhost:8000' # moto s3 server
]

real_s3_resource = resource('s3', endpoint_url=endpoints[0])
moto_s3_resource = resource('s3', endpoint_url=endpoints[1])

# Test read from a closed streaming body.

# For real s3.
obj = real_s3_resource.Object('gyp', 'images.msg')
body = obj.get()['Body']
print(type(body)) # <class 'botocore.response.StreamingBody'>
print(body.read(1)) # b'\xdd'
body.close() # Close the body.
try:
    body.read(1)
except Exception as e:
    print(type(e)) # <class 'botocore.exceptions.IncompleteReadError'>


# For moto s3 server.
moto_s3_resource.Bucket('gyp').create()
moto_s3_resource.Object('gyp', 'test-key').put(Body=b'123456')
obj = moto_s3_resource.Object('gyp', 'test-key')
body = obj.get()['Body']
print(type(body)) # <class 'botocore.response.StreamingBody'>
print(body.read(1)) # b'1'
body.close() # Close the body.
try:
    body.read(1)
except Exception as e:
    print(type(e)) # <class 'botocore.exceptions.IncompleteReadError'>

In the above code, the client closes the body itself to simulate the remote peer closing the connection.

In the PR, I intended to delay sending the HTTP request until read(). That solved the closed-connection problem between a seek() and the following read(), but introduced a similar problem between consecutive read() calls: the remote peer may also close the connection after a read if the next read occurs a long time later.

In this PR, I catch the IncompleteReadError directly in read(). If the connection is closed and an IncompleteReadError is raised:

  1. Catch the exception and dereference the current StreamingBody.
  2. Call seek() with the current offset to build a new connection to the remote peer.
  3. Finally, read from the new connection as required.
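
The three steps above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `ReopeningReader` and `FlakyBody` are hypothetical names, the "remote object" is just an in-memory bytes value, and `IncompleteReadError` is a local stand-in for `botocore.exceptions.IncompleteReadError` so that the sketch runs without botocore or a network connection.

```python
import io


class IncompleteReadError(Exception):
    """Local stand-in for botocore.exceptions.IncompleteReadError (assumption:
    defined here only so the sketch runs without botocore installed)."""


class FlakyBody:
    """Hypothetical stand-in for a StreamingBody whose connection can drop."""

    def __init__(self, data):
        self._buf = io.BytesIO(data)
        self.closed = False

    def read(self, size=-1):
        if self.closed:
            raise IncompleteReadError('connection closed by the remote peer')
        return self._buf.read(size)

    def close(self):
        self.closed = True


class ReopeningReader:
    """Hypothetical reader that reopens its body after an IncompleteReadError."""

    def __init__(self, data):
        self._data = data    # plays the role of the remote object
        self._position = 0   # current offset into the object
        self._body = None
        self.seek(0)

    def seek(self, position):
        # A real reader would issue a ranged GET starting at `position` here.
        self._position = position
        self._body = FlakyBody(self._data[position:])

    def read(self, size=-1):
        try:
            chunk = self._body.read(size)
        except IncompleteReadError:
            # 1. Drop the reference to the dead body.
            self._body = None
            # 2. Reopen from the current offset.
            self.seek(self._position)
            # 3. Read from the fresh connection.
            chunk = self._body.read(size)
        self._position += len(chunk)
        return chunk


reader = ReopeningReader(b'123456')
first = reader.read(1)
reader._body.close()  # simulate the remote peer closing the connection
second = reader.read(2)
print(first, second)
```

The retry happens only once per read() call in this sketch; if the second attempt also fails, the exception propagates to the caller.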

@Gapex
Contributor Author

Gapex commented Nov 13, 2019

@mpenkov

  • moto uses a file object to mock StreamingBody, and reading from a closed file does not actually raise botocore.exceptions.IncompleteReadError.
  • So I need to skip the test case in the 'Mock Environment'.
  • I notice that Travis unsets the SO_DISABLE_MOCKS env variable, so all S3 tests are using moto.
  • To test the modified part of the code, a real connection may be needed.

import moto
from boto3 import resource

mocker = moto.mock_s3()
mocker.start()
r = resource('s3')
r.Bucket('bucket').create()
r.Object('bucket', 'key').put(Body=b'123456')
obj = r.Object('bucket', 'key')
body = obj.get()['Body']
print(body.read(1)) # b'1'
body.close()
try:
    print(body.read(1))
except Exception as e:
    print(type(e)) # <class 'ValueError'>
    print(e) # I/O operation on closed file.
mocker.stop()
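
One way to skip the test under mocks could look like the sketch below. It is only an illustration: the test case name is hypothetical, and it assumes that real-S3 runs are enabled by setting SO_DISABLE_MOCKS to "1" (the variable name is the one mentioned above, the value is a guess).

```python
import os
import unittest

# Assumption: real-S3 tests are enabled by SO_DISABLE_MOCKS=1.
# The variable name comes from this thread; the expected value is a guess.
MOCKS_DISABLED = os.environ.get('SO_DISABLE_MOCKS') == '1'


class ReopenAfterCloseTest(unittest.TestCase):  # hypothetical test case name

    @unittest.skipUnless(
        MOCKS_DISABLED,
        'moto raises ValueError, not IncompleteReadError, on a closed body',
    )
    def test_read_after_remote_close(self):
        # A real test would read, close the body, and read again here.
        pass


# Run the test case in-process and collect the outcome.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ReopenAfterCloseTest)
result = unittest.TestResult()
suite.run(result)
print('skipped:', len(result.skipped))
```

With the variable unset, as on Travis, the test is reported as skipped instead of failing against moto.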

@vuryleo

vuryleo commented Nov 18, 2019

I would think it's an issue related to moto. We may need to raise it there. :(

Collaborator

@mpenkov mpenkov left a comment


Let me know when this is no longer WIP and I'll have a closer look.

The overall direction looks right to me.

class SeekableRawReaderTest(unittest.TestCase):

    def setUp(self):
        self._local_resoruce = boto3.resource('s3', endpoint_url='http://localhost:5000')
Collaborator


Typo:

Suggested change
- self._local_resoruce = boto3.resource('s3', endpoint_url='http://localhost:5000')
+ self._local_resource = boto3.resource('s3', endpoint_url='http://localhost:5000')

smart_open/tests/test_s3.py
@Gapex Gapex changed the title WIP: Fixup the connection was closed by the remote peer Fixes the connection was closed by the remote peer Nov 19, 2019
@mpenkov mpenkov changed the title Fixes the connection was closed by the remote peer Fixes "the connection was closed by the remote peer" error Nov 24, 2019
@mpenkov mpenkov merged commit fa9e755 into piskvorky:master Nov 24, 2019
@mpenkov
Collaborator

mpenkov commented Nov 24, 2019

@Gapex Thank you for your patience! This is a very welcome contribution. Keep them coming!

Development

Successfully merging this pull request may close these issues.

s3 open seek operation try read rest of file into buffer, which makes following read has timeout risk
3 participants