Single-partition Dask executor for cuDF-Polars (#17262)
The goal here is to lay down the initial foundation for Dask-based evaluation of `IR` graphs in cudf-polars. This first pass only supports single-partition workloads. That alone could be achieved with far simpler changes to cudf-polars, but we **do** want to build multi-partition support on top of this.
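
As a usage sketch (not part of the commit): the new executor is selected through the polars `GPUEngine` config, whose extra keyword arguments land in the `config.config` mapping read by `execute_with_cudf` below. This assumes a CUDA-capable environment with cudf-polars and dask installed.

```python
import polars as pl

# Hedged sketch: "executor" is the engine-config key added by this commit;
# omitting it (or passing "pylibcudf") keeps the default single-GPU path.
q = pl.LazyFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).select(
    (pl.col("a") + pl.col("b")).alias("c")
)
result = q.collect(engine=pl.GPUEngine(executor="dask-experimental"))
print(result)
```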

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - James Lamb (https://github.com/jameslamb)

URL: #17262
rjzamora authored Nov 25, 2024
1 parent d93e9c2 commit f05e89d
Showing 9 changed files with 388 additions and 14 deletions.
4 changes: 4 additions & 0 deletions ci/run_cudf_polars_pytests.sh
@@ -8,4 +8,8 @@ set -euo pipefail
# Support invoking run_cudf_polars_pytests.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/

+# Test the default "cudf" executor
python -m pytest --cache-clear "$@" tests
+
+# Test the "dask-experimental" executor
+python -m pytest --cache-clear "$@" tests --executor dask-experimental
18 changes: 15 additions & 3 deletions python/cudf_polars/cudf_polars/callback.py
@@ -9,7 +9,7 @@
import os
import warnings
from functools import cache, partial
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Literal

import nvtx

@@ -181,6 +181,7 @@ def _callback(
    *,
    device: int | None,
    memory_resource: int | None,
+   executor: Literal["pylibcudf", "dask-experimental"] | None,
) -> pl.DataFrame:
    assert with_columns is None
    assert pyarrow_predicate is None
@@ -191,7 +192,14 @@
        set_device(device),
        set_memory_resource(memory_resource),
    ):
-       return ir.evaluate(cache={}).to_polars()
+       if executor is None or executor == "pylibcudf":
+           return ir.evaluate(cache={}).to_polars()
+       elif executor == "dask-experimental":
+           from cudf_polars.experimental.parallel import evaluate_dask
+
+           return evaluate_dask(ir).to_polars()
+       else:
+           raise ValueError(f"Unknown executor '{executor}'")


def validate_config_options(config: dict) -> None:
@@ -208,7 +216,9 @@ def validate_config_options(config: dict) -> None:
    ValueError
        If the configuration contains unsupported options.
    """
-   if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}):
+   if unsupported := (
+       config.keys() - {"raise_on_fail", "parquet_options", "executor"}
+   ):
        raise ValueError(
            f"Engine configuration contains unsupported settings: {unsupported}"
        )
@@ -243,6 +253,7 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
    device = config.device
    memory_resource = config.memory_resource
    raise_on_fail = config.config.get("raise_on_fail", False)
+   executor = config.config.get("executor", None)
    validate_config_options(config.config)

    with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
@@ -272,5 +283,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
                ir,
                device=device,
                memory_resource=memory_resource,
+               executor=executor,
            )
        )
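
A hedged sketch of how the validation above surfaces to users. Per the diff, `executor` now joins `raise_on_fail` and `parquet_options` as the only recognized engine-config keys; any other key raises the `ValueError` shown above.

```python
import polars as pl

# "not_a_real_option" is a deliberately bogus key for illustration.
q = pl.LazyFrame({"a": [1]}).select(pl.col("a"))
try:
    q.collect(engine=pl.GPUEngine(not_a_real_option=1))
except Exception as err:  # the exact wrapper exception polars re-raises may vary
    print(err)  # Engine configuration contains unsupported settings: ...
```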
25 changes: 16 additions & 9 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -1599,13 +1599,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
                # polars requires that all to-explode columns have the
                # same sub-shapes
                raise NotImplementedError("Explode with more than one column")
+           self.options = (tuple(to_explode),)
        elif self.name == "rename":
-           old, new, _ = self.options
+           old, new, strict = self.options
            # TODO: perhaps polars should validate renaming in the IR?
            if len(new) != len(set(new)) or (
                set(new) & (set(df.schema.keys()) - set(old))
            ):
                raise NotImplementedError("Duplicate new names in rename.")
+           self.options = (tuple(old), tuple(new), strict)
        elif self.name == "unpivot":
            indices, pivotees, variable_name, value_name = self.options
            value_name = "value" if value_name is None else value_name
@@ -1623,13 +1625,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
            self.options = (
                tuple(indices),
                tuple(pivotees),
-               (variable_name, schema[variable_name]),
-               (value_name, schema[value_name]),
+               variable_name,
+               value_name,
            )
-       self._non_child_args = (name, self.options)
+       self._non_child_args = (schema, name, self.options)

    @classmethod
-   def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
+   def do_evaluate(
+       cls, schema: Schema, name: str, options: Any, df: DataFrame
+   ) -> DataFrame:
        """Evaluate and return a dataframe."""
        if name == "rechunk":
            # No-op in our data model
@@ -1651,8 +1655,8 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
            (
                indices,
                pivotees,
-               (variable_name, variable_dtype),
-               (value_name, value_dtype),
+               variable_name,
+               value_name,
            ) = options
            npiv = len(pivotees)
            index_columns = [
@@ -1669,15 +1673,18 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
                        plc.interop.from_arrow(
                            pa.array(
                                pivotees,
-                               type=plc.interop.to_arrow(variable_dtype),
+                               type=plc.interop.to_arrow(schema[variable_name]),
                            ),
                        )
                    ]
                ),
                df.num_rows,
            ).columns()
            value_column = plc.concatenate.concatenate(
-               [df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees]
+               [
+                   df.column_map[pivotee].astype(schema[value_name]).obj
+                   for pivotee in pivotees
+               ]
            )
            return DataFrame(
                [
Expand Down
3 changes: 2 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/translate.py
@@ -633,9 +633,10 @@ def _(node: pl_expr.Sort, translator: Translator, dtype: plc.DataType) -> expr.E

@_translate_expr.register
def _(node: pl_expr.SortBy, translator: Translator, dtype: plc.DataType) -> expr.Expr:
+    options = node.sort_options
    return expr.SortBy(
        dtype,
-       node.sort_options,
+       (options[0], tuple(options[1]), tuple(options[2])),
        translator.translate_expr(n=node.expr),
        *(translator.translate_expr(n=n) for n in node.by),
    )
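
A note on the `ir.py` and `translate.py` changes above: options are normalized to tuples (with dtypes looked up from the schema inside `do_evaluate` rather than stored in `options`) so that IR nodes stay hashable for task keys and `do_evaluate` can be invoked positionally as a standalone task. A minimal, self-contained sketch of that calling convention (toy class, not PR code):

```python
from dask import get


class Toy:
    """Mimics the IR calling convention used by the Dask executor."""

    def __init__(self, name: str, options: list[str]):
        # Tuples keep the node (and thus its task key) hashable.
        self.options = (tuple(options),)
        self._non_child_args = (name, self.options)

    @classmethod
    def do_evaluate(cls, name, options, df):
        # Classmethod: callable from a task tuple with no instance state.
        return [f"{name}({x})" for x in df]


node = Toy("rename", ["a", "b"])
graph = {
    ("leaf", 0): [1, 2],  # literal stand-in for a child result
    ("toy", 0): (Toy.do_evaluate, *node._non_child_args, ("leaf", 0)),
}
print(get(graph, ("toy", 0)))  # ['rename(1)', 'rename(2)']
```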
236 changes: 236 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -0,0 +1,236 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Partitioned LogicalPlan nodes."""

from __future__ import annotations

import operator
from functools import reduce, singledispatch
from typing import TYPE_CHECKING, Any

from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.traversal import traversal

if TYPE_CHECKING:
    from collections.abc import MutableMapping
    from typing import TypeAlias

    from cudf_polars.containers import DataFrame
    from cudf_polars.dsl.nodebase import Node
    from cudf_polars.typing import GenericTransformer


class PartitionInfo:
    """
    Partitioning information.

    This class only tracks the partition count (for now).
    """

    __slots__ = ("count",)

    def __init__(self, count: int):
        self.count = count


LowerIRTransformer: TypeAlias = (
    "GenericTransformer[IR, MutableMapping[IR, PartitionInfo]]"
)
"""Protocol for Lowering IR nodes."""


def get_key_name(node: Node) -> str:
    """Generate the key name for a Node."""
    return f"{type(node).__name__.lower()}-{hash(node)}"


@singledispatch
def lower_ir_node(
    ir: IR, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    """
    Rewrite an IR node and extract partitioning information.

    Parameters
    ----------
    ir
        IR node to rewrite.
    rec
        Recursive LowerIRTransformer callable.

    Returns
    -------
    new_ir, partition_info
        The rewritten node, and a mapping from unique nodes in
        the full IR graph to associated partitioning information.

    Notes
    -----
    This function is used by `lower_ir_graph`.

    See Also
    --------
    lower_ir_graph
    """
    raise AssertionError(f"Unhandled type {type(ir)}")  # pragma: no cover


@lower_ir_node.register(IR)
def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    if len(ir.children) == 0:
        # Default leaf node has single partition
        return ir, {ir: PartitionInfo(count=1)}

    # Lower children
    children, _partition_info = zip(*(rec(c) for c in ir.children), strict=False)
    partition_info = reduce(operator.or_, _partition_info)

    # Check that child partitioning is supported
    count = max(partition_info[c].count for c in children)
    if count > 1:
        raise NotImplementedError(
            f"Class {type(ir)} does not support multiple partitions."
        )  # pragma: no cover

    # Return reconstructed node and partition-info dict
    partition = PartitionInfo(count=1)
    new_node = ir.reconstruct(children)
    partition_info[new_node] = partition
    return new_node, partition_info


def lower_ir_graph(ir: IR) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    """
    Rewrite an IR graph and extract partitioning information.

    Parameters
    ----------
    ir
        Root of the graph to rewrite.

    Returns
    -------
    new_ir, partition_info
        The rewritten graph, and a mapping from unique nodes
        in the new graph to associated partitioning information.

    Notes
    -----
    This function traverses the unique nodes of the graph with
    root `ir`, and applies :func:`lower_ir_node` to each node.

    See Also
    --------
    lower_ir_node
    """
    from cudf_polars.dsl.traversal import CachingVisitor

    mapper = CachingVisitor(lower_ir_node)
    return mapper(ir)


@singledispatch
def generate_ir_tasks(
    ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
    """
    Generate a task graph for evaluation of an IR node.

    Parameters
    ----------
    ir
        IR node to generate tasks for.
    partition_info
        Partitioning information, obtained from :func:`lower_ir_graph`.

    Returns
    -------
    mapping
        A (partial) dask task graph for the evaluation of an ir node.

    Notes
    -----
    Task generation should only produce the tasks for the current node,
    referring to child tasks by name.

    See Also
    --------
    task_graph
    """
    raise AssertionError(f"Unhandled type {type(ir)}")  # pragma: no cover


@generate_ir_tasks.register(IR)
def _(
    ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
    # Single-partition default behavior.
    # This is used by `generate_ir_tasks` for all unregistered IR sub-types.
    if partition_info[ir].count > 1:
        raise NotImplementedError(
            f"Failed to generate multiple output tasks for {ir}."
        )  # pragma: no cover

    child_names = []
    for child in ir.children:
        child_names.append(get_key_name(child))
        if partition_info[child].count > 1:
            raise NotImplementedError(
                f"Failed to generate tasks for {ir} with child {child}."
            )  # pragma: no cover

    key_name = get_key_name(ir)
    return {
        (key_name, 0): (
            ir.do_evaluate,
            *ir._non_child_args,
            *((child_name, 0) for child_name in child_names),
        )
    }


def task_graph(
    ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> tuple[MutableMapping[Any, Any], str | tuple[str, int]]:
    """
    Construct a task graph for evaluation of an IR graph.

    Parameters
    ----------
    ir
        Root of the graph to rewrite.
    partition_info
        A mapping from all unique IR nodes to the
        associated partitioning information.

    Returns
    -------
    graph
        A Dask-compatible task graph for the entire
        IR graph with root `ir`.

    Notes
    -----
    This function traverses the unique nodes of the
    graph with root `ir`, and extracts the tasks for
    each node with :func:`generate_ir_tasks`.

    See Also
    --------
    generate_ir_tasks
    """
    graph = reduce(
        operator.or_,
        (generate_ir_tasks(node, partition_info) for node in traversal(ir)),
    )
    return graph, (get_key_name(ir), 0)


def evaluate_dask(ir: IR) -> DataFrame:
    """Evaluate an IR graph with Dask."""
    from dask import get

    ir, partition_info = lower_ir_graph(ir)

    graph, key = task_graph(ir, partition_info)
    return get(graph, key)
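
To make the single-partition semantics concrete, here is a self-contained sketch (toy functions and placeholder key names, not cudf-polars IR) of the graph shape `generate_ir_tasks` emits and how `dask.get` consumes it: each node contributes exactly one task keyed by `(name, 0)`, and children are referenced by their keys.

```python
from dask import get


def scan():  # stands in for a leaf IR node's do_evaluate
    return [1, 2, 3]


def select(df):  # stands in for a unary IR node's do_evaluate
    return [x * 10 for x in df]


graph = {
    ("scan-<hash>", 0): (scan,),  # leaf task, partition 0
    ("select-<hash>", 0): (select, ("scan-<hash>", 0)),  # child referenced by key
}
print(get(graph, ("select-<hash>", 0)))  # [10, 20, 30]
```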
