Skip to content

Commit

Permalink
Add multi-partition DataFrameScan support to cuDF-Polars (#17441)
Browse files Browse the repository at this point in the history
Follow-up to #17262

Adds support for parallel `DataFrameScan` operations.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #17441
  • Loading branch information
rjzamora authored Dec 3, 2024
1 parent 12c77f3 commit 3785a48
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 123 deletions.
14 changes: 13 additions & 1 deletion python/cudf_polars/cudf_polars/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ def validate_config_options(config: dict) -> None:
If the configuration contains unsupported options.
"""
if unsupported := (
config.keys() - {"raise_on_fail", "parquet_options", "executor"}
config.keys()
- {"raise_on_fail", "parquet_options", "executor", "executor_options"}
):
raise ValueError(
f"Engine configuration contains unsupported settings: {unsupported}"
Expand All @@ -226,6 +227,17 @@ def validate_config_options(config: dict) -> None:
config.get("parquet_options", {})
)

# Validate executor_options
executor = config.get("executor", "pylibcudf")
if executor == "dask-experimental":
unsupported = config.get("executor_options", {}).keys() - {
"max_rows_per_partition"
}
else:
unsupported = config.get("executor_options", {}).keys()
if unsupported:
raise ValueError(f"Unsupported executor_options for {executor}: {unsupported}")


def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
"""
Expand Down
17 changes: 14 additions & 3 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,26 +688,30 @@ class DataFrameScan(IR):
This typically arises from ``q.collect().lazy()``
"""

__slots__ = ("df", "predicate", "projection")
_non_child = ("schema", "df", "projection", "predicate")
__slots__ = ("config_options", "df", "predicate", "projection")
_non_child = ("schema", "df", "projection", "predicate", "config_options")
df: Any
"""Polars LazyFrame object."""
projection: tuple[str, ...] | None
"""List of columns to project out."""
predicate: expr.NamedExpr | None
"""Mask to apply."""
config_options: dict[str, Any]
"""GPU-specific configuration options"""

def __init__(
self,
schema: Schema,
df: Any,
projection: Sequence[str] | None,
predicate: expr.NamedExpr | None,
config_options: dict[str, Any],
):
self.schema = schema
self.df = df
self.projection = tuple(projection) if projection is not None else None
self.predicate = predicate
self.config_options = config_options
self._non_child_args = (schema, df, self.projection, predicate)
self.children = ()

Expand All @@ -719,7 +723,14 @@ def get_hashable(self) -> Hashable:
not stable across runs, or repeat instances of the same equal dataframes.
"""
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, id(self.df), self.projection, self.predicate)
return (
type(self),
schema_hash,
id(self.df),
self.projection,
self.predicate,
json.dumps(self.config_options),
)

