
Commit

- Replace part_info generator with numpy roll
- Add examples to docs
devavret committed Jan 13, 2022
1 parent 70612e6 commit 6552fbe
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions python/cudf/cudf/io/parquet.py
@@ -9,6 +9,7 @@
from uuid import uuid4

import fsspec
import numpy as np
import pyarrow as pa
from pyarrow import dataset as ds, parquet as pq

@@ -154,7 +155,7 @@ def write_to_dataset(
)

else:
filename = filename or (uuid4().hex + ".parquet")
filename = filename or _generate_filename()
full_path = fs.sep.join([root_path, filename])
if return_metadata:
kwargs["metadata_file_path"] = filename
@@ -727,13 +728,12 @@ def to_parquet(
)

if partition_offsets:
kwargs["partitions_info"] = [
(
partition_offsets[i],
partition_offsets[i + 1] - partition_offsets[i],
kwargs["partitions_info"] = list(
zip(
partition_offsets,
np.roll(partition_offsets, -1) - partition_offsets,
)
for i in range(0, len(partition_offsets) - 1)
]
)[:-1]

return _write_parquet(
df,
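For illustration (editorial, not part of the commit): the removed comprehension and the new `np.roll` form build the same list of `(offset, num_rows)` pairs for `partitions_info`. A minimal sketch with made-up offsets:

    import numpy as np

    # Made-up partition boundaries; the last entry is the total row count.
    partition_offsets = [0, 3, 7, 12]

    # Removed form: explicit loop over consecutive offsets.
    old = [
        (partition_offsets[i], partition_offsets[i + 1] - partition_offsets[i])
        for i in range(0, len(partition_offsets) - 1)
    ]

    # New form: rolling the offsets left by one and subtracting gives each
    # partition's row count; the wrap-around last pair is dropped with [:-1].
    new = list(
        zip(partition_offsets, np.roll(partition_offsets, -1) - partition_offsets)
    )[:-1]

    print(old)  # [(0, 3), (3, 4), (7, 5)]
    print(new)  # [(0, 3), (3, 4), (7, 5)]  (sizes come back as numpy integers)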
@@ -780,6 +780,10 @@ def merge_parquet_filemetadata(filemetadata_list):
return libparquet.merge_filemetadata(filemetadata_list)


def _generate_filename():
return uuid4().hex + ".parquet"


def _get_partitioned(
df,
root_path,
@@ -811,7 +815,7 @@
)
prefix = fs.sep.join([root_path, subdir])
fs.mkdirs(prefix, exist_ok=True)
filename = filename or (uuid4().hex + ".parquet")
filename = filename or _generate_filename()
full_path = fs.sep.join([prefix, filename])
full_paths.append(full_path)
metadata_file_paths.append(fs.sep.join([subdir, filename]))
@@ -850,6 +854,37 @@ def __init__(
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in the file.
Examples
--------
Using a context manager

>>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
>>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
>>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
...     cw.write_table(df1)
...     cw.write_table(df2)

By manually calling ``close()``

>>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
>>> cw.write_table(df1)
>>> cw.write_table(df2)
>>> cw.close()

Both methods generate the same directory structure:

.. code-block:: bash

    dataset/
        a=1
            <filename>.parquet
        a=2
            <filename>.parquet
        a=3
            <filename>.parquet
"""
self.path = path
self.common_args = {
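As a usage follow-up to the docstring examples added above (editorial, not part of the commit): the written dataset can be read back per partition directory. A sketch, assuming `cudf.read_parquet` is pointed at one of the `a=<value>` directories:

    import cudf
    from cudf.io.parquet import ParquetDatasetWriter

    df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
    df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})

    with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
        cw.write_table(df1)
        cw.write_table(df2)

    # Rows with a == 1 from both tables land under dataset/a=1/. The partition
    # column itself is encoded in the directory name rather than stored in the
    # files, so the read-back frame contains only column "b".
    part_a1 = cudf.read_parquet("./dataset/a=1")
    print(part_a1)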
@@ -889,18 +924,10 @@ def write_table(self, df):
existing_cw_batch = defaultdict(dict)
new_cw_paths = []

def pairwise(iterable):
"""
Generates a pair of `(it[i], it[i + 1] - it[i])` from iterable `it`
"""
it = iter(iterable)
a = next(it, None)
for b in it:
yield (a, b - a)
a = b

for path, part_info, meta_path in zip(
paths, pairwise(offsets), metadata_file_paths
paths,
zip(offsets, np.roll(offsets, -1) - offsets),
metadata_file_paths,
):
if path in self.path_cw_map: # path is a currently open file
cw_idx = self.path_cw_map[path]
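Editorial note on the hunk above: the removed `pairwise` helper yielded one `(offset, size)` pair per output path, while the new `zip(offsets, np.roll(offsets, -1) - offsets)` yields one extra, wrapped-around pair. That extra pair never reaches a writer because the outer `zip` stops at its shortest input, assuming (as the old code implies) that `offsets` carries a trailing end boundary and so has one more entry than `paths`. A small sketch of that truncation with made-up values:

    import numpy as np

    # Made-up values: offsets includes the trailing end boundary, so it has
    # one more entry than the per-partition paths.
    offsets = [0, 3, 7, 12]
    paths = ["dataset/a=1/f.parquet", "dataset/a=2/f.parquet", "dataset/a=3/f.parquet"]
    meta_paths = ["a=1/f.parquet", "a=2/f.parquet", "a=3/f.parquet"]

    # Four pairs; the last one, (12, -12), wraps around and is bogus.
    part_info = zip(offsets, np.roll(offsets, -1) - offsets)

    # The outer zip stops after the three paths, so the bogus pair is discarded.
    for path, (start, size), meta_path in zip(paths, part_info, meta_paths):
        print(path, start, size, meta_path)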
