Fix null hive-partition behavior in dask-cudf parquet (#12866)
This PR includes a few simple changes to fix the handling of null hive partitions in `dask_cudf`.
~~Depends on dask/dask#10007~~
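
For context, here is a minimal round-trip sketch of the behavior this PR fixes, mirroring the new test added below (the `./null_part_data` path is an arbitrary local directory chosen for illustration):

```python
import pandas as pd
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

import dask.dataframe as dd
import dask_cudf

# Write a frame whose partition column contains a null; the null row is
# written under an `id=__HIVE_DEFAULT_PARTITION__/` directory.
df = pd.DataFrame({"id": [0, 1, None], "x": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1).to_backend("cudf")
ddf.to_parquet("./null_part_data", partition_on="id")

# Read it back with an explicit partitioning schema so the partition column
# keeps a concrete dtype and the null partition is decoded as a real null.
partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))
got = dask_cudf.read_parquet(
    "./null_part_data",
    dataset={"partitioning": partitioning},
)
print(got.compute())
```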

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12866
rjzamora authored Mar 10, 2023
1 parent e37bddb commit 4da6b19
Showing 3 changed files with 77 additions and 14 deletions.
52 changes: 44 additions & 8 deletions python/cudf/cudf/io/parquet.py
@@ -9,6 +9,7 @@
from typing import Dict, List, Optional, Tuple
from uuid import uuid4

import pandas as pd
from pyarrow import dataset as ds, parquet as pq

import cudf
@@ -269,6 +270,7 @@ def _process_dataset(
filters=None,
row_groups=None,
categorical_partitions=True,
dataset_kwargs=None,
):
# Returns:
# file_list - Expanded/filtered list of paths
@@ -296,8 +298,13 @@
dataset = ds.dataset(
source=paths[0] if len(paths) == 1 else paths,
filesystem=fs,
format="parquet",
partitioning="hive",
**(
dataset_kwargs
or {
"format": "parquet",
"partitioning": "hive",
}
),
)

file_list = dataset.files
@@ -423,6 +430,7 @@ def read_parquet(
categorical_partitions=True,
open_file_options=None,
bytes_per_thread=None,
dataset_kwargs=None,
*args,
**kwargs,
):
@@ -483,6 +491,7 @@ def read_parquet(
filters=filters,
row_groups=row_groups,
categorical_partitions=categorical_partitions,
dataset_kwargs=dataset_kwargs,
)
elif filters is not None:
raise ValueError("cudf cannot apply filters to open file objects.")
@@ -540,6 +549,7 @@ def read_parquet(
use_pandas_metadata=use_pandas_metadata,
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
**kwargs,
)

@@ -551,6 +561,7 @@ def _parquet_to_frame(
row_groups=None,
partition_keys=None,
partition_categories=None,
dataset_kwargs=None,
**kwargs,
):

@@ -564,6 +575,13 @@
**kwargs,
)

partition_meta = None
partitioning = (dataset_kwargs or {}).get("partitioning", None)
if hasattr(partitioning, "schema"):
partition_meta = cudf.DataFrame.from_arrow(
partitioning.schema.empty_table()
)

# For partitioned data, we need a distinct read for each
# unique set of partition keys. Therefore, we start by
# aggregating all paths with matching keys using a dict
@@ -607,7 +625,14 @@ def _parquet_to_frame(
else:
# Not building categorical columns, so
# `value` is already what we want
dfs[-1][name] = as_column(value, length=len(dfs[-1]))
if partition_meta is not None:
dfs[-1][name] = as_column(
value,
length=len(dfs[-1]),
dtype=partition_meta[name].dtype,
)
else:
dfs[-1][name] = as_column(value, length=len(dfs[-1]))

# Concatenate dfs and return.
# Assume we can ignore the index if it has no name.
@@ -827,7 +852,10 @@ def _get_partitioned(
metadata_file_paths = []
for keys in part_names.itertuples(index=False):
subdir = fs.sep.join(
[f"{name}={val}" for name, val in zip(partition_cols, keys)]
[
_hive_dirname(name, val)
for name, val in zip(partition_cols, keys)
]
)
prefix = fs.sep.join([root_path, subdir])
fs.mkdirs(prefix, exist_ok=True)
@@ -848,16 +876,17 @@ def _get_groups_and_offsets(
):

if not (set(df._data) - set(partition_cols)):
raise ValueError("No data left to save outside partition columns")
warnings.warn("No data left to save outside partition columns")

part_names, part_offsets, _, grouped_df = df.groupby(
partition_cols
_, part_offsets, part_keys, grouped_df = df.groupby(
partition_cols,
dropna=False,
)._grouped()
if not preserve_index:
grouped_df.reset_index(drop=True, inplace=True)
grouped_df.drop(columns=partition_cols, inplace=True)
# Copy the entire keys df in one operation rather than using iloc
part_names = part_names.to_pandas().to_frame(index=False)
part_names = part_keys.to_pandas().unique().to_frame(index=False)

return part_names, grouped_df, part_offsets

@@ -1251,3 +1280,10 @@ def _default_open_file_options(
)
open_file_options["precache_options"] = precache_options
return open_file_options


def _hive_dirname(name, val):
# Simple utility to produce hive directory name
if pd.isna(val):
val = "__HIVE_DEFAULT_PARTITION__"
return f"{name}={val}"
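
As a usage note for the new `dataset_kwargs` argument above, here is a hedged sketch of calling `cudf.read_parquet` directly with an explicit pyarrow partitioning, so `_parquet_to_frame` can take the partition column dtype from the partitioning schema (the path and the float64 schema are assumptions carried over from the earlier sketch):

```python
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

import cudf

# dataset_kwargs replaces the default {"format": "parquet", "partitioning": "hive"},
# so "format" is restated here alongside the explicit partitioning object.
partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))
gdf = cudf.read_parquet(
    "./null_part_data",
    categorical_partitions=False,  # keep `id` as float64 rather than categorical
    dataset_kwargs={
        "format": "parquet",
        "partitioning": partitioning,
    },
)
print(gdf["id"])  # the null partition should come back as a null value
```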
17 changes: 11 additions & 6 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -71,13 +71,16 @@ def _read_paths(
partitioning=None,
partition_keys=None,
open_file_options=None,
dataset_kwargs=None,
**kwargs,
):

# Simplify row_groups if all None
if row_groups == [None for path in paths]:
row_groups = None

dataset_kwargs = dataset_kwargs or {}
dataset_kwargs["partitioning"] = partitioning or "hive"
with ExitStack() as stack:

# Non-local filesystem handling
@@ -100,6 +103,8 @@
columns=columns,
row_groups=row_groups if row_groups else None,
strings_to_categorical=strings_to_categorical,
dataset_kwargs=dataset_kwargs,
categorical_partitions=False,
**kwargs,
)
except RuntimeError as err:
@@ -127,15 +132,10 @@ def _read_paths(
if partitions and partition_keys is None:

# Use `HivePartitioning` by default
partitioning = partitioning or {"obj": pa_ds.HivePartitioning}
ds = pa_ds.dataset(
paths,
filesystem=fs,
format="parquet",
partitioning=partitioning["obj"].discover(
*partitioning.get("args", []),
**partitioning.get("kwargs", {}),
),
**dataset_kwargs,
)
frag = next(ds.get_fragments())
if frag:
@@ -189,6 +189,9 @@ def read_partition(
if isinstance(index, list):
columns += index

dataset_kwargs = kwargs.get("dataset", {})
partitioning = partitioning or dataset_kwargs.get("partitioning", None)

# Check if we are actually selecting any columns
read_columns = columns
if schema and columns:
@@ -249,6 +252,7 @@
partitions=partitions,
partitioning=partitioning,
partition_keys=last_partition_keys,
dataset_kwargs=dataset_kwargs,
**read_kwargs,
)
)
@@ -274,6 +278,7 @@
partitions=partitions,
partitioning=partitioning,
partition_keys=last_partition_keys,
dataset_kwargs=dataset_kwargs,
**read_kwargs,
)
)
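To see why the explicit partitioning hint that `read_partition` now forwards is useful, here is a small comparison against plain pyarrow dataset discovery (reusing the directory from the first sketch): the `__HIVE_DEFAULT_PARTITION__` directory carries no type information, so spelling out the schema pins `id` to float64 instead of leaving the dtype up to inference.

```python
import pyarrow as pa
import pyarrow.dataset as pa_ds

# Dtype inferred from the hive directory names alone.
inferred = pa_ds.dataset(
    "./null_part_data", format="parquet", partitioning="hive"
)
print(inferred.schema.field("id").type)

# Dtype fixed by an explicit HivePartitioning schema.
explicit = pa_ds.dataset(
    "./null_part_data",
    format="parquet",
    partitioning=pa_ds.HivePartitioning(pa.schema([("id", pa.float64())])),
)
print(explicit.schema.field("id").type)  # float64
```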
22 changes: 22 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -504,6 +504,28 @@ def test_check_file_size(tmpdir):
dask_cudf.read_parquet(fn, check_file_size=1).compute()


def test_null_partition(tmpdir):
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

df = pd.DataFrame({"id": [0, 1, None], "x": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1).to_backend("cudf")
ddf.to_parquet(str(tmpdir), partition_on="id")
fns = glob.glob(os.path.join(tmpdir, "id" + "=*/*.parquet"))
assert len(fns) == 3

partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))
ddf_read = dask_cudf.read_parquet(
str(tmpdir),
dataset={"partitioning": partitioning},
)
dd.assert_eq(
ddf[["x", "id"]],
ddf_read[["x", "id"]],
check_divisions=False,
)


def test_nullable_schema_mismatch(tmpdir):
# See: https://github.com/rapidsai/cudf/issues/12702
path0 = str(tmpdir.join("test.0.parquet"))
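For reference, the layout check performed by the new test above, applied to the directory written in the first sketch (directory names follow the `_hive_dirname` rule; exact part-file names will vary):

```python
import glob
import os

fns = sorted(glob.glob(os.path.join("./null_part_data", "id=*", "*.parquet")))
assert len(fns) == 3  # one file per unique `id` value, including the null row
assert any("id=__HIVE_DEFAULT_PARTITION__" in fn for fn in fns)
```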
