diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index 07b312361f2..d02fffe9c0d 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -207,6 +207,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         parquet_chunked_writer& write(
             cudf_table_view.table_view table_,
         ) except+
+        parquet_chunked_writer& write(
+            const cudf_table_view.table_view& table_,
+            const vector[cudf_io_types.partition_info]& partitions,
+        ) except+
         unique_ptr[vector[uint8_t]] close(
             vector[string] column_chunks_file_paths,
         ) except+
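# Illustrative sketch (not part of the patch): each `partition_info` passed to
# the new `write()` overload is a (start_row, num_rows) span into a single
# table that is already sorted by the partition keys, with one span expected
# per output sink. The Python layer builds those spans from group-boundary
# offsets using the same `np.roll` construction that appears in
# cudf/io/parquet.py further down; with hypothetical offsets:
import numpy as np

offsets = [0, 3, 5, 9]  # group boundaries; the last entry is the table length
partitions_info = list(zip(offsets, np.roll(offsets, -1) - offsets))[:-1]
# -> [(0, 3), (3, 2), (5, 4)]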
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 36099b03ef6..16873435e1d 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -411,23 +411,31 @@ cdef class ParquetWriter:
     cdef unique_ptr[cpp_parquet_chunked_writer] writer
     cdef unique_ptr[table_input_metadata] tbl_meta
     cdef cudf_io_types.sink_info sink
-    cdef unique_ptr[cudf_io_types.data_sink] _data_sink
+    cdef vector[unique_ptr[cudf_io_types.data_sink]] _data_sink
     cdef cudf_io_types.statistics_freq stat_freq
     cdef cudf_io_types.compression_type comp_type
     cdef object index

-    def __cinit__(self, object path, object index=None,
+    def __cinit__(self, object filepaths_or_buffers, object index=None,
                   object compression=None, str statistics="ROWGROUP"):
-        self.sink = make_sink_info(path, self._data_sink)
+        filepaths_or_buffers = (
+            list(filepaths_or_buffers)
+            if is_list_like(filepaths_or_buffers)
+            else [filepaths_or_buffers]
+        )
+        self.sink = make_sinks_info(filepaths_or_buffers, self._data_sink)
         self.stat_freq = _get_stat_freq(statistics)
         self.comp_type = _get_comp_type(compression)
         self.index = index
         self.initialized = False

-    def write_table(self, table):
+    def write_table(self, table, object partitions_info=None):
         """ Writes a single table to the file """
         if not self.initialized:
-            self._initialize_chunked_state(table)
+            self._initialize_chunked_state(
+                table,
+                num_partitions=len(partitions_info) if partitions_info else 1
+            )

         cdef table_view tv
         if self.index is not False and (
@@ -437,8 +445,15 @@ cdef class ParquetWriter:
         else:
             tv = table_view_from_table(table, ignore_index=True)

+        cdef vector[cudf_io_types.partition_info] partitions
+        if partitions_info is not None:
+            for part in partitions_info:
+                partitions.push_back(
+                    cudf_io_types.partition_info(part[0], part[1])
+                )
+
         with nogil:
-            self.writer.get()[0].write(tv)
+            self.writer.get()[0].write(tv, partitions)

     def close(self, object metadata_file_path=None):
         cdef unique_ptr[vector[uint8_t]] out_metadata_c
@@ -449,7 +464,13 @@ cdef class ParquetWriter:

         # Update metadata-collection options
         if metadata_file_path is not None:
-            column_chunks_file_paths.push_back(str.encode(metadata_file_path))
+            if is_list_like(metadata_file_path):
+                for path in metadata_file_path:
+                    column_chunks_file_paths.push_back(str.encode(path))
+            else:
+                column_chunks_file_paths.push_back(
+                    str.encode(metadata_file_path)
+                )

         with nogil:
             out_metadata_c = move(
@@ -463,10 +484,13 @@ cdef class ParquetWriter:
             return np.asarray(out_metadata_py)
         return None

-    def __dealloc__(self):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
         self.close()

-    def _initialize_chunked_state(self, table):
+    def _initialize_chunked_state(self, table, num_partitions=1):
         """ Prepares all the values required to build the
             chunked_parquet_writer_options and creates a writer"""
         cdef table_view tv
@@ -499,10 +523,14 @@ cdef class ParquetWriter:
                 table[name]._column, self.tbl_meta.get().column_metadata[i]
             )

-        pandas_metadata = generate_pandas_metadata(table, self.index)
+        index = (
+            False if isinstance(table._index, cudf.RangeIndex) else self.index
+        )
+        pandas_metadata = generate_pandas_metadata(table, index)
+        cdef map[string, string] tmp_user_data
+        tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
         cdef vector[map[string, string]] user_data
-        user_data.resize(1)
-        user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata)
+        user_data = vector[map[string, string]](num_partitions, tmp_user_data)

         cdef chunked_parquet_writer_options args
         with nogil:
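# Illustrative sketch (assumption, not spelled out in the patch): with the
# changes above, one ParquetWriter can drive several sinks at once. Each
# write_table() call routes the i-th (start_row, num_rows) slice of the table
# to the i-th sink, and close() now accepts one metadata path per sink. The
# writer is also usable as a context manager, since cleanup moved from
# __dealloc__ to __exit__.
import cudf
from cudf.io.parquet import ParquetWriter

df = cudf.DataFrame({"key": [0, 0, 1, 1], "val": [10, 20, 30, 40]})

with ParquetWriter(["key=0.parquet", "key=1.parquet"], index=False) as writer:
    # rows [0, 2) -> first file, rows [2, 4) -> second file
    writer.write_table(df, partitions_info=[(0, 2), (2, 2)])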
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index ca03e40e2a6..9694d19e159 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -5,9 +5,11 @@
 import warnings
 from collections import defaultdict
 from contextlib import ExitStack
+from typing import Dict, List, Tuple
 from uuid import uuid4

 import fsspec
+import numpy as np
 import pyarrow as pa
 from pyarrow import dataset as ds, parquet as pq

@@ -126,32 +128,21 @@ def write_to_dataset(

     if partition_cols is not None and len(partition_cols) > 0:
-        data_cols = df.columns.drop(partition_cols)
-        if len(data_cols) == 0:
-            raise ValueError("No data left to save outside partition columns")
-
-        part_names, part_offsets, _, grouped_df = df.groupby(
-            partition_cols
-        )._grouped()
-        if not preserve_index:
-            grouped_df.reset_index(drop=True, inplace=True)
-        grouped_df.drop(columns=partition_cols, inplace=True)
-        # Copy the entire keys df in one operation rather than using iloc
-        part_names = part_names.to_pandas().to_frame(index=False)
-
-        full_paths = []
-        metadata_file_paths = []
-        for keys in part_names.itertuples(index=False):
-            subdir = fs.sep.join(
-                [f"{name}={val}" for name, val in zip(partition_cols, keys)]
-            )
-            prefix = fs.sep.join([root_path, subdir])
-            fs.mkdirs(prefix, exist_ok=True)
-            filename = filename or uuid4().hex + ".parquet"
-            full_path = fs.sep.join([prefix, filename])
-            full_paths.append(full_path)
-            if return_metadata:
-                metadata_file_paths.append(fs.sep.join([subdir, filename]))
+        (
+            full_paths,
+            metadata_file_paths,
+            grouped_df,
+            part_offsets,
+            _,
+        ) = _get_partitioned(
+            df,
+            root_path,
+            partition_cols,
+            filename,
+            fs,
+            preserve_index,
+            **kwargs,
+        )

         if return_metadata:
             kwargs["metadata_file_path"] = metadata_file_paths
@@ -164,7 +155,7 @@ def write_to_dataset(
         )

     else:
-        filename = filename or uuid4().hex + ".parquet"
+        filename = filename or _generate_filename()
         full_path = fs.sep.join([root_path, filename])
         if return_metadata:
             kwargs["metadata_file_path"] = filename
@@ -737,13 +728,12 @@ def to_parquet(
                     )

         if partition_offsets:
-            kwargs["partitions_info"] = [
-                (
-                    partition_offsets[i],
-                    partition_offsets[i + 1] - partition_offsets[i],
+            kwargs["partitions_info"] = list(
+                zip(
+                    partition_offsets,
+                    np.roll(partition_offsets, -1) - partition_offsets,
                 )
-                for i in range(0, len(partition_offsets) - 1)
-            ]
+            )[:-1]

         return _write_parquet(
             df,
@@ -790,9 +780,210 @@ def merge_parquet_filemetadata(filemetadata_list):
     return libparquet.merge_filemetadata(filemetadata_list)


+def _generate_filename():
+    return uuid4().hex + ".parquet"
+
+
+def _get_partitioned(
+    df,
+    root_path,
+    partition_cols,
+    filename=None,
+    fs=None,
+    preserve_index=False,
+    **kwargs,
+):
+    fs = ioutils._ensure_filesystem(fs, root_path, **kwargs)
+    fs.mkdirs(root_path, exist_ok=True)
+    if not (set(df._data) - set(partition_cols)):
+        raise ValueError("No data left to save outside partition columns")
+
+    part_names, part_offsets, _, grouped_df = df.groupby(
+        partition_cols
+    )._grouped()
+    if not preserve_index:
+        grouped_df.reset_index(drop=True, inplace=True)
+    grouped_df.drop(columns=partition_cols, inplace=True)
+    # Copy the entire keys df in one operation rather than using iloc
+    part_names = part_names.to_pandas().to_frame(index=False)
+
+    full_paths = []
+    metadata_file_paths = []
+    for keys in part_names.itertuples(index=False):
+        subdir = fs.sep.join(
+            [f"{name}={val}" for name, val in zip(partition_cols, keys)]
+        )
+        prefix = fs.sep.join([root_path, subdir])
+        fs.mkdirs(prefix, exist_ok=True)
+        filename = filename or _generate_filename()
+        full_path = fs.sep.join([prefix, filename])
+        full_paths.append(full_path)
+        metadata_file_paths.append(fs.sep.join([subdir, filename]))
+
+    return full_paths, metadata_file_paths, grouped_df, part_offsets, filename
+
+
 ParquetWriter = libparquet.ParquetWriter


+class ParquetDatasetWriter:
+    def __init__(
+        self,
+        path,
+        partition_cols,
+        index=None,
+        compression=None,
+        statistics="ROWGROUP",
+    ) -> None:
+        """
+        Write a parquet file or dataset incrementally
+
+        Parameters
+        ----------
+        path : str
+            File path or Root Directory path. Will be used as Root Directory
+            path while writing a partitioned dataset.
+        partition_cols : list
+            Column names by which to partition the dataset.
+            Columns are partitioned in the order they are given.
+        index : bool, default None
+            If ``True``, include the dataframe's index(es) in the file output.
+            If ``False``, they will not be written to the file. If ``None``,
+            index(es) other than RangeIndex will be saved as columns.
+        compression : {'snappy', None}, default 'snappy'
+            Name of the compression to use. Use ``None`` for no compression.
+        statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
+            Level at which column statistics should be included in file.
+
+        Examples
+        --------
+        Using a context
+
+        >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+        >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+        >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
+        ...     cw.write_table(df1)
+        ...     cw.write_table(df2)
+
+        By manually calling ``close()``
+
+        >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
+        >>> cw.write_table(df1)
+        >>> cw.write_table(df2)
+        >>> cw.close()
+
+        Both methods will generate the same directory structure
+
+        .. code-block:: bash
+
+            dataset/
+                a=1
+                    <filename>.parquet
+                a=2
+                    <filename>.parquet
+                a=3
+                    <filename>.parquet
+
+        """
+        self.path = path
+        self.common_args = {
+            "index": index,
+            "compression": compression,
+            "statistics": statistics,
+        }
+        self.partition_cols = partition_cols
+        # Collection of `ParquetWriter`s, and the corresponding
+        # partition_col values they're responsible for
+        self._chunked_writers: List[
+            Tuple[libparquet.ParquetWriter, List[str], str]
+        ] = []
+        # Map of partition_col values to their ParquetWriter's index
+        # in self._chunked_writers for reverse lookup
+        self.path_cw_map: Dict[str, int] = {}
+        self.filename = None
+
+    def write_table(self, df):
+        """
+        Write a dataframe to the file/dataset
+        """
+        (
+            paths,
+            metadata_file_paths,
+            grouped_df,
+            offsets,
+            self.filename,
+        ) = _get_partitioned(
+            df,
+            self.path,
+            self.partition_cols,
+            preserve_index=self.common_args["index"],
+            filename=self.filename,
+        )
+
+        existing_cw_batch = defaultdict(dict)
+        new_cw_paths = []
+
+        for path, part_info, meta_path in zip(
+            paths,
+            zip(offsets, np.roll(offsets, -1) - offsets),
+            metadata_file_paths,
+        ):
+            if path in self.path_cw_map:  # path is a currently open file
+                cw_idx = self.path_cw_map[path]
+                existing_cw_batch[cw_idx][path] = part_info
+            else:  # path not currently handled by any chunked writer
+                new_cw_paths.append((path, part_info, meta_path))
+
+        # Write out the parts of grouped_df currently handled by existing cw's
+        for cw_idx, path_to_part_info_map in existing_cw_batch.items():
+            cw = self._chunked_writers[cw_idx][0]
+            # match found paths with this cw's paths and nullify partition info
+            # for partition_col values not in this batch
+            this_cw_part_info = [
+                path_to_part_info_map.get(path, (0, 0))
+                for path in self._chunked_writers[cw_idx][1]
+            ]
+            cw.write_table(grouped_df, this_cw_part_info)
+
+        # Create new cw for unhandled paths encountered in this write_table
+        new_paths, part_info, meta_paths = zip(*new_cw_paths)
+        self._chunked_writers.append(
+            (
+                ParquetWriter(new_paths, **self.common_args),
+                new_paths,
+                meta_paths,
+            )
+        )
+        new_cw_idx = len(self._chunked_writers) - 1
+        self.path_cw_map.update({k: new_cw_idx for k in new_paths})
+        self._chunked_writers[-1][0].write_table(grouped_df, part_info)
+
+    def close(self, return_metadata=False):
+        """
+        Close all open files and optionally return footer metadata as a binary
+        blob
+        """

+        metadata = [
+            cw.close(metadata_file_path=meta_path if return_metadata else None)
+            for cw, _, meta_path in self._chunked_writers
+        ]
+
+        if return_metadata:
+            return (
+                merge_parquet_filemetadata(metadata)
+                if len(metadata) > 1
+                else metadata[0]
+            )
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+
 def _check_decimal128_type(arrow_type):
     if isinstance(arrow_type, pa.Decimal128Type):
         if arrow_type.precision > cudf.Decimal64Dtype.MAX_PRECISION:
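# Illustrative sketch (mirrors the docstring above and the tests below): the
# new ParquetDatasetWriter appends to a Hive-partitioned dataset across
# several write_table() calls. It reuses the chunked writers already open for
# partition directories it has seen before, opens a new multi-sink writer for
# newly encountered ones, and can hand back the merged footer metadata.
import cudf
from cudf.io.parquet import ParquetDatasetWriter

df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})

cw = ParquetDatasetWriter("./dataset", partition_cols=["a"], index=False)
cw.write_table(df1)  # creates dataset/a=1/... and dataset/a=2/...
cw.write_table(df2)  # appends to a=1, creates a=3
metadata_blob = cw.close(return_metadata=True)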
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 9a66de8a3a6..016ed1229f1 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -18,7 +18,11 @@
 from pyarrow import fs as pa_fs, parquet as pq

 import cudf
-from cudf.io.parquet import ParquetWriter, merge_parquet_filemetadata
+from cudf.io.parquet import (
+    ParquetDatasetWriter,
+    ParquetWriter,
+    merge_parquet_filemetadata,
+)
 from cudf.testing import dataset_generator as dg
 from cudf.testing._utils import (
     TIMEDELTA_TYPES,
@@ -1573,6 +1577,16 @@ def test_parquet_writer_gpu_chunked(tmpdir, simple_pdf, simple_gdf):
     assert_eq(pd.read_parquet(gdf_fname), pd.concat([simple_pdf, simple_pdf]))


+def test_parquet_writer_gpu_chunked_context(tmpdir, simple_pdf, simple_gdf):
+    gdf_fname = tmpdir.join("gdf.parquet")
+
+    with ParquetWriter(gdf_fname) as writer:
+        writer.write_table(simple_gdf)
+        writer.write_table(simple_gdf)
+
+    assert_eq(pd.read_parquet(gdf_fname), pd.concat([simple_pdf, simple_pdf]))
+
+
 def test_parquet_write_bytes_io(simple_gdf):
     output = BytesIO()
     simple_gdf.to_parquet(output)
@@ -1627,6 +1641,73 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename):
     assert fn == filename


+@pytest.mark.parametrize("return_meta", [True, False])
+def test_parquet_writer_chunked_partitioned(tmpdir_factory, return_meta):
+    pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
+    gdf_dir = str(tmpdir_factory.mktemp("gdf_dir"))
+
+    df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+    df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+
+    cw = ParquetDatasetWriter(gdf_dir, partition_cols=["a"], index=False)
+    cw.write_table(df1)
+    cw.write_table(df2)
+    meta_byte_array = cw.close(return_metadata=return_meta)
+    pdf = cudf.concat([df1, df2]).to_pandas()
+    pdf.to_parquet(pdf_dir, index=False, partition_cols=["a"])
+
+    if return_meta:
+        fmd = pq.ParquetFile(BytesIO(meta_byte_array)).metadata
+        assert fmd.num_rows == len(pdf)
+        assert fmd.num_row_groups == 4
+        files = {
+            os.path.join(directory, files[0])
+            for directory, _, files in os.walk(gdf_dir)
+            if files
+        }
+        meta_files = {
+            os.path.join(gdf_dir, fmd.row_group(i).column(c).file_path)
+            for i in range(fmd.num_row_groups)
+            for c in range(fmd.row_group(i).num_columns)
+        }
+        assert files == meta_files
+
+    # Read back with pandas to compare
+    expect_pd = pd.read_parquet(pdf_dir)
+    got_pd = pd.read_parquet(gdf_dir)
+    assert_eq(expect_pd, got_pd)
+
+    # Check that cudf and pd return the same read
+    got_cudf = cudf.read_parquet(gdf_dir)
+    assert_eq(got_pd, got_cudf)
+
+
+def test_parquet_writer_chunked_partitioned_context(tmpdir_factory):
+    pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
+    gdf_dir = str(tmpdir_factory.mktemp("gdf_dir"))
+
+    df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+    df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+
+    with ParquetDatasetWriter(
+        gdf_dir, partition_cols=["a"], index=False
+    ) as cw:
+        cw.write_table(df1)
+        cw.write_table(df2)
+
+    pdf = cudf.concat([df1, df2]).to_pandas()
+    pdf.to_parquet(pdf_dir, index=False, partition_cols=["a"])
+
+    # Read back with pandas to compare
+    expect_pd = pd.read_parquet(pdf_dir)
+    got_pd = pd.read_parquet(gdf_dir)
+    assert_eq(expect_pd, got_pd)
+
+    # Check that cudf and pd return the same read
+    got_cudf = cudf.read_parquet(gdf_dir)
+    assert_eq(got_pd, got_cudf)
+
+
 @pytest.mark.parametrize("cols", [None, ["b"]])
 def test_parquet_write_to_dataset(tmpdir_factory, cols):
     dir1 = tmpdir_factory.mktemp("dir1")