@classmethod
def do_evaluate(
Expand Down
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def _(
translate_named_expr(translator, n=node.selection)
if node.selection is not None
else None,
translator.config.config.copy(),
)


Expand Down
43 changes: 43 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition base classes."""

from __future__ import annotations

from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import Union

if TYPE_CHECKING:
from collections.abc import Iterator, Sequence

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.nodebase import Node


class PartitionInfo:
    """
    Partitioning information.

    For now the only thing recorded is how many partitions there are.
    """

    __slots__ = ("count",)

    def __init__(self, count: int):
        self.count = count

    def keys(self, node: Node) -> Iterator[tuple[str, int]]:
        """Yield one ``(key_name, partition_index)`` pair per partition of ``node``."""
        # The key name is shared by every partition of the node; only the
        # index changes from one key to the next.
        key_name = get_key_name(node)
        for index in range(self.count):
            yield (key_name, index)


def get_key_name(node: Node) -> str:
    """Build a key name from the lowercased type name of ``node`` and its hash."""
    type_label = type(node).__name__.lower()
    node_hash = hash(node)
    return f"{type_label}-{node_hash}"


def _concat(dfs: Sequence[DataFrame]) -> DataFrame:
    """Concatenate a sequence of DataFrames vertically."""
    # Reuse the Union node's evaluation; its leading argument is passed as
    # None and every frame in ``dfs`` follows positionally.
    combined = Union.do_evaluate(None, *dfs)
    return combined
84 changes: 84 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition dispatch functions."""

from __future__ import annotations

from functools import singledispatch
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import TypeAlias

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.typing import GenericTransformer


# Type of the recursive callable passed as ``rec`` to ``lower_ir_node``: given
# an IR node it produces the pair (rewritten IR node, mapping from IR nodes to
# their PartitionInfo). The alias is spelled as a string, so the names inside
# it are only needed by type checkers.
LowerIRTransformer: TypeAlias = (
"GenericTransformer[IR, tuple[IR, MutableMapping[IR, PartitionInfo]]]"
)
"""Protocol for Lowering IR nodes."""


@singledispatch
def lower_ir_node(
    ir: IR, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    """
    Rewrite a single IR node and collect its partitioning information.

    This is the fallback of a single-dispatch function: concrete IR types
    register their own implementation, and any type without one ends up here.

    Parameters
    ----------
    ir
        The IR node to rewrite.
    rec
        Recursive LowerIRTransformer callable.

    Returns
    -------
    new_ir, partition_info
        The rewritten node, together with a mapping from unique nodes in
        the full IR graph to their partitioning information.

    Raises
    ------
    AssertionError
        Always, when no implementation is registered for ``type(ir)``.

    Notes
    -----
    This function is used by `lower_ir_graph`.

    See Also
    --------
    lower_ir_graph
    """
    message = f"Unhandled type {type(ir)}"  # pragma: no cover
    raise AssertionError(message)  # pragma: no cover


@singledispatch
def generate_ir_tasks(
    ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
    """
    Build the task graph needed to evaluate a single IR node.

    This is the fallback of a single-dispatch function: concrete IR types
    register their own implementation, and any type without one ends up here.

    Parameters
    ----------
    ir
        The IR node to generate tasks for.
    partition_info
        Partitioning information, obtained from :func:`lower_ir_graph`.

    Returns
    -------
    mapping
        A (partial) dask task graph for the evaluation of an ir node.

    Raises
    ------
    AssertionError
        Always, when no implementation is registered for ``type(ir)``.

    Notes
    -----
    Only the tasks belonging to the current node should be produced;
    tasks of child nodes are referred to by name.

    See Also
    --------
    task_graph
    """
    message = f"Unhandled type {type(ir)}"  # pragma: no cover
    raise AssertionError(message)  # pragma: no cover
49 changes: 49 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition IO Logic."""

from __future__ import annotations

import math
from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import DataFrameScan, Union
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.dispatch import lower_ir_node

if TYPE_CHECKING:
from collections.abc import MutableMapping

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.dispatch import LowerIRTransformer


@lower_ir_node.register(DataFrameScan)
def _(
    ir: DataFrameScan, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    """
    Lower a ``DataFrameScan`` into one scan per row-range partition.

    The partition size is read from
    ``ir.config_options["executor_options"]["max_rows_per_partition"]`` and
    defaults to 1_000_000 rows when either key is absent.

    Parameters
    ----------
    ir
        The ``DataFrameScan`` node to lower.
    rec
        Recursive LowerIRTransformer callable (not used here; the node has
        no children to recurse into).

    Returns
    -------
    new_ir, partition_info
        When more than one partition is needed: a ``Union`` over one
        ``DataFrameScan`` per slice, with each slice mapped to a single
        partition and the ``Union`` mapped to the number of slices.
        Otherwise the original node, mapped to a single partition.
    """
    rows_per_partition = ir.config_options.get("executor_options", {}).get(
        "max_rows_per_partition", 1_000_000
    )

    # Clamp to at least one row so an empty frame still maps to one partition.
    nrows = max(ir.df.shape()[0], 1)
    count = math.ceil(nrows / rows_per_partition)

    if count > 1:
        # Even out the slice length instead of using rows_per_partition
        # directly; presumably the final slice is truncated by ``df.slice``
        # when it runs past the end -- TODO confirm.
        length = math.ceil(nrows / count)
        scans = [
            DataFrameScan(
                ir.schema,
                ir.df.slice(offset, length),
                ir.projection,
                ir.predicate,
                ir.config_options,
            )
            for offset in range(0, nrows, length)
        ]
        new_node = Union(ir.schema, None, *scans)
        # ``scan`` rather than ``slice`` as the loop name: the latter would
        # shadow the builtin.
        partition_info: dict[IR, PartitionInfo] = {
            scan: PartitionInfo(count=1) for scan in scans
        }
        # Record the number of scans actually built. For integer partition
        # sizes this equals ``count``; deriving it from the list keeps the
        # Union's partition count in step with its children regardless.
        partition_info[new_node] = PartitionInfo(count=len(scans))
        return new_node, partition_info

    return ir, {ir: PartitionInfo(count=1)}
Loading

0 comments on commit 3785a48

Please sign in to comment.