From 4da6b19f86b87cf8453fa4a2b54caea276d49706 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Fri, 10 Mar 2023 11:31:49 -0600
Subject: [PATCH] Fix null hive-partition behavior in dask-cudf parquet (#12866)

This PR includes a few simple changes to fix the handling of null hive partitions in `dask_cudf`.

~Depends on https://github.com/dask/dask/pull/10007~

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: https://github.com/rapidsai/cudf/pull/12866
---
 python/cudf/cudf/io/parquet.py                | 52 ++++++++++++++++---
 python/dask_cudf/dask_cudf/io/parquet.py      | 17 +++---
 .../dask_cudf/io/tests/test_parquet.py        | 22 ++++++++
 3 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 1329c06b18d..ca4fb103ee8 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -9,6 +9,7 @@
 from typing import Dict, List, Optional, Tuple
 from uuid import uuid4
 
+import pandas as pd
 from pyarrow import dataset as ds, parquet as pq
 
 import cudf
@@ -269,6 +270,7 @@ def _process_dataset(
     filters=None,
     row_groups=None,
     categorical_partitions=True,
+    dataset_kwargs=None,
 ):
     # Returns:
     #     file_list - Expanded/filtered list of paths
@@ -296,8 +298,13 @@ def _process_dataset(
     dataset = ds.dataset(
         source=paths[0] if len(paths) == 1 else paths,
         filesystem=fs,
-        format="parquet",
-        partitioning="hive",
+        **(
+            dataset_kwargs
+            or {
+                "format": "parquet",
+                "partitioning": "hive",
+            }
+        ),
     )
 
     file_list = dataset.files
@@ -423,6 +430,7 @@ def read_parquet(
     categorical_partitions=True,
     open_file_options=None,
     bytes_per_thread=None,
+    dataset_kwargs=None,
     *args,
     **kwargs,
 ):
@@ -483,6 +491,7 @@ def read_parquet(
             filters=filters,
             row_groups=row_groups,
             categorical_partitions=categorical_partitions,
+            dataset_kwargs=dataset_kwargs,
         )
     elif filters is not None:
         raise ValueError("cudf cannot apply filters to open file objects.")
@@ -540,6 +549,7 @@ def read_parquet(
         use_pandas_metadata=use_pandas_metadata,
         partition_keys=partition_keys,
         partition_categories=partition_categories,
+        dataset_kwargs=dataset_kwargs,
         **kwargs,
     )
 
@@ -551,6 +561,7 @@ def _parquet_to_frame(
     row_groups=None,
     partition_keys=None,
     partition_categories=None,
+    dataset_kwargs=None,
     **kwargs,
 ):
 
@@ -564,6 +575,13 @@ def _parquet_to_frame(
             **kwargs,
         )
 
+    partition_meta = None
+    partitioning = (dataset_kwargs or {}).get("partitioning", None)
+    if hasattr(partitioning, "schema"):
+        partition_meta = cudf.DataFrame.from_arrow(
+            partitioning.schema.empty_table()
+        )
+
     # For partitioned data, we need a distinct read for each
     # unique set of partition keys. Therefore, we start by
     # aggregating all paths with matching keys using a dict
@@ -607,7 +625,14 @@ def _parquet_to_frame(
                 else:
                     # Not building categorical columns, so
                     # `value` is already what we want
-                    dfs[-1][name] = as_column(value, length=len(dfs[-1]))
+                    if partition_meta is not None:
+                        dfs[-1][name] = as_column(
+                            value,
+                            length=len(dfs[-1]),
+                            dtype=partition_meta[name].dtype,
+                        )
+                    else:
+                        dfs[-1][name] = as_column(value, length=len(dfs[-1]))
 
     # Concatenate dfs and return.
     # Assume we can ignore the index if it has no name.
@@ -827,7 +852,10 @@ def _get_partitioned(
     metadata_file_paths = []
     for keys in part_names.itertuples(index=False):
         subdir = fs.sep.join(
-            [f"{name}={val}" for name, val in zip(partition_cols, keys)]
+            [
+                _hive_dirname(name, val)
+                for name, val in zip(partition_cols, keys)
+            ]
         )
         prefix = fs.sep.join([root_path, subdir])
         fs.mkdirs(prefix, exist_ok=True)
@@ -848,16 +876,17 @@ def _get_groups_and_offsets(
 ):
 
     if not (set(df._data) - set(partition_cols)):
-        raise ValueError("No data left to save outside partition columns")
+        warnings.warn("No data left to save outside partition columns")
 
-    part_names, part_offsets, _, grouped_df = df.groupby(
-        partition_cols
+    _, part_offsets, part_keys, grouped_df = df.groupby(
+        partition_cols,
+        dropna=False,
     )._grouped()
     if not preserve_index:
         grouped_df.reset_index(drop=True, inplace=True)
     grouped_df.drop(columns=partition_cols, inplace=True)
     # Copy the entire keys df in one operation rather than using iloc
-    part_names = part_names.to_pandas().to_frame(index=False)
+    part_names = part_keys.to_pandas().unique().to_frame(index=False)
 
     return part_names, grouped_df, part_offsets
 
@@ -1251,3 +1280,10 @@ def _default_open_file_options(
         )
         open_file_options["precache_options"] = precache_options
     return open_file_options
+
+
+def _hive_dirname(name, val):
+    # Simple utility to produce hive directory name
+    if pd.isna(val):
+        val = "__HIVE_DEFAULT_PARTITION__"
+    return f"{name}={val}"
diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index 962662061b5..452f2f8914a 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -71,6 +71,7 @@ def _read_paths(
         partitioning=None,
         partition_keys=None,
         open_file_options=None,
+        dataset_kwargs=None,
         **kwargs,
     ):
 
@@ -78,6 +79,8 @@ def _read_paths(
         if row_groups == [None for path in paths]:
             row_groups = None
 
+        dataset_kwargs = dataset_kwargs or {}
+        dataset_kwargs["partitioning"] = partitioning or "hive"
         with ExitStack() as stack:
 
             # Non-local filesystem handling
@@ -100,6 +103,8 @@ def _read_paths(
                     columns=columns,
                     row_groups=row_groups if row_groups else None,
                     strings_to_categorical=strings_to_categorical,
+                    dataset_kwargs=dataset_kwargs,
+                    categorical_partitions=False,
                     **kwargs,
                 )
             except RuntimeError as err:
@@ -127,15 +132,10 @@ def _read_paths(
         if partitions and partition_keys is None:
 
             # Use `HivePartitioning` by default
-            partitioning = partitioning or {"obj": pa_ds.HivePartitioning}
             ds = pa_ds.dataset(
                 paths,
                 filesystem=fs,
-                format="parquet",
-                partitioning=partitioning["obj"].discover(
-                    *partitioning.get("args", []),
-                    **partitioning.get("kwargs", {}),
-                ),
+                **dataset_kwargs,
             )
             frag = next(ds.get_fragments())
             if frag:
@@ -189,6 +189,9 @@ def read_partition(
         if isinstance(index, list):
             columns += index
 
+        dataset_kwargs = kwargs.get("dataset", {})
+        partitioning = partitioning or dataset_kwargs.get("partitioning", None)
+
         # Check if we are actually selecting any columns
         read_columns = columns
         if schema and columns:
@@ -249,6 +252,7 @@ def read_partition(
                         partitions=partitions,
                         partitioning=partitioning,
                         partition_keys=last_partition_keys,
+                        dataset_kwargs=dataset_kwargs,
                         **read_kwargs,
                     )
                 )
@@ -274,6 +278,7 @@ def read_partition(
                 partitions=partitions,
                 partitioning=partitioning,
                 partition_keys=last_partition_keys,
+                dataset_kwargs=dataset_kwargs,
                 **read_kwargs,
             )
         )
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
index 355a1a5d73a..8fb6e591660 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -504,6 +504,28 @@ def test_check_file_size(tmpdir):
         dask_cudf.read_parquet(fn, check_file_size=1).compute()
 
 
+def test_null_partition(tmpdir):
+    import pyarrow as pa
+    from pyarrow.dataset import HivePartitioning
+
+    df = pd.DataFrame({"id": [0, 1, None], "x": [1, 2, 3]})
+    ddf = dd.from_pandas(df, npartitions=1).to_backend("cudf")
+    ddf.to_parquet(str(tmpdir), partition_on="id")
+    fns = glob.glob(os.path.join(tmpdir, "id" + "=*/*.parquet"))
+    assert len(fns) == 3
+
+    partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))
+    ddf_read = dask_cudf.read_parquet(
+        str(tmpdir),
+        dataset={"partitioning": partitioning},
+    )
+    dd.assert_eq(
+        ddf[["x", "id"]],
+        ddf_read[["x", "id"]],
+        check_divisions=False,
+    )
+
+
 def test_nullable_schema_mismatch(tmpdir):
     # See: https://github.com/rapidsai/cudf/issues/12702
     path0 = str(tmpdir.join("test.0.parquet"))
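For reference, a minimal usage sketch of the round-trip this patch enables, modeled on `test_null_partition` above; it assumes a cudf/dask_cudf build that includes this change, and the temporary directory and `id`/`x` column names are illustrative only:

```python
# Usage sketch (not part of the patch): round-trip a null hive partition with
# dask_cudf, mirroring the new test above. Requires a patched cudf/dask_cudf.
import glob
import os
import tempfile

import pandas as pd
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

import dask.dataframe as dd
import dask_cudf

path = tempfile.mkdtemp()

# Write a dataset partitioned on a column that contains a null value;
# the null row is written under an `id=__HIVE_DEFAULT_PARTITION__/` directory.
df = pd.DataFrame({"id": [0, 1, None], "x": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1).to_backend("cudf")
ddf.to_parquet(path, partition_on="id")
assert len(glob.glob(os.path.join(path, "id=*/*.parquet"))) == 3

# Read it back, passing an explicit HivePartitioning schema through the new
# `dataset` kwarg so the partition column keeps a float64 dtype (nulls intact).
partitioning = HivePartitioning(pa.schema([("id", pa.float64())]))
ddf_read = dask_cudf.read_parquet(path, dataset={"partitioning": partitioning})
dd.assert_eq(ddf[["x", "id"]], ddf_read[["x", "id"]], check_divisions=False)
```

Writing the null key as `id=__HIVE_DEFAULT_PARTITION__` (see `_hive_dirname`) follows the standard Hive convention for null partition keys, which is why PyArrow's `HivePartitioning` maps that directory back to a null `id` value on read.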