Add partitioning support to Parquet chunked writer #10000

Merged
83 commits
7ca6570
First working version of partitioned write
devavret Oct 22, 2021
80e03a4
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Nov 22, 2021
21dc54b
multiple sink API
devavret Nov 23, 2021
d947abd
partitions in write parquet API
devavret Nov 23, 2021
360bf87
Fix a bug in frag causing incorrect num rows
devavret Nov 24, 2021
942dd58
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Nov 24, 2021
d454507
Dict encoding changes. Dict kernels now use frags
devavret Nov 25, 2021
b2b44a6
API cleanups
devavret Nov 25, 2021
0b6d33f
Add a gtest and fix other tests by handling no partition case
devavret Nov 26, 2021
2beed73
Add a guard to protect from an exception being thrown in impl dtor wh…
devavret Nov 26, 2021
4e21e99
Add per-sink user_data in table_input_metadata
devavret Nov 29, 2021
e0d1f33
Cleanups in dict code and replace index translating while LIST loop w…
devavret Nov 30, 2021
54de724
fix the returned metadata blob on close
devavret Dec 1, 2021
aa45827
Revert to using meta ctor without user_data in pyx
devavret Dec 1, 2021
06b2643
Remove num_rows param and docs cleanup
devavret Dec 1, 2021
fffb41e
orc use table meta ctor with single user_data
devavret Dec 1, 2021
ecd3aa5
Small size_type cleanups
devavret Dec 1, 2021
950f505
Misc cleanups
devavret Dec 2, 2021
019ac25
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 7, 2021
1d55d1a
API changes
devavret Dec 7, 2021
387c2ac
Take user_data out of table_input_metadata
devavret Dec 8, 2021
9a77f5e
python changes for moving user_data
devavret Dec 8, 2021
dc157e1
Add checks for sizes of options in case of multiple sinks
devavret Dec 8, 2021
79639ee
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 8, 2021
8c2927d
bug in tests in init list for kv meta
devavret Dec 8, 2021
200d1b0
Prevent setting chunk paths if not specified
devavret Dec 9, 2021
d0de9a9
Make returned metadata blob optional
devavret Dec 9, 2021
d90245f
Make sink info members private
devavret Dec 9, 2021
5a17a4c
Revert "Make returned metadata blob optional"
devavret Dec 9, 2021
2e1c359
make source data members private
devavret Dec 10, 2021
f44a50b
Refactor aggregate_metadata
devavret Dec 10, 2021
be8c19a
revert tests that were changed for debugging
devavret Dec 10, 2021
b9b5c15
Add empty df tests, make review changes
devavret Dec 10, 2021
85ce42f
Initial somewhat working python bindings for parquet partitioned writing
devavret Dec 10, 2021
1e79453
Review changes: reduce line size by aliasing the variable I keep using
devavret Dec 13, 2021
be83945
source/sink_info member privatisation
devavret Dec 13, 2021
e537314
aggregate metadata privatisation
devavret Dec 13, 2021
91580b4
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 13, 2021
c53fea7
Fix a private member access
devavret Dec 13, 2021
0dfcb89
Merge branch 'parq-partitioned-write' into py-parq-partitioned-write
devavret Dec 13, 2021
085442c
Working for IOBase
devavret Dec 13, 2021
099093d
Started to use groupby instead of special partition logic
devavret Dec 13, 2021
2ffec7e
Avoid multiple d->h iloc using groupby keys
devavret Dec 14, 2021
975207e
Merge branch 'branch-22.02' into py-parq-partitioned-write
devavret Dec 14, 2021
5dae74d
Index fixes
devavret Dec 15, 2021
120f32e
Re-enabling non-partitioned case
devavret Dec 15, 2021
39ec9a0
remember to return metadata. remove temp test
devavret Dec 15, 2021
43d35cb
actually drop index when asked to. now mimics the pre-refactor behaviour
devavret Dec 15, 2021
7d7444a
Fix flaky test
devavret Dec 16, 2021
e66d855
Metadata file paths should be a list
devavret Dec 17, 2021
2442be8
Make all sinks work again
devavret Dec 17, 2021
48354e9
Consolidate common args and save some lines
devavret Dec 17, 2021
72b1e81
write_to_dataset cleanups:
devavret Dec 18, 2021
e84fc1c
Write_to_dataset calls back to_parquet.
devavret Dec 20, 2021
96139d4
Passthrough other args with partition_cols instead of ignoring
devavret Dec 20, 2021
2ec7d29
Merge branch 'branch-22.02' into py-parq-partitioned-write
devavret Dec 20, 2021
f6d4175
Fix style issues in python/cython
devavret Dec 21, 2021
833bc88
allowing cython ParquetWriter to call write with partitions
devavret Dec 29, 2021
15557b7
First working poc of ParquetWriter class that can dynamically handle …
devavret Dec 29, 2021
879294c
code cleanups to be more pythonic
devavret Dec 29, 2021
2bcf62c
enable regular old non-partitioned writing
devavret Dec 29, 2021
cf13e81
Enable index arg and passthrough compression and statistics
devavret Dec 30, 2021
7e57eec
ability to return metadata
devavret Dec 30, 2021
48fb111
Skip opening file which is not required for remote
devavret Jan 5, 2022
0513d54
review changes
devavret Jan 6, 2022
eab23d0
Merge branch 'py-parq-partitioned-write' into chunked-partitioned-par…
devavret Jan 7, 2022
5b00973
Review changes
devavret Jan 7, 2022
b47d38f
Merge branch 'py-parq-partitioned-write' into chunked-partitioned-par…
devavret Jan 7, 2022
9d61a77
Docs, remove custom fs support
devavret Jan 10, 2022
ddb95a3
Merge branch 'branch-22.02' into chunked-partitioned-parq-write
devavret Jan 10, 2022
912301f
remove redundant partitioning logic in write_to_dataset
devavret Jan 10, 2022
bb30a09
fix cython style
devavret Jan 10, 2022
18c3524
Add pytest
devavret Jan 10, 2022
cd698ed
mypy fixes
devavret Jan 11, 2022
b717373
More mypy fix
devavret Jan 11, 2022
7278f4f
Review fixes
devavret Jan 11, 2022
4b0efe5
1 more review change
devavret Jan 11, 2022
ef69e62
Add returned meta test
devavret Jan 12, 2022
fc4a6df
mypy fix
devavret Jan 12, 2022
e1d608a
Another style fix
devavret Jan 12, 2022
64aae8d
Merge branch 'branch-22.02' into chunked-partitioned-parq-write
devavret Jan 12, 2022
70612e6
Remove destructor in favour of contextlib
devavret Jan 12, 2022
6552fbe
Review changes
devavret Jan 13, 2022
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -207,6 +207,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_chunked_writer& write(
cudf_table_view.table_view table_,
) except+
parquet_chunked_writer& write(
const cudf_table_view.table_view& table_,
const vector[cudf_io_types.partition_info]& partitions,
) except+
unique_ptr[vector[uint8_t]] close(
vector[string] column_chunks_file_paths,
) except+
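The new overload pairs the table with a vector of partition_info entries, each a contiguous (start_row, num_rows) slice of the table destined for one sink. As a minimal sketch, assuming the slices are derived from group offsets the way the pairwise helper later in this PR does it (the offset values below are illustrative):

# Group offsets as produced by grouping the frame on the partition columns,
# with the total row count as the final entry (values are illustrative).
offsets = [0, 3, 7, 10]

# Each partition is a (start_row, num_rows) slice over the grouped table.
partitions_info = [
    (start, end - start) for start, end in zip(offsets, offsets[1:])
]
print(partitions_info)  # [(0, 3), (3, 4), (7, 3)]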
47 changes: 36 additions & 11 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -411,23 +411,31 @@ cdef class ParquetWriter:
cdef unique_ptr[cpp_parquet_chunked_writer] writer
cdef unique_ptr[table_input_metadata] tbl_meta
cdef cudf_io_types.sink_info sink
cdef unique_ptr[cudf_io_types.data_sink] _data_sink
cdef vector[unique_ptr[cudf_io_types.data_sink]] _data_sink
cdef cudf_io_types.statistics_freq stat_freq
cdef cudf_io_types.compression_type comp_type
cdef object index

def __cinit__(self, object path, object index=None,
def __cinit__(self, object filepaths_or_buffers, object index=None,
object compression=None, str statistics="ROWGROUP"):
self.sink = make_sink_info(path, self._data_sink)
filepaths_or_buffers = (
list(filepaths_or_buffers)
if is_list_like(filepaths_or_buffers)
else [filepaths_or_buffers]
)
self.sink = make_sinks_info(filepaths_or_buffers, self._data_sink)
self.stat_freq = _get_stat_freq(statistics)
self.comp_type = _get_comp_type(compression)
self.index = index
self.initialized = False

def write_table(self, table):
def write_table(self, table, object partitions_info=None):
""" Writes a single table to the file """
if not self.initialized:
self._initialize_chunked_state(table)
self._initialize_chunked_state(
table,
num_partitions=len(partitions_info) if partitions_info else 1
)

cdef table_view tv
if self.index is not False and (
@@ -437,8 +445,15 @@ cdef class ParquetWriter:
else:
tv = table_view_from_table(table, ignore_index=True)

cdef vector[cudf_io_types.partition_info] partitions
if partitions_info is not None:
for part in partitions_info:
partitions.push_back(
cudf_io_types.partition_info(part[0], part[1])
)

with nogil:
self.writer.get()[0].write(tv)
self.writer.get()[0].write(tv, partitions)

def close(self, object metadata_file_path=None):
cdef unique_ptr[vector[uint8_t]] out_metadata_c
@@ -449,7 +464,13 @@

# Update metadata-collection options
if metadata_file_path is not None:
column_chunks_file_paths.push_back(str.encode(metadata_file_path))
if is_list_like(metadata_file_path):
for path in metadata_file_path:
column_chunks_file_paths.push_back(str.encode(path))
else:
column_chunks_file_paths.push_back(
str.encode(metadata_file_path)
)

with nogil:
out_metadata_c = move(
@@ -466,7 +487,7 @@
def __dealloc__(self):
self.close()

def _initialize_chunked_state(self, table):
def _initialize_chunked_state(self, table, num_partitions=1):
""" Prepares all the values required to build the
chunked_parquet_writer_options and creates a writer"""
cdef table_view tv
@@ -499,10 +520,14 @@ cdef class ParquetWriter:
table[name]._column, self.tbl_meta.get().column_metadata[i]
)

pandas_metadata = generate_pandas_metadata(table, self.index)
index = (
False if isinstance(table._index, cudf.RangeIndex) else self.index
)
pandas_metadata = generate_pandas_metadata(table, index)
cdef map[string, string] tmp_user_data
tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
cdef vector[map[string, string]] user_data
user_data.resize(1)
user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata)
user_data = vector[map[string, string]](num_partitions, tmp_user_data)

cdef chunked_parquet_writer_options args
with nogil:
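Taken together, these changes let the Cython ParquetWriter fan a single grouped table out to several sinks. A rough sketch of driving it directly, assuming the re-export added in python/cudf/cudf/io/parquet.py below; the paths, data, and row ranges are purely illustrative, and in practice the new ParquetDatasetWriter (also below) generates them:

import os

import cudf
from cudf.io.parquet import ParquetWriter  # re-export of the Cython writer

# Rows are assumed pre-grouped so that each partition is a contiguous range,
# with the partition column already dropped from the payload.
df = cudf.DataFrame({"b": [10, 20, 30, 40]})

os.makedirs("a=1", exist_ok=True)
os.makedirs("a=2", exist_ok=True)

# One sink per partition: rows [0, 2) go to the first file, [2, 4) to the second.
writer = ParquetWriter(["a=1/part.parquet", "a=2/part.parquet"], index=False)
writer.write_table(df, partitions_info=[(0, 2), (2, 2)])

# With multiple sinks, metadata_file_path takes one entry per sink.
writer.close(metadata_file_path=["a=1/part.parquet", "a=2/part.parquet"])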
213 changes: 186 additions & 27 deletions python/cudf/cudf/io/parquet.py
@@ -5,6 +5,7 @@
import warnings
from collections import defaultdict
from contextlib import ExitStack
from typing import Dict, List, Tuple
from uuid import uuid4

import fsspec
@@ -126,32 +127,21 @@ def write_to_dataset(

if partition_cols is not None and len(partition_cols) > 0:

data_cols = df.columns.drop(partition_cols)
if len(data_cols) == 0:
raise ValueError("No data left to save outside partition columns")

part_names, part_offsets, _, grouped_df = df.groupby(
partition_cols
)._grouped()
if not preserve_index:
grouped_df.reset_index(drop=True, inplace=True)
grouped_df.drop(columns=partition_cols, inplace=True)
# Copy the entire keys df in one operation rather than using iloc
part_names = part_names.to_pandas().to_frame(index=False)

full_paths = []
metadata_file_paths = []
for keys in part_names.itertuples(index=False):
subdir = fs.sep.join(
[f"{name}={val}" for name, val in zip(partition_cols, keys)]
)
prefix = fs.sep.join([root_path, subdir])
fs.mkdirs(prefix, exist_ok=True)
filename = filename or uuid4().hex + ".parquet"
full_path = fs.sep.join([prefix, filename])
full_paths.append(full_path)
if return_metadata:
metadata_file_paths.append(fs.sep.join([subdir, filename]))
(
full_paths,
metadata_file_paths,
grouped_df,
part_offsets,
_,
) = _get_partitioned(
df,
root_path,
partition_cols,
filename,
fs,
preserve_index,
**kwargs,
)

if return_metadata:
kwargs["metadata_file_path"] = metadata_file_paths
@@ -164,7 +154,7 @@
)

else:
filename = filename or uuid4().hex + ".parquet"
filename = filename or (uuid4().hex + ".parquet")
full_path = fs.sep.join([root_path, filename])
if return_metadata:
kwargs["metadata_file_path"] = filename
@@ -790,9 +780,178 @@ def merge_parquet_filemetadata(filemetadata_list):
return libparquet.merge_filemetadata(filemetadata_list)


def _get_partitioned(
df,
root_path,
partition_cols,
filename=None,
fs=None,
preserve_index=False,
**kwargs,
):
fs = ioutils._ensure_filesystem(fs, root_path, **kwargs)
fs.mkdirs(root_path, exist_ok=True)
if not (set(df._data) - set(partition_cols)):
raise ValueError("No data left to save outside partition columns")

part_names, part_offsets, _, grouped_df = df.groupby(
partition_cols
)._grouped()
if not preserve_index:
grouped_df.reset_index(drop=True, inplace=True)
grouped_df.drop(columns=partition_cols, inplace=True)
# Copy the entire keys df in one operation rather than using iloc
part_names = part_names.to_pandas().to_frame(index=False)

full_paths = []
metadata_file_paths = []
for keys in part_names.itertuples(index=False):
subdir = fs.sep.join(
[f"{name}={val}" for name, val in zip(partition_cols, keys)]
)
prefix = fs.sep.join([root_path, subdir])
fs.mkdirs(prefix, exist_ok=True)
filename = filename or (uuid4().hex + ".parquet")
full_path = fs.sep.join([prefix, filename])
full_paths.append(full_path)
metadata_file_paths.append(fs.sep.join([subdir, filename]))

return full_paths, metadata_file_paths, grouped_df, part_offsets, filename
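
The subdirectory names above follow the usual Hive-style key=value convention. A small illustrative sketch of the paths _get_partitioned ends up building, using "/" in place of fs.sep and a fixed filename instead of uuid4().hex:

root_path = "dataset"
partition_cols = ["a", "b"]
keys = (1, "x")            # one unique combination of partition values
filename = "part.parquet"  # in practice uuid4().hex + ".parquet"

subdir = "/".join(f"{name}={val}" for name, val in zip(partition_cols, keys))
full_path = "/".join([root_path, subdir, filename])
print(full_path)  # dataset/a=1/b=x/part.parquet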


ParquetWriter = libparquet.ParquetWriter


class ParquetDatasetWriter:
def __init__(
self,
path,
partition_cols,
index=None,
compression=None,
statistics="ROWGROUP",
) -> None:
"""
Write a parquet file or dataset incrementally

Parameters
----------
path : str
File path or Root Directory path. Will be used as Root Directory
path while writing a partitioned dataset.
partition_cols : list
Column names by which to partition the dataset
Columns are partitioned in the order they are given
index : bool, default None
If ``True``, include the dataframe’s index(es) in the file output.
If ``False``, they will not be written to the file. If ``None``,
index(es) other than RangeIndex will be saved as columns.
compression : {'snappy', None}, default 'snappy'
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in file.
"""
self.path = path
self.common_args = {
"index": index,
"compression": compression,
"statistics": statistics,
}
self.partition_cols = partition_cols
# Collection of `ParquetWriter`s, and the corresponding
# partition_col values they're responsible for
self._chunked_writers: List[Tuple[ParquetWriter, List[str], str]] = []
# Map of partition_col values to their ParquetWriter's index
# in self._chunked_writers for reverse lookup
self.path_cw_map: Dict[str, int] = {}
self.filename = None

def write_table(self, df):
"""
Write a dataframe to the file/dataset
"""
(
paths,
metadata_file_paths,
grouped_df,
offsets,
self.filename,
) = _get_partitioned(
df,
self.path,
self.partition_cols,
preserve_index=self.common_args["index"],
filename=self.filename,
)

existing_cw_batch = defaultdict(dict)
new_cw_paths = []

def pairwise(iterable):
"""
Generates pairs `(it[i], it[i + 1] - it[i])` from iterable `it`
"""
it = iter(iterable)
a = next(it, None)
for b in it:
yield (a, b - a)
a = b

for path, part_info, meta_path in zip(
paths, pairwise(offsets), metadata_file_paths
):
if path in self.path_cw_map: # path is a currently open file
cw_idx = self.path_cw_map[path]
existing_cw_batch[cw_idx][path] = part_info
else: # path not currently handled by any chunked writer
new_cw_paths.append((path, part_info, meta_path))

# Write out the parts of grouped_df currently handled by existing cw's
for cw_idx, path_to_part_info_map in existing_cw_batch.items():
cw = self._chunked_writers[cw_idx][0]
# match found paths with this cw's paths and nullify partition info
# for partition_col values not in this batch
this_cw_part_info = [
path_to_part_info_map.get(path, (0, 0))
for path in self._chunked_writers[cw_idx][1]
]
cw.write_table(grouped_df, this_cw_part_info)

# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
)
)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)

def close(self, return_metadata=False):
"""
Close all open files and optionally return footer metadata as a binary
blob
"""

metadata = [
cw.close(metadata_file_path=meta_path if return_metadata else None)
for cw, _, meta_path in self._chunked_writers
]

if return_metadata:
return (
merge_parquet_filemetadata(metadata)
if len(metadata) > 1
else metadata[0]
)

def __del__(self):
self.close()
shwina (Contributor) commented on Jan 12, 2022:

I'll add the usual comment about __del__ that it's not always guaranteed to be called, even at interpreter shutdown: https://docs.python.org/3/reference/datamodel.html#object.del

There are lots of warnings on Stack Overflow not to use __del__ as a destructor. The suggestion is to use __enter__() and __exit__() methods instead, the latter of which will call close().

This changes the API, but guarantees that the close() method will be called:

with ParquetDatasetWriter(...) as pq_writer:
    pq_writer.write_table(df1)
    pq_writer.write_table(df2)
# pq_writer.close() will be called upon exiting the `with` statement

If we don't care that the close() method may not always be called, I think using __del__ is OK, and I've seen it done in other parts of RAPIDS. If it's absolutely essential that all ParquetDatasetWriter objects be close'd, we might want to reconsider.

devavret (Contributor, PR author) replied:

> If it's absolutely essential that all ParquetDatasetWriter objects be close'd

It is.

Does Cython's __dealloc__ have the same limitation, or is it more like the C++ destructor? I ask this because the non-partitioned ParquetWriter uses that.

shwina (Contributor) replied on Jan 12, 2022:

I've never been able to find a reliable reference on whether __dealloc__ (which translates into the type's tp_dealloc method) has the same limitations. My understanding is that it does.

However, now that you mention it, I'm seeing that ParquetWriter is doing something explicitly unsafe, i.e., calling a Python method in its __dealloc__ method. See here for why not to do that. The Cython docs here suggest just using __del__ (which translates into the type's tp_finalize method) in those situations instead...

To be frank, I don't know about situations in which __del__ or __dealloc__ may not be called. We rely on __dealloc__ within RAPIDS to free memory allocated in C++ and it has worked well. In any rare cases where it may not work, we wouldn't run into an error, "just" a memory leak. However, the ParquetWriter case may be different, where we need __dealloc__/__del__ to work as expected for correctness.

edit: fixed a link

A contributor replied:

My knowledge of the del/dealloc methods aligns with what @shwina is saying here. I think that switching this API to a context manager (with …) that uses a try/finally to ensure the file is closed is the only way to guarantee the closure, unless you are willing to put it on the user as a requirement to explicitly call a “close” method.

devavret (Contributor, PR author) replied:

The only user of ParquetWriter known to me is nvtabular and they wrap it in another ThreadedWriter class which does not have a destructor at all, but does have a close(). Their close() calls ParquetWriter's close().

So should we just remove the destructor altogether?
And then should we add __enter__() and __exit__()? How does that fare as Python class design, having two ways to use it?

A contributor replied:

It's pretty standard. The file object in Python offers a similar interface, where you can explicitly close it with close, or use it within a with statement which will call close for you.

Calling close multiple times should be allowed, where subsequent calls to close() do nothing.

A contributor replied:

> So should we just remove the destructor altogether?

Yes, as long as we document that this class must either be used within a context manager (with statement), or otherwise, that it's the user's responsibility to call close() explicitly. This can be documented with examples, as part of the class docstring.
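
Following this thread, a later commit in this PR (70612e6, "Remove destructor in favour of contextlib") drops the destructor. A hedged sketch of the context-manager shape the discussion converges on; the subclass name is illustrative and the final revision may attach these methods to ParquetDatasetWriter itself:

import cudf
from cudf.io.parquet import ParquetDatasetWriter

class ManagedDatasetWriter(ParquetDatasetWriter):
    # Illustrative wrapper; shown only to make the protocol explicit.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always flush and close every open sink, even if the body raised.
        self.close()

df1 = cudf.DataFrame({"a": [1, 1, 2], "b": [9, 8, 7]})
df2 = cudf.DataFrame({"a": [1, 3, 3], "b": [4, 3, 2]})

with ManagedDatasetWriter("dataset_dir", partition_cols=["a"]) as writer:
    writer.write_table(df1)
    writer.write_table(df2)
# close() runs on exit, so footers are written for every open file.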



def _check_decimal128_type(arrow_type):
if isinstance(arrow_type, pa.Decimal128Type):
if arrow_type.precision > cudf.Decimal64Dtype.MAX_PRECISION:
31 changes: 30 additions & 1 deletion python/cudf/cudf/tests/test_parquet.py
@@ -18,7 +18,11 @@
from pyarrow import fs as pa_fs, parquet as pq

import cudf
from cudf.io.parquet import ParquetWriter, merge_parquet_filemetadata
from cudf.io.parquet import (
ParquetWriter,
ParquetDatasetWriter,
merge_parquet_filemetadata,
)
from cudf.testing import dataset_generator as dg
from cudf.testing._utils import (
TIMEDELTA_TYPES,
@@ -1627,6 +1631,31 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename):
assert fn == filename


def test_parquet_writer_chunked_partitioned(tmpdir_factory):
pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
gdf_dir = str(tmpdir_factory.mktemp("gdf_dir"))

df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})

cw = ParquetDatasetWriter(gdf_dir, partition_cols=["a"], index=False)
cw.write_table(df1)
cw.write_table(df2)
cw.close()

pdf = cudf.concat([df1, df2]).to_pandas()
pdf.to_parquet(pdf_dir, index=False, partition_cols=["a"])

# Read back with pandas to compare
expect_pd = pd.read_parquet(pdf_dir)
got_pd = pd.read_parquet(gdf_dir)
assert_eq(expect_pd, got_pd)

# Check that cudf and pd return the same read
got_cudf = cudf.read_parquet(gdf_dir)
assert_eq(got_pd, got_cudf)


@pytest.mark.parametrize("cols", [None, ["b"]])
def test_parquet_write_to_dataset(tmpdir_factory, cols):
dir1 = tmpdir_factory.mktemp("dir1")