
[FEA] Support S3 writes in chunked writer - ParquetDatasetWriter #10522

Closed
sauravdev opened this issue Mar 28, 2022 · 2 comments · Fixed by #10769
Labels
cuIO cuIO issue feature request New feature or request Python Affects Python cuDF API.

Comments

@sauravdev

Parquet writes to external storage such as S3 should be possible using `ParquetDatasetWriter`; right now it errors out.

@sauravdev sauravdev added Needs Triage Need team to review and classify feature request New feature or request labels Mar 28, 2022
@GregoryKimball
Contributor

Thanks @sauravdev for sharing this use case. Would you please post a code sample and the error message, if any?

@GregoryKimball GregoryKimball added the cuIO cuIO issue label Mar 28, 2022
@sauravdev
Author

Sure, here is a sample (imports moved to the top and missing ones added):

import time

from streamz import Stream
from cudf.io.parquet import ParquetDatasetWriter

stream = Stream.from_kafka_batched(
    kafka_input_topic, consumer_conf, asynchronous=True,
    poll_interval="60s", max_batch_size=90000,
    engine="cudf", dask=True,
)

def cudf_passthrough_to_parquet_chunk_test(gdf):
    batch_process_start_time = time.time()
    size = len(gdf)
    # Fails here: ParquetDatasetWriter cannot open an s3:// output path
    with ParquetDatasetWriter("s3://samplepath/", partition_cols=["samplecol"], index=False) as cw:
        cw.write_table(gdf)
    batch_process_finish_time = time.time()
    return (batch_process_start_time, batch_process_finish_time, size)

output = stream.map(cudf_passthrough_to_parquet_chunk_test).gather().sink_to_list()
stream.start()
output

and here is the error:
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f27e7e25e50>>, <Future finished exception=RuntimeError('cuDF failure at: ../src/io/utilities/data_sink.cpp:36: Cannot open output file')>)

@galipremsagar galipremsagar self-assigned this Apr 12, 2022
@galipremsagar galipremsagar added Python Affects Python cuDF API. and removed Needs Triage Need team to review and classify labels Apr 27, 2022
rapids-bot bot pushed a commit that referenced this issue May 10, 2022
Resolves: #10522
This PR:

- [x] Enables `s3` writing support in `ParquetDatasetWriter`
- [x] Adds a work-around for reading an `s3` directory in `cudf.read_parquet`. Issue here: https://issues.apache.org/jira/browse/ARROW-16438
- [x] Pins the required combinations of `s3` Python libraries that work together, so that `test_s3.py` can be run locally in dev environments.
- [x] Improves the default `s3fs` error logs by setting the log level to `DEBUG` in pytests (`S3FS_LOGGING_LEVEL`).

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - AJ Schmidt (https://github.com/ajschmidt8)
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Ayush Dattagupta (https://github.com/ayushdg)
  - Bradley Dice (https://github.com/bdice)

URL: #10769
3 participants