[REVIEW] Enable writing to s3 storage in chunked parquet writer (#10769)

Conversation
Codecov Report

```
@@            Coverage Diff             @@
##        branch-22.06    #10769     +/-  ##
================================================
- Coverage      86.40%    86.27%   -0.13%
================================================
  Files            143       144       +1
  Lines          22448     22565     +117
================================================
+ Hits           19396     19468      +72
- Misses          3052      3097      +45
================================================
```

Continue to review the full report at Codecov.
Approving ops-codeowner file changes
Thanks @galipremsagar!
python/cudf/cudf/tests/test_s3.py (Outdated)
```python
@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("", 0))
    return sock.getsockname()[1]
```
- Does this resource get closed cleanly? Per https://docs.python.org/3/library/socket.html#socket.socket.close: "Sockets are automatically closed when they are garbage-collected, but it is recommended to close() them explicitly, or to use a with statement around them."
- We are binding the socket to listen on `""`, which maps to listening on all addresses (including public addresses, which is a bad safety practice). As far as I can tell, we only access it from `127.0.0.1`, so the socket should be bound to `127.0.0.1` instead.
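As a sketch of the loopback-only idea (the helper name here is hypothetical, not from the PR): bind only to `127.0.0.1`, ask the OS for an ephemeral port with port `0`, and close the socket explicitly via `with`.

```python
import socket

def free_loopback_port() -> int:
    # Hypothetical helper: bind to the loopback interface only, let the OS
    # pick an ephemeral port (port 0), and close the socket explicitly so
    # nothing is ever left listening on a public address.
    with socket.socket() as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]

port = free_loopback_port()
print(1024 <= port <= 65535)  # True (ephemeral ports are above 1023)
```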
Suggested change:

```diff
 @pytest.fixture(scope="session")
 def endpoint_port():
     # Return a free port per worker session.
-    sock = socket.socket()
-    sock.bind(("", 0))
-    return sock.getsockname()[1]
+    with socket.socket() as sock:
+        sock.bind(("127.0.0.1", 0))
+        yield sock.getsockname()[1]
```
I did make the above change locally, but it errors consistently. Did it work for you?
During handling of the above exception, another exception occurred:
s3_base = 'http://127.0.0.1:47913/', s3so = {'client_kwargs': {'endpoint_url': 'http://127.0.0.1:47913/'}}
pdf = Integer Float Integer2 String Boolean
0 2345 9.001 2345 Alpha True
1 11987 8.343 106 Beta False
2 9027 6.000 2088 Gamma True
3 9027 2.781 789277 Delta False
bytes_per_thread = 32
@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
fname = "test_csv_reader.csv"
bname = "csv"
buffer = pdf.to_csv(index=False)
# Use fsspec file object
> with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
python/cudf/cudf/tests/test_s3.py:150:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../envs/cudfdev/lib/python3.8/contextlib.py:113: in __enter__
return next(self.gen)
python/cudf/cudf/tests/test_s3.py:105: in s3_context
client.create_bucket(Bucket=bucket, ACL="public-read-write")
../envs/cudfdev/lib/python3.8/site-packages/botocore/client.py:395: in _api_call
return self._make_api_call(operation_name, kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/client.py:711: in _make_api_call
http, parsed_response = self._make_request(
../envs/cudfdev/lib/python3.8/site-packages/botocore/client.py:731: in _make_request
return self._endpoint.make_request(operation_model, request_dict)
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:107: in make_request
return self._send_request(request_dict, operation_model)
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:183: in _send_request
while self._needs_retry(attempts, operation_model, request_dict,
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:305: in _needs_retry
responses = self._event_emitter.emit(
../envs/cudfdev/lib/python3.8/site-packages/botocore/hooks.py:357: in emit
return self._emitter.emit(aliased_event_name, **kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/hooks.py:228: in emit
return self._emit(event_name, kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/hooks.py:211: in _emit
response = handler(**kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:192: in __call__
if self._checker(**checker_kwargs):
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:265: in __call__
should_retry = self._should_retry(attempt_number, response,
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:292: in _should_retry
return self._checker(attempt_number, response, caught_exception)
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:331: in __call__
checker_response = checker(attempt_number, response,
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:231: in __call__
return self._check_caught_exception(
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:374: in _check_caught_exception
raise caught_exception
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:249: in _do_get_response
http_response = self._send(request)
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:321: in _send
return self.http_session.send(request)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <botocore.httpsession.URLLib3Session object at 0x7efe37194bb0>
request = <AWSPreparedRequest stream_output=False, method=PUT, url=http://127.0.0.1:47913/csv, headers={'x-amz-acl': b'public-re...nvocation-id': b'23e9f9cc-531f-4d50-ad98-8ca7ab3f4a60', 'amz-sdk-request': b'attempt=5; max=5', 'Content-Length': '0'}>
def send(self, request):
try:
proxy_url = self._proxy_config.proxy_url_for(request.url)
manager = self._get_connection_manager(request.url, proxy_url)
conn = manager.connection_from_url(request.url)
self._setup_ssl_cert(conn, request.url, self._verify)
if ensure_boolean(
os.environ.get('BOTO_EXPERIMENTAL__ADD_PROXY_HOST_HEADER', '')
):
# This is currently an "experimental" feature which provides
# no guarantees of backwards compatibility. It may be subject
# to change or removal in any patch version. Anyone opting in
# to this feature should strictly pin botocore.
host = urlparse(request.url).hostname
conn.proxy_headers['host'] = host
request_target = self._get_request_target(request.url, proxy_url)
urllib_response = conn.urlopen(
method=request.method,
url=request_target,
body=request.body,
headers=request.headers,
retries=Retry(False),
assert_same_host=False,
preload_content=False,
decode_content=False,
chunked=self._chunked(request.headers),
)
http_response = botocore.awsrequest.AWSResponse(
request.url,
urllib_response.status,
urllib_response.headers,
urllib_response,
)
if not request.stream_output:
# Cause the raw stream to be exhausted immediately. We do it
# this way instead of using preload_content because
# preload_content will never buffer chunked responses
http_response.content
return http_response
except URLLib3SSLError as e:
raise SSLError(endpoint_url=request.url, error=e)
except (NewConnectionError, socket.gaierror) as e:
> raise EndpointConnectionError(endpoint_url=request.url, error=e)
E botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "http://127.0.0.1:47913/csv"
../envs/cudfdev/lib/python3.8/site-packages/botocore/httpsession.py:434: EndpointConnectionError
---------------------------------------------------- Captured stderr setup --------------------------------
FWIW, I've seen it close connections to the ports very consistently with the existing approach too.
I didn't test this before suggesting it, and I'm not sure what's going wrong. I wonder if the context manager is yielding the port and immediately exiting the context / closing the resource? Might need to refresh myself on how pytest fixtures interact with context managers, or add some print statements to check what's happening. Alternatively, you can try this instead:
```diff
 @pytest.fixture(scope="session")
 def endpoint_port():
     # Return a free port per worker session.
     sock = socket.socket()
-    sock.bind(("", 0))
-    return sock.getsockname()[1]
+    sock.bind(("127.0.0.1", 0))
+    yield sock.getsockname()[1]
+    sock.close()
```
```python
@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    yield sock.getsockname()[1]
    sock.close()
```

⬆️ This too gives the same error, but this works:

```python
@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port
```

I've pushed this change, let me know if that looks good to you.
```python
@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("", 0))
    return sock.getsockname()[1]
```
Suggested change:

```diff
 @pytest.fixture(scope="session")
 def endpoint_port():
     # Return a free port per worker session.
-    sock = socket.socket()
-    sock.bind(("", 0))
-    return sock.getsockname()[1]
+    with socket.socket() as sock:
+        sock.bind(("127.0.0.1", 0))
+        yield sock.getsockname()[1]
```
Co-authored-by: Bradley Dice <[email protected]>
I have one suggestion related to variable naming that I'd like to see addressed before merging. The other suggestions are related to the `moto` server: avoiding repeated definitions of hosts/URIs, and avoiding `subprocess` for the server (suggesting to use its Python API instead). Those `moto`-related suggestions can be handled in a follow-up PR.
```python
        5000
        if worker_id == "master"
        else 5550 + int(worker_id.lstrip("gw"))
    )
    endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"
    # ...
    proc = subprocess.Popen(
```
It looks like we're using `moto` in stand-alone server mode. Is it possible to use the Python API for this server instead of calling `subprocess`? http://docs.getmoto.org/en/latest/docs/server_mode.html#start-within-python

I think it'd look like this:

```python
from moto.server import ThreadedMotoServer

server = ThreadedMotoServer(ip_address="127.0.0.1", port=endpoint_port)
server.start()
yield endpoint_uri
# run tests
server.stop()
```
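The in-process start/stop pattern suggested above can be sketched with a stdlib `HTTPServer` standing in for `ThreadedMotoServer` (which, per the moto server-mode docs linked above, exposes a comparable `start()`/`stop()` API); the handler and port choice here are illustrative only:

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

class _Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, *args):
        # Keep test output quiet.
        pass

# Bind to an OS-assigned loopback port and serve from a background thread,
# mirroring how an in-process server replaces subprocess.Popen: the server
# object owns its lifecycle, so tests can reliably shut it down on exit.
server = HTTPServer(("127.0.0.1", 0), _Handler)
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

body = urllib.request.urlopen(f"http://127.0.0.1:{port}/").read()
server.shutdown()  # analogue of moto's server.stop()
print(body)  # b'ok'
```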
```python
    """
    Returns s3 storage options to pass to fsspec
    """
    endpoint_port = (
        5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw"))
    )
    endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"
```
We're hard-coding the endpoint URI in a few locations in this file. Can we re-use the host/port/endpoint values instead? (Possibly with a fixture.)
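One possible shape for that reuse (the constant and helper names here are hypothetical, not from the PR): define the host once and derive every endpoint URI from it, rather than repeating the string inline.

```python
# Hypothetical single source of truth for the test endpoint; in the test
# suite these would likely be session-scoped pytest fixtures instead of
# module-level helpers.
S3_HOST = "127.0.0.1"

def make_endpoint_uri(port: int) -> str:
    # Build the endpoint URI from the shared host definition.
    return f"http://{S3_HOST}:{port}/"

print(make_endpoint_uri(5000))  # http://127.0.0.1:5000/
```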
```python
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
```
Same here - it would be nice to define `127.0.0.1` once and re-use that value and the endpoint URI.
```python
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
```
Wait, is this just to get the port number of an open port? If so, I was misunderstanding how this worked earlier. The context manager I suggested was probably failing because the socket was still held by `sock` when you needed to close it and give up that port number to the `moto` server. If that is correct, you aren't supposed to hold the socket resource with a context manager / `yield` fixture. Please use this approach and ensure that `sock` is closed before returning the port number.
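A small stdlib demonstration of this failure mode (not the actual fixture, just a sketch): while the first socket stays bound, a second bind to the same loopback port raises `OSError` (`EADDRINUSE`), which is what a server started on the yielded port would hit; once the holder is closed, the port binds fine.

```python
import socket

# Hold a bound socket, as the yield-style fixture effectively did.
holder = socket.socket()
holder.bind(("127.0.0.1", 0))
port = holder.getsockname()[1]

# A second bind to the same port fails while the holder is still open.
blocked = socket.socket()
try:
    blocked.bind(("127.0.0.1", port))
    conflict = False
except OSError:
    conflict = True
finally:
    blocked.close()

holder.close()

# After closing the holder, the port is free for the real server to bind.
reuse = socket.socket()
reuse.bind(("127.0.0.1", port))
reuse.close()

print(conflict)  # True
```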
Thanks @bdice, I've resolved.

@gpucibot merge
…server (#10822)

This is a follow-up PR to address review comments from here: #10769 (review)

This PR:
- [x] Uses `ThreadedMotoServer` instead of `subprocess.open` to create a new server; this way it is guaranteed to close the server upon exit.
- [x] Adds an IP address fixture instead of having it hard-coded in multiple places.

Authors:
- GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
- Bradley Dice (https://github.com/bdice)
- Charles Blackmon-Luca (https://github.com/charlesbluca)

URL: #10822
Resolves: #10522
This PR:
- `s3` writing support in `ParquetDatasetWriter`
- `s3` directory reading in `cudf.read_parquet`. Issue here: https://issues.apache.org/jira/browse/ARROW-16438
- `s3` python library combinations that will work together, such that `test_s3.py` can be run locally on dev environments
- `s3fs` error logs handled by changing the log level to `DEBUG` in pytests (`S3FS_LOGGING_LEVEL`)