From 6552fbee05d8a814d9cae9c1d3c417d78c98239c Mon Sep 17 00:00:00 2001
From: Devavret Makkar
Date: Fri, 14 Jan 2022 02:41:28 +0530
Subject: [PATCH] Review changes

- Replace part_info generator with numpy roll
- Add examples to docs

---
 python/cudf/cudf/io/parquet.py | 65 ++++++++++++++++++++++++----------
 1 file changed, 46 insertions(+), 19 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index bd4de908f49..9694d19e159 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -9,6 +9,7 @@
 from uuid import uuid4
 
 import fsspec
+import numpy as np
 import pyarrow as pa
 from pyarrow import dataset as ds, parquet as pq
 
@@ -154,7 +155,7 @@ def write_to_dataset(
             )
 
     else:
-        filename = filename or (uuid4().hex + ".parquet")
+        filename = filename or _generate_filename()
         full_path = fs.sep.join([root_path, filename])
         if return_metadata:
             kwargs["metadata_file_path"] = filename
@@ -727,13 +728,12 @@ def to_parquet(
         )
 
         if partition_offsets:
-            kwargs["partitions_info"] = [
-                (
-                    partition_offsets[i],
-                    partition_offsets[i + 1] - partition_offsets[i],
+            kwargs["partitions_info"] = list(
+                zip(
+                    partition_offsets,
+                    np.roll(partition_offsets, -1) - partition_offsets,
                 )
-                for i in range(0, len(partition_offsets) - 1)
-            ]
+            )[:-1]
 
         return _write_parquet(
             df,
@@ -780,6 +780,10 @@ def merge_parquet_filemetadata(filemetadata_list):
     return libparquet.merge_filemetadata(filemetadata_list)
 
 
+def _generate_filename():
+    return uuid4().hex + ".parquet"
+
+
 def _get_partitioned(
     df,
     root_path,
@@ -811,7 +815,7 @@ def _get_partitioned(
         )
         prefix = fs.sep.join([root_path, subdir])
         fs.mkdirs(prefix, exist_ok=True)
-        filename = filename or (uuid4().hex + ".parquet")
+        filename = filename or _generate_filename()
         full_path = fs.sep.join([prefix, filename])
         full_paths.append(full_path)
         metadata_file_paths.append(fs.sep.join([subdir, filename]))
@@ -850,6 +854,37 @@ def __init__(
             Name of the compression to use. Use ``None`` for no compression.
         statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
             Level at which column statistics should be included in file.
+
+
+        Examples
+        --------
+        Using a context manager
+
+        >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+        >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+        >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
+        ...     cw.write_table(df1)
+        ...     cw.write_table(df2)
+
+        By manually calling ``close()``
+
+        >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
+        >>> cw.write_table(df1)
+        >>> cw.write_table(df2)
+        >>> cw.close()
+
+        Both methods will generate the same directory structure
+
+        .. code-block:: bash
+
+            dataset/
+                a=1
+                    <filename>.parquet
+                a=2
+                    <filename>.parquet
+                a=3
+                    <filename>.parquet
+
         """
         self.path = path
         self.common_args = {
@@ -889,18 +924,10 @@ def write_table(self, df):
         existing_cw_batch = defaultdict(dict)
         new_cw_paths = []
 
-        def pairwise(iterable):
-            """
-            Generates a pair of `(it[i], it[i + 1] - it[i])` from iterable `it`
-            """
-            it = iter(iterable)
-            a = next(it, None)
-            for b in it:
-                yield (a, b - a)
-                a = b
-
         for path, part_info, meta_path in zip(
-            paths, pairwise(offsets), metadata_file_paths
+            paths,
+            zip(offsets, np.roll(offsets, -1) - offsets),
+            metadata_file_paths,
        ):
             if path in self.path_cw_map:  # path is a currently open file
                 cw_idx = self.path_cw_map[path]
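
Reviewer note: a minimal, standalone sketch of what the ``np.roll`` pairing
above computes, covering both the ``partitions_info`` expression in
``to_parquet`` and the equivalent loop in ``write_table``. The offsets below
are made up for illustration and do not come from the patch.

    import numpy as np

    # Hypothetical offsets: three partitions covering rows
    # [0, 2), [2, 5), and [5, 9) of a partition-sorted dataframe.
    partition_offsets = [0, 2, 5, 9]

    # Rolling left by one aligns each offset with its successor, so the
    # subtraction yields each partition's row count. The final pair wraps
    # around to (9, 0 - 9), which is why the [:-1] slice drops it; the
    # result matches what the removed pairwise() generator produced.
    partitions_info = list(
        zip(
            partition_offsets,
            np.roll(partition_offsets, -1) - partition_offsets,
        )
    )[:-1]

    print(partitions_info)  # (start, size) pairs: (0, 2), (2, 3), (5, 4)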