Add partitioning support to Parquet chunked writer #10000

Merged

Commits (83)
7ca6570
First working version of partitioned write
devavret Oct 22, 2021
80e03a4
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Nov 22, 2021
21dc54b
multiple sink API
devavret Nov 23, 2021
d947abd
partitions in write parquet API
devavret Nov 23, 2021
360bf87
Fix a bug in frag causing incorrect num rows
devavret Nov 24, 2021
942dd58
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Nov 24, 2021
d454507
Dict encoding changes. Dict kernels now use frags
devavret Nov 25, 2021
b2b44a6
API cleanups
devavret Nov 25, 2021
0b6d33f
Add a gtest and fix other tests by handling no partition case
devavret Nov 26, 2021
2beed73
Add a guard to protect from an exception being thrown in impl dtor wh…
devavret Nov 26, 2021
4e21e99
Add per-sink user_data in table_input_metadata
devavret Nov 29, 2021
e0d1f33
Cleanups in dict code and replace index translating while LIST loop w…
devavret Nov 30, 2021
54de724
fix the returned metadata blob on close
devavret Dec 1, 2021
aa45827
Revert to using meta ctor without user_data in pyx
devavret Dec 1, 2021
06b2643
Remove num_rows param and docs cleanup
devavret Dec 1, 2021
fffb41e
orc use table meta ctor with single user_data
devavret Dec 1, 2021
ecd3aa5
Small size_type cleanups
devavret Dec 1, 2021
950f505
Misc cleanups
devavret Dec 2, 2021
019ac25
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 7, 2021
1d55d1a
API changes
devavret Dec 7, 2021
387c2ac
Take user_data out of table_input_metadata
devavret Dec 8, 2021
9a77f5e
python changes for moving user_data
devavret Dec 8, 2021
dc157e1
Add checks for sizes of options in case of multiple sinks
devavret Dec 8, 2021
79639ee
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 8, 2021
8c2927d
bug in tests in init list for kv meta
devavret Dec 8, 2021
200d1b0
Prevent setting chunk paths if not specified
devavret Dec 9, 2021
d0de9a9
Make returned metadata blob optional
devavret Dec 9, 2021
d90245f
Make sink info members private
devavret Dec 9, 2021
5a17a4c
Revert "Make returned metadata blob optional"
devavret Dec 9, 2021
2e1c359
make source data members private
devavret Dec 10, 2021
f44a50b
Refactor aggregate_metadata
devavret Dec 10, 2021
be8c19a
revert tests that were changed for debugging
devavret Dec 10, 2021
b9b5c15
Add empty df tests, make review changes
devavret Dec 10, 2021
85ce42f
Initial somewhat working python bindings for parquet partitioned writing
devavret Dec 10, 2021
1e79453
Review changes: reduce line size by aliasing the variable I keep using
devavret Dec 13, 2021
be83945
source/sink_info memeber privatisation
devavret Dec 13, 2021
e537314
aggregate metadata privatisation
devavret Dec 13, 2021
91580b4
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 13, 2021
c53fea7
Fix a private member access
devavret Dec 13, 2021
0dfcb89
Merge branch 'parq-partitioned-write' into py-parq-partitioned-write
devavret Dec 13, 2021
085442c
Working for IOBase
devavret Dec 13, 2021
099093d
Started to use groupby instead of special partition logic
devavret Dec 13, 2021
2ffec7e
Avoid multiple d->h iloc using groupby keys
devavret Dec 14, 2021
975207e
Merge branch 'branch-22.02' into py-parq-partitioned-write
devavret Dec 14, 2021
5dae74d
Index fixes
devavret Dec 15, 2021
120f32e
Re-enabling non-partitioned case
devavret Dec 15, 2021
39ec9a0
remember to return metadata. remove temp test
devavret Dec 15, 2021
43d35cb
actually drop index when asked to. now mimics the pre-refactor behaviour
devavret Dec 15, 2021
7d7444a
Fix flaky test
devavret Dec 16, 2021
e66d855
Metadata file paths should be a list
devavret Dec 17, 2021
2442be8
Make all sinks work again
devavret Dec 17, 2021
48354e9
Consolidate common args and save some lines
devavret Dec 17, 2021
72b1e81
write_to_dataset cleanups:
devavret Dec 18, 2021
e84fc1c
Write_to_dataset calls back to_parquet.
devavret Dec 20, 2021
96139d4
Passthrough other args with partition_cols instead of ignoring
devavret Dec 20, 2021
2ec7d29
Merge branch 'branch-22.02' into py-parq-partitioned-write
devavret Dec 20, 2021
f6d4175
Fix style issues in python/cython
devavret Dec 21, 2021
833bc88
allowing cython ParquetWriter to call write with partitions
devavret Dec 29, 2021
15557b7
First working poc of ParquetWriter class that can dynamically handle …
devavret Dec 29, 2021
879294c
code cleanups to be more pythonic
devavret Dec 29, 2021
2bcf62c
enable regular old non-partitioned writing
devavret Dec 29, 2021
cf13e81
Enable index arg and passthrough compression and statistics
devavret Dec 30, 2021
7e57eec
ability to return metadata
devavret Dec 30, 2021
48fb111
Skip opening file which is not required for remote
devavret Jan 5, 2022
0513d54
review changes
devavret Jan 6, 2022
eab23d0
Merge branch 'py-parq-partitioned-write' into chunked-partitioned-par…
devavret Jan 7, 2022
5b00973
Review changes
devavret Jan 7, 2022
b47d38f
Merge branch 'py-parq-partitioned-write' into chunked-partitioned-par…
devavret Jan 7, 2022
9d61a77
Docs, remove custom fs support
devavret Jan 10, 2022
ddb95a3
Merge branch 'branch-22.02' into chunked-partitioned-parq-write
devavret Jan 10, 2022
912301f
remove redundant partitioning logic in write_to_dataset
devavret Jan 10, 2022
bb30a09
fix cython style
devavret Jan 10, 2022
18c3524
Add pytest
devavret Jan 10, 2022
cd698ed
mypy fixes
devavret Jan 11, 2022
b717373
More mypy fix
devavret Jan 11, 2022
7278f4f
Review fixes
devavret Jan 11, 2022
4b0efe5
1 more review change
devavret Jan 11, 2022
ef69e62
Add returned meta test
devavret Jan 12, 2022
fc4a6df
mypy fix
devavret Jan 12, 2022
e1d608a
Another style fix
devavret Jan 12, 2022
64aae8d
Merge branch 'branch-22.02' into chunked-partitioned-parq-write
devavret Jan 12, 2022
70612e6
Remove destructor in favour of contextlib
devavret Jan 12, 2022
6552fbe
Review changes
devavret Jan 13, 2022
Files changed

python/cudf/cudf/_lib/cpp/io/parquet.pxd (4 additions, 0 deletions)

@@ -207,6 +207,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         parquet_chunked_writer& write(
             cudf_table_view.table_view table_,
         ) except+
+        parquet_chunked_writer& write(
+            const cudf_table_view.table_view& table_,
+            const vector[cudf_io_types.partition_info]& partitions,
+        ) except+
         unique_ptr[vector[uint8_t]] close(
             vector[string] column_chunks_file_paths,
         ) except+
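For orientation (not part of the PR diff): the overload added above takes one partition_info per output sink. A minimal sketch of the Python-level value that eventually reaches it, assuming each entry is a (start_row, num_rows) pair as implied by the partition_info constructor call in parquet.pyx below:

    # Hypothetical illustration: two contiguous row ranges of one table,
    # one per sink; the Cython layer reads each pair as (start_row, num_rows).
    partitions_info = [(0, 1000), (1000, 500)]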
python/cudf/cudf/_lib/parquet.pyx (40 additions, 12 deletions)

@@ -411,23 +411,31 @@ cdef class ParquetWriter:
     cdef unique_ptr[cpp_parquet_chunked_writer] writer
     cdef unique_ptr[table_input_metadata] tbl_meta
     cdef cudf_io_types.sink_info sink
-    cdef unique_ptr[cudf_io_types.data_sink] _data_sink
+    cdef vector[unique_ptr[cudf_io_types.data_sink]] _data_sink
     cdef cudf_io_types.statistics_freq stat_freq
     cdef cudf_io_types.compression_type comp_type
     cdef object index

-    def __cinit__(self, object path, object index=None,
+    def __cinit__(self, object filepaths_or_buffers, object index=None,
                   object compression=None, str statistics="ROWGROUP"):
-        self.sink = make_sink_info(path, self._data_sink)
+        filepaths_or_buffers = (
+            list(filepaths_or_buffers)
+            if is_list_like(filepaths_or_buffers)
+            else [filepaths_or_buffers]
+        )
+        self.sink = make_sinks_info(filepaths_or_buffers, self._data_sink)
         self.stat_freq = _get_stat_freq(statistics)
         self.comp_type = _get_comp_type(compression)
         self.index = index
         self.initialized = False

-    def write_table(self, table):
+    def write_table(self, table, object partitions_info=None):
         """ Writes a single table to the file """
         if not self.initialized:
-            self._initialize_chunked_state(table)
+            self._initialize_chunked_state(
+                table,
+                num_partitions=len(partitions_info) if partitions_info else 1
+            )

         cdef table_view tv
         if self.index is not False and (
@@ -437,8 +445,15 @@ cdef class ParquetWriter:
         else:
             tv = table_view_from_table(table, ignore_index=True)

+        cdef vector[cudf_io_types.partition_info] partitions
+        if partitions_info is not None:
+            for part in partitions_info:
+                partitions.push_back(
+                    cudf_io_types.partition_info(part[0], part[1])
+                )
+
         with nogil:
-            self.writer.get()[0].write(tv)
+            self.writer.get()[0].write(tv, partitions)

     def close(self, object metadata_file_path=None):
         cdef unique_ptr[vector[uint8_t]] out_metadata_c
@@ -449,7 +464,13 @@

         # Update metadata-collection options
         if metadata_file_path is not None:
-            column_chunks_file_paths.push_back(str.encode(metadata_file_path))
+            if is_list_like(metadata_file_path):
+                for path in metadata_file_path:
+                    column_chunks_file_paths.push_back(str.encode(path))
+            else:
+                column_chunks_file_paths.push_back(
+                    str.encode(metadata_file_path)
+                )

         with nogil:
             out_metadata_c = move(
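A sketch of the resulting Python-side flow (assumptions: the Cython ParquetWriter from cudf._lib.parquet, and the (start_row, num_rows) reading of partitions_info entries shown above; not code from the PR):

    import cudf
    from cudf._lib.parquet import ParquetWriter

    df = cudf.DataFrame({"a": list(range(1500))})

    # One sink per partition; paths or buffers are both accepted.
    pw = ParquetWriter(["part0.parquet", "part1.parquet"])

    # Rows [0, 1000) go to the first sink, rows [1000, 1500) to the second.
    pw.write_table(df, partitions_info=[(0, 1000), (1000, 500)])

    # With one metadata file path per sink, close() returns the combined
    # footer blob, e.g. for writing a _metadata sidecar.
    footer = pw.close(metadata_file_path=["part0.parquet", "part1.parquet"])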
@@ -463,10 +484,13 @@
             return np.asarray(out_metadata_py)
         return None

-    def __dealloc__(self):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
         self.close()

-    def _initialize_chunked_state(self, table):
+    def _initialize_chunked_state(self, table, num_partitions=1):
         """ Prepares all the values required to build the
             chunked_parquet_writer_options and creates a writer"""
         cdef table_view tv
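Replacing __dealloc__ with __enter__/__exit__ makes cleanup deterministic: close() runs at block exit, even if a write raises, rather than whenever the object happens to be deallocated. A minimal usage sketch (hypothetical file name):

    with ParquetWriter("out.parquet") as pw:
        pw.write_table(df)
    # close() has already been called here by __exit__.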
@@ -499,10 +523,14 @@ cdef class ParquetWriter:
                 table[name]._column, self.tbl_meta.get().column_metadata[i]
             )

-        pandas_metadata = generate_pandas_metadata(table, self.index)
+        index = (
+            False if isinstance(table._index, cudf.RangeIndex) else self.index
+        )
+        pandas_metadata = generate_pandas_metadata(table, index)
+        cdef map[string, string] tmp_user_data
+        tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
         cdef vector[map[string, string]] user_data
-        user_data.resize(1)
-        user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata)
+        user_data = vector[map[string, string]](num_partitions, tmp_user_data)

         cdef chunked_parquet_writer_options args
         with nogil:
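The last hunk fans the serialized pandas metadata out to every sink: the vector fill-constructor attaches the same {"pandas": ...} key-value map to each partition. In plain Python terms, a rough equivalent (names are from the diff; the dict analogy is an assumption):

    tmp_user_data = {"pandas": pandas_metadata}
    user_data = [dict(tmp_user_data) for _ in range(num_partitions)]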