Skip to content

Commit

Permalink
Enable writing to s3 storage in chunked parquet writer (#10769)
Browse files Browse the repository at this point in the history
Resolves: #10522
This PR:

- [x] Enables `s3` writing support in `ParquetDatasetWriter`
- [x] Add's a work-around to reading an `s3` directory in `cudf.read_parquet`. Issue here: https://issues.apache.org/jira/browse/ARROW-16438
- [x] Introduces all the required `s3` python library combinations that will work together with such that `test_s3.py` can be run locally on dev environments.
- [x] Improved the default `s3fs` error logs by changing the log level to `DEBUG` in pytests.(`S3FS_LOGGING_LEVEL`)

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - AJ Schmidt (https://github.com/ajschmidt8)
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Ayush Dattagupta (https://github.com/ayushdg)
  - Bradley Dice (https://github.com/bdice)

URL: #10769
  • Loading branch information
galipremsagar authored May 10, 2022
1 parent ee8cd59 commit 0fcd364
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 150 deletions.
5 changes: 5 additions & 0 deletions conda/environments/cudf_dev_cuda11.5.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ dependencies:
- pydata-sphinx-theme
- librdkafka=1.7.0
- python-confluent-kafka=1.7.0
- moto>=3.1.6
- boto3>=1.21.21
- botocore>=1.24.21
- aiobotocore>=2.2.0
- s3fs>=2022.3.0
- pip:
- git+https://github.com/python-streamz/streamz.git@master
- pyorc
Expand Down
4 changes: 4 additions & 0 deletions docs/cudf/source/api_docs/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Parquet
read_parquet
DataFrame.to_parquet
cudf.io.parquet.read_parquet_metadata
:template: autosummary/class_with_autosummary.rst

cudf.io.parquet.ParquetDatasetWriter


ORC
~~~
Expand Down
3 changes: 2 additions & 1 deletion python/cudf/cudf/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.
from cudf.io.avro import read_avro
from cudf.io.csv import read_csv, to_csv
from cudf.io.dlpack import from_dlpack
Expand All @@ -9,6 +9,7 @@
from cudf.io.parquet import (
merge_parquet_filemetadata,
read_parquet,
ParquetDatasetWriter,
read_parquet_metadata,
write_to_dataset,
)
Expand Down
168 changes: 96 additions & 72 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import math
import shutil
import tempfile
import warnings
from collections import defaultdict
from contextlib import ExitStack
Expand Down Expand Up @@ -232,12 +234,15 @@ def _process_dataset(
filters = pq._filters_to_expression(filters)

# Initialize ds.FilesystemDataset
# TODO: Remove the if len(paths) workaround after following bug is fixed:
# https://issues.apache.org/jira/browse/ARROW-16438
dataset = ds.dataset(
paths,
source=paths[0] if len(paths) == 1 else paths,
filesystem=fs,
format="parquet",
partitioning="hive",
)

file_list = dataset.files
if len(file_list) == 0:
raise FileNotFoundError(f"{paths} could not be resolved to any files")
Expand Down Expand Up @@ -837,6 +842,67 @@ def _parse_bytes(s):


class ParquetDatasetWriter:
"""
Write a parquet file or dataset incrementally
Parameters
----------
path : str
A local directory path or S3 URL. Will be used as root directory
path while writing a partitioned dataset.
partition_cols : list
Column names by which to partition the dataset
Columns are partitioned in the order they are given
index : bool, default None
If ``True``, include the dataframe's index(es) in the file output.
If ``False``, they will not be written to the file. If ``None``,
index(es) other than RangeIndex will be saved as columns.
compression : {'snappy', None}, default 'snappy'
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in file.
max_file_size : int or str, default None
A file size that cannot be exceeded by the writer.
It is in bytes, if the input is int.
Size can also be a str in form or "10 MB", "1 GB", etc.
If this parameter is used, it is mandatory to pass
`file_name_prefix`.
file_name_prefix : str
This is a prefix to file names generated only when
`max_file_size` is specified.
Examples
--------
Using a context
>>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
>>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
>>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
... cw.write_table(df1)
... cw.write_table(df2)
By manually calling ``close()``
>>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
>>> cw.write_table(df1)
>>> cw.write_table(df2)
>>> cw.close()
Both the methods will generate the same directory structure
.. code-block:: none
dataset/
a=1
<filename>.parquet
a=2
<filename>.parquet
a=3
<filename>.parquet
"""

@_cudf_nvtx_annotate
def __init__(
self,
Expand All @@ -847,68 +913,15 @@ def __init__(
statistics="ROWGROUP",
max_file_size=None,
file_name_prefix=None,
**kwargs,
) -> None:
"""
Write a parquet file or dataset incrementally
Parameters
----------
path : str
File path or Root Directory path. Will be used as Root Directory
path while writing a partitioned dataset.
partition_cols : list
Column names by which to partition the dataset
Columns are partitioned in the order they are given
index : bool, default None
If ``True``, include the dataframe’s index(es) in the file output.
If ``False``, they will not be written to the file. If ``None``,
index(es) other than RangeIndex will be saved as columns.
compression : {'snappy', None}, default 'snappy'
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in file.
max_file_size : int or str, default None
A file size that cannot be exceeded by the writer.
It is in bytes, if the input is int.
Size can also be a str in form or "10 MB", "1 GB", etc.
If this parameter is used, it is mandatory to pass
`file_name_prefix`.
file_name_prefix : str
This is a prefix to file names generated only when
`max_file_size` is specified.
Examples
________
Using a context
>>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
>>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
>>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
... cw.write_table(df1)
... cw.write_table(df2)
By manually calling ``close()``
>>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
>>> cw.write_table(df1)
>>> cw.write_table(df2)
>>> cw.close()
Both the methods will generate the same directory structure
.. code-block:: bash
dataset/
a=1
<filename>.parquet
a=2
<filename>.parquet
a=3
<filename>.parquet
if isinstance(path, str) and path.startswith("s3://"):
self.fs_meta = {"is_s3": True, "actual_path": path}
self.path = tempfile.TemporaryDirectory().name
else:
self.fs_meta = {}
self.path = path

"""
self.path = path
self.common_args = {
"index": index,
"compression": compression,
Expand All @@ -923,6 +936,7 @@ def __init__(
# Map of partition_col values to their ParquetWriter's index
# in self._chunked_writers for reverse lookup
self.path_cw_map: Dict[str, int] = {}
self.kwargs = kwargs
self.filename = file_name_prefix
self.max_file_size = max_file_size
if max_file_size is not None:
Expand Down Expand Up @@ -1051,18 +1065,19 @@ def write_table(self, df):
]
cw.write_table(grouped_df, this_cw_part_info)

# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
if new_cw_paths:
# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
)
)
)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)

@_cudf_nvtx_annotate
def close(self, return_metadata=False):
Expand All @@ -1076,6 +1091,15 @@ def close(self, return_metadata=False):
for cw, _, meta_path in self._chunked_writers
]

if self.fs_meta.get("is_s3", False):
local_path = self.path
s3_path = self.fs_meta["actual_path"]
s3_file, _ = ioutils._get_filesystem_and_paths(
s3_path, **self.kwargs
)
s3_file.put(local_path, s3_path, recursive=True)
shutil.rmtree(self.path)

if return_metadata:
return (
merge_parquet_filemetadata(metadata)
Expand Down
Loading

0 comments on commit 0fcd364

Please sign in to comment.