Add partitioning support to Parquet chunked writer (#10000)
The chunked writer (`class ParquetWriter`) now takes an argument `partition_cols`. For each call to `write_table(df)`, `df` is partitioned and each partition is appended to its corresponding file in the dataset directory. This is useful when partitioning is desired but one wants to avoid creating many small files in each subdirectory, e.g.
Instead of repeated calls to `write_to_dataset`, like so:
```python
write_to_dataset(df1, root_path, partition_cols=['group'])
write_to_dataset(df2, root_path, partition_cols=['group'])
...
```
which would yield the following structure:
```
root_dir/
  group=value1/
    <uuid1>.parquet
    <uuid2>.parquet
    ...
  group=value2/
    <uuid1>.parquet
    <uuid2>.parquet
    ...
  ...
```
one can instead write
```python
pw = ParquetWriter(root_path, partition_cols=['group'])
pw.write_table(df1)
pw.write_table(df2)
pw.close()
```
to get the structure
```
root_dir/
  group=value1/
    <uuid1>.parquet
  group=value2/
    <uuid1>.parquet
  ...
```
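
This change also adds `__enter__`/`__exit__` to the writer (see the `parquet.pyx` diff below), so the same flow can presumably be written as a context manager that closes the writer automatically. A minimal sketch, assuming the `ParquetWriter` shown above exposes that protocol:

```python
# Sketch only: relies on the __enter__/__exit__ support added in this commit;
# the writer is closed (and the per-partition files finalized) on exit.
with ParquetWriter(root_path, partition_cols=['group']) as pw:
    pw.write_table(df1)
    pw.write_table(df2)
```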

Closes #7196
As a workaround, it also
fixes #9216
fixes #7011

TODO:

- [x] Tests

Authors:
  - Devavret Makkar (https://github.com/devavret)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Ashwin Srinath (https://github.com/shwina)

URL: #10000
devavret authored Jan 14, 2022
1 parent c07fdab commit 1eceaed
Showing 4 changed files with 350 additions and 46 deletions.
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -207,6 +207,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
        parquet_chunked_writer& write(
            cudf_table_view.table_view table_,
        ) except+
        parquet_chunked_writer& write(
            const cudf_table_view.table_view& table_,
            const vector[cudf_io_types.partition_info]& partitions,
        ) except+
        unique_ptr[vector[uint8_t]] close(
            vector[string] column_chunks_file_paths,
        ) except+
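
For context, the new `write` overload above takes the table once together with a vector of `partition_info` entries; in the Cython wrapper below, each entry is built from a `(start_row, num_rows)` pair describing a contiguous row range of the table destined for one output sink. A hypothetical call shape at the Python layer (`writer` and `table` are illustrative names, not from this diff):

```python
# Hypothetical: the table is assumed pre-sorted so that rows for each
# partition are contiguous; one (start_row, num_rows) range per sink.
partitions_info = [(0, 10), (10, 25), (35, 5)]
writer.write_table(table, partitions_info=partitions_info)
```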
52 changes: 40 additions & 12 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -411,23 +411,31 @@ cdef class ParquetWriter:
    cdef unique_ptr[cpp_parquet_chunked_writer] writer
    cdef unique_ptr[table_input_metadata] tbl_meta
    cdef cudf_io_types.sink_info sink
    cdef unique_ptr[cudf_io_types.data_sink] _data_sink
    cdef vector[unique_ptr[cudf_io_types.data_sink]] _data_sink
    cdef cudf_io_types.statistics_freq stat_freq
    cdef cudf_io_types.compression_type comp_type
    cdef object index

    def __cinit__(self, object path, object index=None,
    def __cinit__(self, object filepaths_or_buffers, object index=None,
                  object compression=None, str statistics="ROWGROUP"):
        self.sink = make_sink_info(path, self._data_sink)
        filepaths_or_buffers = (
            list(filepaths_or_buffers)
            if is_list_like(filepaths_or_buffers)
            else [filepaths_or_buffers]
        )
        self.sink = make_sinks_info(filepaths_or_buffers, self._data_sink)
        self.stat_freq = _get_stat_freq(statistics)
        self.comp_type = _get_comp_type(compression)
        self.index = index
        self.initialized = False

    def write_table(self, table):
    def write_table(self, table, object partitions_info=None):
        """ Writes a single table to the file """
        if not self.initialized:
            self._initialize_chunked_state(table)
            self._initialize_chunked_state(
                table,
                num_partitions=len(partitions_info) if partitions_info else 1
            )

        cdef table_view tv
        if self.index is not False and (
@@ -437,8 +445,15 @@ cdef class ParquetWriter:
        else:
            tv = table_view_from_table(table, ignore_index=True)

        cdef vector[cudf_io_types.partition_info] partitions
        if partitions_info is not None:
            for part in partitions_info:
                partitions.push_back(
                    cudf_io_types.partition_info(part[0], part[1])
                )

        with nogil:
            self.writer.get()[0].write(tv)
            self.writer.get()[0].write(tv, partitions)

    def close(self, object metadata_file_path=None):
        cdef unique_ptr[vector[uint8_t]] out_metadata_c
@@ -449,7 +464,13 @@ cdef class ParquetWriter:

        # Update metadata-collection options
        if metadata_file_path is not None:
            column_chunks_file_paths.push_back(str.encode(metadata_file_path))
            if is_list_like(metadata_file_path):
                for path in metadata_file_path:
                    column_chunks_file_paths.push_back(str.encode(path))
            else:
                column_chunks_file_paths.push_back(
                    str.encode(metadata_file_path)
                )

        with nogil:
            out_metadata_c = move(
@@ -463,10 +484,13 @@ cdef class ParquetWriter:
            return np.asarray(out_metadata_py)
        return None

    def __dealloc__(self):
    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def _initialize_chunked_state(self, table):
    def _initialize_chunked_state(self, table, num_partitions=1):
        """ Prepares all the values required to build the
        chunked_parquet_writer_options and creates a writer"""
        cdef table_view tv
@@ -499,10 +523,14 @@ cdef class ParquetWriter:
                table[name]._column, self.tbl_meta.get().column_metadata[i]
            )

        pandas_metadata = generate_pandas_metadata(table, self.index)
        index = (
            False if isinstance(table._index, cudf.RangeIndex) else self.index
        )
        pandas_metadata = generate_pandas_metadata(table, index)
        cdef map[string, string] tmp_user_data
        tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
        cdef vector[map[string, string]] user_data
        user_data.resize(1)
        user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata)
        user_data = vector[map[string, string]](num_partitions, tmp_user_data)

        cdef chunked_parquet_writer_options args
        with nogil:
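To make the shape of `partitions_info` concrete, here is a hypothetical sketch (not code from this commit) of deriving contiguous `(start_row, num_rows)` ranges from a partition column; the higher-level writer that accepts `partition_cols` would need to do something along these lines before calling the Cython `write_table`:

```python
# Hypothetical sketch: derive one contiguous (start_row, num_rows) range per
# partition key from a table sorted by the partition column.
import cudf

df = cudf.DataFrame({"group": ["a", "b", "a", "b", "b"], "x": [1, 2, 3, 4, 5]})
df = df.sort_values("group")

# Row counts per group, in the same (ascending key) order as the sorted rows.
counts = df["group"].value_counts().sort_index()

partitions_info, start = [], 0
for n in counts.values_host:
    partitions_info.append((start, int(n)))
    start += int(n)
# partitions_info == [(0, 2), (2, 3)]: one range per group=a / group=b file
```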
