
[REVIEW] Enable writing to s3 storage in chunked parquet writer #10769

Merged · 18 commits · May 10, 2022
5 changes: 5 additions & 0 deletions conda/environments/cudf_dev_cuda11.5.yml
@@ -67,6 +67,11 @@ dependencies:
- pydata-sphinx-theme
- librdkafka=1.7.0
- python-confluent-kafka=1.7.0
- moto>=3.1.6
- boto3>=1.21.21
- botocore>=1.24.21
- aiobotocore>=2.2.0
- s3fs>=2022.3.0
- pip:
- git+https://github.com/python-streamz/streamz.git@master
- pyorc
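These new test dependencies (moto, boto3, botocore, aiobotocore, s3fs) support mocking S3 in the Python test suite. As a rough sketch of how moto and boto3 can be combined for such tests (the bucket name is invented here, and the actual cudf fixtures may differ, e.g. by running a local moto server so that s3fs/aiobotocore can reach it over HTTP):

import boto3
from moto import mock_s3


@mock_s3
def test_s3_bucket_roundtrip():
    # All boto3 calls inside this decorator hit moto's in-memory S3 mock,
    # so no real AWS credentials or network access are needed.
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="cudf-test-bucket")
    s3.put_object(Bucket="cudf-test-bucket", Key="data.parquet", Body=b"PAR1")
    keys = [
        obj["Key"]
        for obj in s3.list_objects_v2(Bucket="cudf-test-bucket")["Contents"]
    ]
    assert keys == ["data.parquet"]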
4 changes: 4 additions & 0 deletions docs/cudf/source/api_docs/io.rst
@@ -36,6 +36,10 @@ Parquet
read_parquet
DataFrame.to_parquet
cudf.io.parquet.read_parquet_metadata
:template: autosummary/class_with_autosummary.rst

cudf.io.parquet.ParquetDatasetWriter


ORC
~~~
3 changes: 2 additions & 1 deletion python/cudf/cudf/io/__init__.py
@@ -1,4 +1,4 @@
# Copyright (c) 2018, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.
from cudf.io.avro import read_avro
from cudf.io.csv import read_csv, to_csv
from cudf.io.dlpack import from_dlpack
@@ -9,6 +9,7 @@
from cudf.io.parquet import (
merge_parquet_filemetadata,
read_parquet,
ParquetDatasetWriter,
read_parquet_metadata,
write_to_dataset,
)
168 changes: 96 additions & 72 deletions python/cudf/cudf/io/parquet.py
@@ -1,6 +1,8 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import math
import shutil
import tempfile
import warnings
from collections import defaultdict
from contextlib import ExitStack
@@ -232,12 +234,15 @@ def _process_dataset(
filters = pq._filters_to_expression(filters)

# Initialize ds.FilesystemDataset
# TODO: Remove the if len(paths) workaround after following bug is fixed:
# https://issues.apache.org/jira/browse/ARROW-16438
dataset = ds.dataset(
paths,
source=paths[0] if len(paths) == 1 else paths,
filesystem=fs,
format="parquet",
partitioning="hive",
)

file_list = dataset.files
if len(file_list) == 0:
raise FileNotFoundError(f"{paths} could not be resolved to any files")
@@ -837,6 +842,67 @@ def _parse_bytes(s):


class ParquetDatasetWriter:
"""
Write a parquet file or dataset incrementally

Parameters
----------
path : str
A local directory path or S3 URL. Will be used as root directory
path while writing a partitioned dataset.
partition_cols : list
Column names by which to partition the dataset
Columns are partitioned in the order they are given
index : bool, default None
If ``True``, include the dataframe's index(es) in the file output.
If ``False``, they will not be written to the file. If ``None``,
index(es) other than RangeIndex will be saved as columns.
compression : {'snappy', None}, default 'snappy'
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in file.
Comment on lines +860 to +863

Contributor: Why does compression accept None while statistics accepts 'NONE'? Is this a repeated pattern across cuDF? Can we adopt None instead of string 'NONE' values universally?

Contributor Author (galipremsagar, May 9, 2022): Yea, I'll open a separate PR to standardize this.

max_file_size : int or str, default None
A file size that cannot be exceeded by the writer.
It is interpreted as bytes if the input is an int.
Size can also be a str in the form of "10 MB", "1 GB", etc.
If this parameter is used, it is mandatory to pass
`file_name_prefix`.
file_name_prefix : str
This is a prefix to file names generated only when
`max_file_size` is specified.


Examples
--------
Using a context

>>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
>>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
>>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
... cw.write_table(df1)
... cw.write_table(df2)

By manually calling ``close()``

>>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
>>> cw.write_table(df1)
>>> cw.write_table(df2)
>>> cw.close()

Both methods will generate the same directory structure

.. code-block:: none

dataset/
a=1
<filename>.parquet
a=2
<filename>.parquet
a=3
<filename>.parquet

"""

@_cudf_nvtx_annotate
def __init__(
self,
@@ -847,68 +913,15 @@ def __init__(
statistics="ROWGROUP",
max_file_size=None,
file_name_prefix=None,
**kwargs,
) -> None:
"""
Write a parquet file or dataset incrementally

Parameters
----------
path : str
File path or Root Directory path. Will be used as Root Directory
path while writing a partitioned dataset.
partition_cols : list
Column names by which to partition the dataset
Columns are partitioned in the order they are given
index : bool, default None
If ``True``, include the dataframe’s index(es) in the file output.
If ``False``, they will not be written to the file. If ``None``,
index(es) other than RangeIndex will be saved as columns.
compression : {'snappy', None}, default 'snappy'
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in file.
max_file_size : int or str, default None
A file size that cannot be exceeded by the writer.
It is in bytes, if the input is int.
Size can also be a str in form or "10 MB", "1 GB", etc.
If this parameter is used, it is mandatory to pass
`file_name_prefix`.
file_name_prefix : str
This is a prefix to file names generated only when
`max_file_size` is specified.


Examples
________
Using a context

>>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
>>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
>>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
... cw.write_table(df1)
... cw.write_table(df2)

By manually calling ``close()``

>>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
>>> cw.write_table(df1)
>>> cw.write_table(df2)
>>> cw.close()

Both the methods will generate the same directory structure

.. code-block:: bash

dataset/
a=1
<filename>.parquet
a=2
<filename>.parquet
a=3
<filename>.parquet
if isinstance(path, str) and path.startswith("s3://"):
self.fs_meta = {"is_s3": True, "actual_path": path}
self.path = tempfile.TemporaryDirectory().name
else:
self.fs_meta = {}
self.path = path

"""
self.path = path
self.common_args = {
"index": index,
"compression": compression,
Expand All @@ -923,6 +936,7 @@ def __init__(
# Map of partition_col values to their ParquetWriter's index
# in self._chunked_writers for reverse lookup
self.path_cw_map: Dict[str, int] = {}
self.kwargs = kwargs
self.filename = file_name_prefix
self.max_file_size = max_file_size
if max_file_size is not None:
@@ -1051,18 +1065,19 @@ def write_table(self, df):
]
cw.write_table(grouped_df, this_cw_part_info)

# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
if new_cw_paths:
# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
)
Comment on lines -1054 to +1076

Member: Do you think it would be possible to wrap these ParquetWriter objects in a file-opening shim? This way, for non-local storage, we could open the necessary paths with fsspec before passing the open "file-like" objects down to ParquetWriter. The shim would need to keep track of the open files and close them after the underlying ParquetWriter object is closed.

If I understand correctly (which I may not), we should be able to write directly to s3 without the file-copy workaround.

Contributor Author: So this was my initial go-to approach (same as df.to_parquet), but after implementing it I ran into issues. Upon investigating how ParquetWriter is currently written, I realized that writing to non-local storage won't fully create and flush the data: even when we manually flush after writing a table, only a 0-byte file is created and no data is written. And if, to form a correct parquet file over the same open file connection, I try to write the parquet metadata footer during the close operation, the libraries end up overwriting the existing file in S3 storage, losing all the past data and writing only the footer metadata. After repeated tries of different combinations of (write table, flush, write metadata, flush, close), none of which succeeded, I concluded that S3 and other cloud file systems are designed as object stores rather than true file systems that allow appending to the contents of existing files. At least for S3 this is the limitation, while ParquetWriter is basically built around the concept of appending.

Member: Okay, I still have a suspicion that we can make this work, but I do think the approach you are using here is reasonable, so I can experiment and follow up separately. My question here is definitely not a blocker :)

Contributor Author: Sure, will be happy to chat about the findings.

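For context, the workaround described in this thread (stage the dataset locally, then copy it to S3 on close) boils down to the following standalone fsspec pattern; the paths here are hypothetical, and this is only a sketch of the idea rather than the cudf implementation itself:

import shutil
import tempfile

import fsspec

staging_dir = tempfile.mkdtemp()
# ... write the partitioned parquet dataset into staging_dir with a local writer ...

# Recursively copy the staged directory to the S3 prefix in one shot,
# side-stepping S3's lack of in-place append semantics.
fs = fsspec.filesystem("s3")
fs.put(staging_dir, "s3://my-bucket/dataset", recursive=True)
shutil.rmtree(staging_dir)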
)
)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)

@_cudf_nvtx_annotate
def close(self, return_metadata=False):
Expand All @@ -1076,6 +1091,15 @@ def close(self, return_metadata=False):
for cw, _, meta_path in self._chunked_writers
]

if self.fs_meta.get("is_s3", False):
local_path = self.path
s3_path = self.fs_meta["actual_path"]
s3_file, _ = ioutils._get_filesystem_and_paths(
s3_path, **self.kwargs
)
s3_file.put(local_path, s3_path, recursive=True)
shutil.rmtree(self.path)

if return_metadata:
return (
merge_parquet_filemetadata(metadata)