Single-partition Dask executor for cuDF-Polars #17262

Merged Nov 25, 2024 (67 commits)

Commits
a590076  cleanup (rjzamora, Nov 6, 2024)
7f1bec7  rename to parallel (rjzamora, Nov 7, 2024)
023e085  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 7, 2024)
e7a2fce  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 7, 2024)
69a3374  simplify solution (rjzamora, Nov 7, 2024)
6aa3694  Merge branch 'cudf-polars-dask-simple' of github.com:rjzamora/cudf in… (rjzamora, Nov 7, 2024)
ea22a9a  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 7, 2024)
915a779  deeper dive (rjzamora, Nov 8, 2024)
bd9d783  improve simple agg reduction (rjzamora, Nov 8, 2024)
7363d91  cleanup fundamental bugs (rjzamora, Nov 10, 2024)
58ee5f4  move PartitionInfo (rjzamora, Nov 10, 2024)
ecc51ef  add Literal (rjzamora, Nov 10, 2024)
75eae0c  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 12, 2024)
fb2d6bf  add lower_ir_graph (rjzamora, Nov 12, 2024)
c17564c  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora, Nov 12, 2024)
6e66998  strip out most exploratory logic (rjzamora, Nov 12, 2024)
c41723d  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 12, 2024)
d774f38  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 13, 2024)
6886f8d  Add basic Dask evaluate test (pentschev, Nov 13, 2024)
29b2d7b  Replace environment variable with new `"executor"` config (pentschev, Nov 13, 2024)
3a68a6d  Add kwarg to specify executor in `assert_gpu_result_equal` (pentschev, Nov 13, 2024)
8079ac0  Add couple of Dask executor tests (pentschev, Nov 13, 2024)
6f7ccee  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 13, 2024)
af4c5f5  Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 13, 2024)
8aed94f  Improve `count` code (pentschev, Nov 13, 2024)
aadaf10  Pass `executor` to `GPUEngine` in `assert_gpu_result_equal` (pentschev, Nov 13, 2024)
c3a6907  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 14, 2024)
4f67819  Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 14, 2024)
c8ca09e  Clarify intent renaming executor to "dask-experimental" (pentschev, Nov 14, 2024)
3fd51bb  move PartitionInfo out of ir module (rjzamora, Nov 14, 2024)
bf182e4  Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 14, 2024)
453e274  skip coverage on sanity-check errors (rjzamora, Nov 14, 2024)
2b74f28  Add `--executor` to pytest (pentschev, Nov 14, 2024)
6d3cd55  Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 14, 2024)
2398a2e  Enable dask-experimental tests in CI, remove duplicates (pentschev, Nov 14, 2024)
9aa479a  Fix wrong protocol name in deserialization test (pentschev, Nov 14, 2024)
64ea98e  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 14, 2024)
22678a5  Remove `executor` kwarg from `assert_gpu_result_equal` (pentschev, Nov 14, 2024)
41441ca  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora, Nov 15, 2024)
efadb78  Reintroduce `executor` kwarg in `assert_gpu_result_equal` (pentschev, Nov 15, 2024)
9b78d8f  Add basic tests for all executors to ensure 100% coverage (pentschev, Nov 15, 2024)
c54c217  Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 15, 2024)
70da7a9  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 15, 2024)
3aeb1e4  Fix `executor` in `assert_gpu_result_equal` (pentschev, Nov 18, 2024)
485a161  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 18, 2024)
eb41100  Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora, Nov 19, 2024)
e91fdb9  Merge branch 'branch-25.02' into cudf-polars-dask-simple (pentschev, Nov 20, 2024)
cac9e4f  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 20, 2024)
addae40  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 21, 2024)
dbf37d3  address code review - round 1 (rjzamora, Nov 21, 2024)
4d21f7c  move sort tupling (rjzamora, Nov 21, 2024)
e241af3  remove need for stringify (rjzamora, Nov 21, 2024)
1064fcb  address code review - round 2 (rjzamora, Nov 21, 2024)
bbddfb6  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 21, 2024)
aeecd4d  remove global caching (rjzamora, Nov 21, 2024)
09c5217  use general StateInfo (rjzamora, Nov 21, 2024)
62f10bc  revert (for now) (rjzamora, Nov 21, 2024)
77113d6  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 22, 2024)
9967cfb  skip coverage on singledispatch miss (rjzamora, Nov 22, 2024)
075d41e  typo (rjzamora, Nov 22, 2024)
21e598a  Rename `"cudf"` executor to `"pylibcudf"` (pentschev, Nov 25, 2024)
7febe21  Update `test_evaluate_dask()` with `collect()` (pentschev, Nov 25, 2024)
ac4d2da  Remove tests docstrings (pentschev, Nov 25, 2024)
2ced4a0  Improve `executor` typing (pentschev, Nov 25, 2024)
98b0b36  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (pentschev, Nov 25, 2024)
af5aff8  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 25, 2024)
f6f7eda  address code review (rjzamora, Nov 25, 2024)
4 changes: 4 additions & 0 deletions ci/run_cudf_polars_pytests.sh
@@ -8,4 +8,8 @@ set -euo pipefail
# Support invoking run_cudf_polars_pytests.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/

# Test the default "cudf" executor
python -m pytest --cache-clear "$@" tests

# Test the "dask-experimental" executor
python -m pytest --cache-clear "$@" tests --executor dask-experimental
18 changes: 15 additions & 3 deletions python/cudf_polars/cudf_polars/callback.py
@@ -9,7 +9,7 @@
import os
import warnings
from functools import cache, partial
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

import nvtx

@@ -181,6 +181,7 @@ def _callback(
*,
device: int | None,
memory_resource: int | None,
executor: Literal["pylibcudf", "dask-experimental"] | None,
) -> pl.DataFrame:
assert with_columns is None
assert pyarrow_predicate is None
@@ -191,7 +192,14 @@
set_device(device),
set_memory_resource(memory_resource),
):
return ir.evaluate(cache={}).to_polars()
if executor is None or executor == "pylibcudf":
return ir.evaluate(cache={}).to_polars()
elif executor == "dask-experimental":
from cudf_polars.experimental.parallel import evaluate_dask

return evaluate_dask(ir).to_polars()
else:
raise ValueError(f"Unknown executor '{executor}'")


def validate_config_options(config: dict) -> None:
@@ -208,7 +216,9 @@ def validate_config_options(config: dict) -> None:
ValueError
If the configuration contains unsupported options.
"""
if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}):
if unsupported := (
config.keys() - {"raise_on_fail", "parquet_options", "executor"}
):
raise ValueError(
f"Engine configuration contains unsupported settings: {unsupported}"
)
@@ -243,6 +253,7 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
device = config.device
memory_resource = config.memory_resource
raise_on_fail = config.config.get("raise_on_fail", False)
executor = config.config.get("executor", None)
validate_config_options(config.config)

with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
@@ -272,5 +283,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
ir,
device=device,
memory_resource=memory_resource,
executor=executor,
)
)
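
As a usage illustration (not part of the diff): the new "executor" setting is passed through the GPUEngine configuration and read back by execute_with_cudf above. A minimal sketch, assuming polars with GPU support and cudf-polars are installed; the toy query below is hypothetical:

import polars as pl

# Toy query for illustration only
q = (
    pl.LazyFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    .filter(pl.col("a") > 1)
    .select(pl.col("a") + pl.col("b"))
)

# Default in-process executor ("pylibcudf")
expected = q.collect(engine=pl.GPUEngine(raise_on_fail=True))

# Experimental single-partition Dask executor added by this PR
got = q.collect(
    engine=pl.GPUEngine(raise_on_fail=True, executor="dask-experimental")
)

assert expected.equals(got)
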
25 changes: 16 additions & 9 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -1599,13 +1599,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
# polars requires that all to-explode columns have the
# same sub-shapes
raise NotImplementedError("Explode with more than one column")
self.options = (tuple(to_explode),)
elif self.name == "rename":
old, new, _ = self.options
old, new, strict = self.options
# TODO: perhaps polars should validate renaming in the IR?
if len(new) != len(set(new)) or (
set(new) & (set(df.schema.keys()) - set(old))
):
raise NotImplementedError("Duplicate new names in rename.")
self.options = (tuple(old), tuple(new), strict)
elif self.name == "unpivot":
indices, pivotees, variable_name, value_name = self.options
value_name = "value" if value_name is None else value_name
@@ -1623,13 +1625,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
self.options = (
tuple(indices),
tuple(pivotees),
(variable_name, schema[variable_name]),
(value_name, schema[value_name]),
variable_name,
value_name,
)
self._non_child_args = (name, self.options)
self._non_child_args = (schema, name, self.options)

@classmethod
def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
def do_evaluate(
cls, schema: Schema, name: str, options: Any, df: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
if name == "rechunk":
# No-op in our data model
@@ -1651,8 +1655,8 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
(
indices,
pivotees,
(variable_name, variable_dtype),
(value_name, value_dtype),
variable_name,
value_name,
) = options
npiv = len(pivotees)
index_columns = [
@@ -1669,15 +1673,18 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
plc.interop.from_arrow(
pa.array(
pivotees,
type=plc.interop.to_arrow(variable_dtype),
type=plc.interop.to_arrow(schema[variable_name]),
),
)
]
),
df.num_rows,
).columns()
value_column = plc.concatenate.concatenate(
[df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees]
[
df.column_map[pivotee].astype(schema[value_name]).obj
for pivotee in pivotees
]
)
return DataFrame(
[
3 changes: 2 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/translate.py
@@ -633,9 +633,10 @@ def _(node: pl_expr.Sort, translator: Translator, dtype: plc.DataType) -> expr.E

@_translate_expr.register
def _(node: pl_expr.SortBy, translator: Translator, dtype: plc.DataType) -> expr.Expr:
options = node.sort_options
return expr.SortBy(
dtype,
node.sort_options,
(options[0], tuple(options[1]), tuple(options[2])),
translator.translate_expr(n=node.expr),
*(translator.translate_expr(n=n) for n in node.by),
)
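
One likely motivation for converting list-valued options into tuples, here and in the ir.py changes above (the PR does not state this explicitly), is that the experimental executor below keys tasks by hash(node) via get_key_name, so keyed IR nodes and their non-child arguments must be hashable. A small self-contained illustration with made-up option values:

# Lists inside an options tuple make the whole tuple unhashable;
# the fully tuple-based form can be hashed (and therefore used as a key).
list_options = ("by", [False], [True])
tuple_options = ("by", (False,), (True,))

try:
    hash(list_options)
except TypeError as err:
    print(f"unhashable: {err}")

print(f"hashable: {hash(tuple_options)}")
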
236 changes: 236 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -0,0 +1,236 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Partitioned LogicalPlan nodes."""

from __future__ import annotations

import operator
from functools import reduce, singledispatch
from typing import TYPE_CHECKING, Any

from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.traversal import traversal

if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import TypeAlias

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.nodebase import Node
from cudf_polars.typing import GenericTransformer


class PartitionInfo:
"""
Partitioning information.

This class only tracks the partition count (for now).
"""

__slots__ = ("count",)

def __init__(self, count: int):
self.count = count


LowerIRTransformer: TypeAlias = (
"GenericTransformer[IR, MutableMapping[IR, PartitionInfo]]"
)
"""Protocol for Lowering IR nodes."""


def get_key_name(node: Node) -> str:
"""Generate the key name for a Node."""
return f"{type(node).__name__.lower()}-{hash(node)}"


@singledispatch
def lower_ir_node(
ir: IR, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Rewrite an IR node and extract partitioning information.

Parameters
----------
ir
IR node to rewrite.
rec
Recursive LowerIRTransformer callable.

Returns
-------
new_ir, partition_info
The rewritten node, and a mapping from unique nodes in
the full IR graph to associated partitioning information.

Notes
-----
This function is used by `lower_ir_graph`.

See Also
--------
lower_ir_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover


@lower_ir_node.register(IR)
def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
if len(ir.children) == 0:
# Default leaf node has single partition
return ir, {ir: PartitionInfo(count=1)}

# Lower children
children, _partition_info = zip(*(rec(c) for c in ir.children), strict=False)
partition_info = reduce(operator.or_, _partition_info)

# Check that child partitioning is supported
count = max(partition_info[c].count for c in children)
if count > 1:
raise NotImplementedError(
f"Class {type(ir)} does not support multiple partitions."
) # pragma: no cover

# Return reconstructed node and partition-info dict
partition = PartitionInfo(count=1)
new_node = ir.reconstruct(children)
partition_info[new_node] = partition
return new_node, partition_info
Comment on lines +95 to +99 (Contributor):

nit: No need to address here, but just to note for a followup. This does "unnecessary" reconstruction if the children are unchanged. We could consider making reconstruct return self if the children match.

def lower_ir_graph(ir: IR) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Rewrite an IR graph and extract partitioning information.

Parameters
----------
ir
Root of the graph to rewrite.

Returns
-------
new_ir, partition_info
The rewritten graph, and a mapping from unique nodes
in the new graph to associated partitioning information.

Notes
-----
This function traverses the unique nodes of the graph with
root `ir`, and applies :func:`lower_ir_node` to each node.

See Also
--------
lower_ir_node
"""
from cudf_polars.dsl.traversal import CachingVisitor

mapper = CachingVisitor(lower_ir_node)
return mapper(ir)


@singledispatch
def generate_ir_tasks(
ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
"""
Generate a task graph for evaluation of an IR node.

Parameters
----------
ir
IR node to generate tasks for.
partition_info
Partitioning information, obtained from :func:`lower_ir_graph`.

Returns
-------
mapping
A (partial) dask task graph for the evaluation of an ir node.

Notes
-----
Task generation should only produce the tasks for the current node,
referring to child tasks by name.

See Also
--------
task_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover


@generate_ir_tasks.register(IR)
def _(
ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
# Single-partition default behavior.
# This is used by `generate_ir_tasks` for all unregistered IR sub-types.
if partition_info[ir].count > 1:
raise NotImplementedError(
f"Failed to generate multiple output tasks for {ir}."
) # pragma: no cover

child_names = []
for child in ir.children:
child_names.append(get_key_name(child))
if partition_info[child].count > 1:
raise NotImplementedError(
f"Failed to generate tasks for {ir} with child {child}."
) # pragma: no cover

key_name = get_key_name(ir)
return {
(key_name, 0): (
ir.do_evaluate,
*ir._non_child_args,
*((child_name, 0) for child_name in child_names),
)
}


def task_graph(
ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> tuple[MutableMapping[Any, Any], str | tuple[str, int]]:
"""
Construct a task graph for evaluation of an IR graph.

Parameters
----------
ir
Root of the graph to rewrite.
partition_info
A mapping from all unique IR nodes to the
associated partitioning information.

Returns
-------
graph
A Dask-compatible task graph for the entire
IR graph with root `ir`.

Notes
-----
This function traverses the unique nodes of the
graph with root `ir`, and extracts the tasks for
each node with :func:`generate_ir_tasks`.

See Also
--------
generate_ir_tasks
"""
graph = reduce(
operator.or_,
(generate_ir_tasks(node, partition_info) for node in traversal(ir)),
)
return graph, (get_key_name(ir), 0)


def evaluate_dask(ir: IR) -> DataFrame:
"""Evaluate an IR graph with Dask."""
from dask import get

ir, partition_info = lower_ir_graph(ir)

graph, key = task_graph(ir, partition_info)
return get(graph, key)
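
For reference, the mapping built by task_graph follows the plain dict-based Dask task-graph protocol that dask.get consumes: keys are (name, partition-index) tuples and values are (callable, *args) tasks, with child results referenced by their keys. A self-contained sketch of the same single-partition pattern, using toy functions in place of IR.do_evaluate (names and data are illustrative only):

from dask import get

# Toy stand-ins for IR nodes: each task is a (callable, *args) tuple, and
# arguments that are themselves graph keys get replaced by computed results.
def scan():
    return list(range(10))

def select(rows):
    return [x * 2 for x in rows]

graph = {
    ("scan-abc", 0): (scan,),
    ("select-def", 0): (select, ("scan-abc", 0)),
}

# Equivalent of the final step in evaluate_dask(): materialize the root key
result = get(graph, ("select-def", 0))
print(result)  # [0, 2, 4, ..., 18]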