Fix null hive-partition behavior in dask-cudf parquet #12866

Merged · 17 commits · Mar 10, 2023
52 changes: 44 additions & 8 deletions python/cudf/cudf/io/parquet.py
@@ -9,6 +9,7 @@
from typing import Dict, List, Optional, Tuple
from uuid import uuid4

import pandas as pd
from pyarrow import dataset as ds, parquet as pq

import cudf
@@ -269,6 +270,7 @@ def _process_dataset(
filters=None,
row_groups=None,
categorical_partitions=True,
dataset_kwargs=None,
):
# Returns:
# file_list - Expanded/filtered list of paths
@@ -296,8 +298,13 @@
dataset = ds.dataset(
    source=paths[0] if len(paths) == 1 else paths,
    filesystem=fs,
-    format="parquet",
-    partitioning="hive",
+    **(
+        dataset_kwargs
+        or {
+            "format": "parquet",
+            "partitioning": "hive",
+        }
+    ),
)

file_list = dataset.files
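For illustration, a minimal sketch of how a caller could use the new `dataset_kwargs` argument to override these defaults; the path, column name, and dtype below are hypothetical:

```python
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

import cudf

# Hypothetical: declare the partition column "year" as int16 up front instead
# of relying on hive-style type inference from the directory names.
df = cudf.read_parquet(
    "dataset_root/",
    dataset_kwargs={
        "format": "parquet",
        "partitioning": HivePartitioning(pa.schema([("year", pa.int16())])),
    },
)
```

Note that when `dataset_kwargs` is supplied it replaces the default dictionary entirely, which is why `"format": "parquet"` is passed explicitly here.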
@@ -423,6 +430,7 @@ def read_parquet(
categorical_partitions=True,
open_file_options=None,
bytes_per_thread=None,
dataset_kwargs=None,
*args,
**kwargs,
):
@@ -483,6 +491,7 @@
filters=filters,
row_groups=row_groups,
categorical_partitions=categorical_partitions,
dataset_kwargs=dataset_kwargs,
)
elif filters is not None:
raise ValueError("cudf cannot apply filters to open file objects.")
@@ -540,6 +549,7 @@
use_pandas_metadata=use_pandas_metadata,
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
**kwargs,
)

@@ -551,6 +561,7 @@ def _parquet_to_frame(
row_groups=None,
partition_keys=None,
partition_categories=None,
dataset_kwargs=None,
**kwargs,
):

@@ -564,6 +575,13 @@
**kwargs,
)

partition_meta = None
partitioning = (dataset_kwargs or {}).get("partitioning", None)
if hasattr(partitioning, "schema"):
partition_meta = cudf.DataFrame.from_arrow(
partitioning.schema.empty_table()
)
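As a side note, a small sketch of what this `partition_meta` lookup yields when an explicit partitioning is passed through `dataset_kwargs` (column name and dtype are hypothetical):

```python
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

import cudf

partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))

# An empty table carrying only the partition schema; its dtypes are used below
# to give partition columns the declared type even when every key is null.
partition_meta = cudf.DataFrame.from_arrow(partitioning.schema.empty_table())
print(partition_meta["id"].dtype)  # float64
```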

# For partitioned data, we need a distinct read for each
# unique set of partition keys. Therefore, we start by
# aggregating all paths with matching keys using a dict
@@ -607,7 +625,14 @@
else:
    # Not building categorical columns, so
    # `value` is already what we want
-    dfs[-1][name] = as_column(value, length=len(dfs[-1]))
+    if partition_meta is not None:
+        dfs[-1][name] = as_column(
+            value,
+            length=len(dfs[-1]),
+            dtype=partition_meta[name].dtype,
+        )
+    else:
+        dfs[-1][name] = as_column(value, length=len(dfs[-1]))

# Concatenate dfs and return.
# Assume we can ignore the index if it has no name.
@@ -827,7 +852,10 @@ def _get_partitioned(
metadata_file_paths = []
for keys in part_names.itertuples(index=False):
subdir = fs.sep.join(
-    [f"{name}={val}" for name, val in zip(partition_cols, keys)]
+    [
+        _hive_dirname(name, val)
+        for name, val in zip(partition_cols, keys)
+    ]
)
prefix = fs.sep.join([root_path, subdir])
fs.mkdirs(prefix, exist_ok=True)
@@ -848,16 +876,17 @@ def _get_groups_and_offsets(
):

if not (set(df._data) - set(partition_cols)):
-raise ValueError("No data left to save outside partition columns")
+warnings.warn("No data left to save outside partition columns")

-part_names, part_offsets, _, grouped_df = df.groupby(
-    partition_cols
+_, part_offsets, part_keys, grouped_df = df.groupby(
+    partition_cols,
+    dropna=False,
)._grouped()
if not preserve_index:
grouped_df.reset_index(drop=True, inplace=True)
grouped_df.drop(columns=partition_cols, inplace=True)
# Copy the entire keys df in one operation rather than using iloc
-part_names = part_names.to_pandas().to_frame(index=False)
+part_names = part_keys.to_pandas().unique().to_frame(index=False)

return part_names, grouped_df, part_offsets
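The `dropna=False` above is what keeps rows with null partition keys in their own group instead of silently dropping them; a standalone sketch of that behavior (the data is hypothetical):

```python
import cudf

df = cudf.DataFrame({"id": [0.0, 1.0, None], "x": [1, 2, 3]})

# With the default dropna=True the null-keyed row would be dropped from the
# groups; dropna=False keeps it as a distinct (null) partition key.
print(df.groupby("id", dropna=False).size())  # three groups: 0.0, 1.0 and <NA>
```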

@@ -1251,3 +1280,10 @@ def _default_open_file_options(
)
open_file_options["precache_options"] = precache_options
return open_file_options


def _hive_dirname(name, val):
# Simple utility to produce hive directory name
if pd.isna(val):
val = "__HIVE_DEFAULT_PARTITION__"
return f"{name}={val}"
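Usage of the helper defined just above; the values are hypothetical, and any missing key (as judged by `pd.isna`) maps to the standard hive null marker:

```python
_hive_dirname("id", 3.0)           # -> 'id=3.0'
_hive_dirname("id", None)          # -> 'id=__HIVE_DEFAULT_PARTITION__'
_hive_dirname("id", float("nan"))  # -> 'id=__HIVE_DEFAULT_PARTITION__'
```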
17 changes: 11 additions & 6 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -71,13 +71,16 @@ def _read_paths(
partitioning=None,
partition_keys=None,
open_file_options=None,
dataset_kwargs=None,
**kwargs,
):

# Simplify row_groups if all None
if row_groups == [None for path in paths]:
row_groups = None

dataset_kwargs = dataset_kwargs or {}
dataset_kwargs["partitioning"] = partitioning or "hive"
with ExitStack() as stack:

# Non-local filesystem handling
@@ -100,6 +103,8 @@
columns=columns,
row_groups=row_groups if row_groups else None,
strings_to_categorical=strings_to_categorical,
dataset_kwargs=dataset_kwargs,
categorical_partitions=False,
**kwargs,
)
except RuntimeError as err:
@@ -127,15 +132,10 @@
if partitions and partition_keys is None:

# Use `HivePartitioning` by default
-partitioning = partitioning or {"obj": pa_ds.HivePartitioning}
ds = pa_ds.dataset(
    paths,
    filesystem=fs,
-    format="parquet",
-    partitioning=partitioning["obj"].discover(
-        *partitioning.get("args", []),
-        **partitioning.get("kwargs", {}),
-    ),
+    **dataset_kwargs,
)
frag = next(ds.get_fragments())
if frag:
@@ -189,6 +189,9 @@ def read_partition(
if isinstance(index, list):
columns += index

dataset_kwargs = kwargs.get("dataset", {})
partitioning = partitioning or dataset_kwargs.get("partitioning", None)

# Check if we are actually selecting any columns
read_columns = columns
if schema and columns:
@@ -249,6 +252,7 @@
partitions=partitions,
partitioning=partitioning,
partition_keys=last_partition_keys,
dataset_kwargs=dataset_kwargs,
**read_kwargs,
)
)
@@ -274,6 +278,7 @@
partitions=partitions,
partitioning=partitioning,
partition_keys=last_partition_keys,
dataset_kwargs=dataset_kwargs,
**read_kwargs,
)
)
22 changes: 22 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -504,6 +504,28 @@ def test_check_file_size(tmpdir):
dask_cudf.read_parquet(fn, check_file_size=1).compute()


def test_null_partition(tmpdir):
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

df = pd.DataFrame({"id": [0, 1, None], "x": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1).to_backend("cudf")
ddf.to_parquet(str(tmpdir), partition_on="id")
fns = glob.glob(os.path.join(tmpdir, "id" + "=*/*.parquet"))
assert len(fns) == 3

partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))
ddf_read = dask_cudf.read_parquet(
str(tmpdir),
dataset={"partitioning": partitioning},
)
dd.assert_eq(
ddf[["x", "id"]],
ddf_read[["x", "id"]],
check_divisions=False,
)
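For reference, a sketch of the directory layout the write step in this test is expected to produce (directory names shown are illustrative; the exact rendering of the float keys may differ):

```python
import glob
import os

# One hive directory per unique key, with the null id mapped to the
# __HIVE_DEFAULT_PARTITION__ marker.
sorted(os.path.basename(p) for p in glob.glob(os.path.join(str(tmpdir), "id=*")))
# e.g. ['id=0.0', 'id=1.0', 'id=__HIVE_DEFAULT_PARTITION__']
```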


def test_nullable_schema_mismatch(tmpdir):
# See: https://github.com/rapidsai/cudf/issues/12702
path0 = str(tmpdir.join("test.0.parquet"))