Add multi-partition DataFrameScan support to cuDF-Polars #17441

Merged

Commits (31 total; this view shows changes from 26 of them):
- f651515 add multi-partition DataFrameScan IR node (rjzamora, Nov 25, 2024)
- 17fa65a add multi-partition DataFrameScan IR node (rjzamora, Nov 25, 2024)
- e7e2a37 avoid redirection (rjzamora, Nov 25, 2024)
- b587ea3 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 26, 2024)
- 7da3209 adjust coverage (rjzamora, Nov 26, 2024)
- 7acdee2 pull in _from_pydf change needed by 17364 (rjzamora, Nov 26, 2024)
- dcede57 Merge branch 'branch-25.02' into cudf-polars-multi-dataframe-scan (rjzamora, Nov 26, 2024)
- 69a76ee avoid reconstruction (future concern) (rjzamora, Nov 26, 2024)
- a7be622 Merge branch 'cudf-polars-multi-dataframe-scan' of github.com:rjzamor… (rjzamora, Nov 26, 2024)
- c311590 apply code review suggestion (rjzamora, Nov 26, 2024)
- f6bb5d1 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 26, 2024)
- c8af6fc roll back unnecessary changes (rjzamora, Nov 26, 2024)
- a765cbc rename to executor_options (rjzamora, Nov 26, 2024)
- b18121b fix coverage (rjzamora, Nov 26, 2024)
- 325051b Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 27, 2024)
- 1be7228 improve test coverage (rjzamora, Nov 27, 2024)
- 36f59f1 split logic into dedicated io.py file (rjzamora, Nov 27, 2024)
- 0b63126 remove __init__.py additions (rjzamora, Nov 27, 2024)
- c6eb1b8 Apply suggestions from code review (rjzamora, Nov 27, 2024)
- 35e5493 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 27, 2024)
- 5cfef05 Merge branch 'branch-25.02' into cudf-polars-multi-dataframe-scan (rjzamora, Nov 27, 2024)
- c031b01 Merge branch 'cudf-polars-multi-dataframe-scan' of github.com:rjzamor… (rjzamora, Nov 27, 2024)
- 93f6a86 fix code suggestions (rjzamora, Nov 27, 2024)
- 646ddba refactor to avoid circular imports (rjzamora, Nov 27, 2024)
- e0929e4 fix test coverage (rjzamora, Nov 27, 2024)
- 46531aa Merge branch 'branch-25.02' into cudf-polars-multi-dataframe-scan (rjzamora, Dec 2, 2024)
- 8fb2833 remove problematic default mapping (rjzamora, Dec 2, 2024)
- 925cb47 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 2, 2024)
- 503ad59 improve comment (rjzamora, Dec 2, 2024)
- 043d268 move back lower_ir_node default (rjzamora, Dec 2, 2024)
- 332ced3 Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 3, 2024)
12 changes: 11 additions & 1 deletion python/cudf_polars/cudf_polars/callback.py
@@ -217,7 +217,8 @@ def validate_config_options(config: dict) -> None:
If the configuration contains unsupported options.
"""
if unsupported := (
config.keys() - {"raise_on_fail", "parquet_options", "executor"}
config.keys()
- {"raise_on_fail", "parquet_options", "executor", "executor_options"}
):
raise ValueError(
f"Engine configuration contains unsupported settings: {unsupported}"
@@ -226,6 +227,15 @@ def validate_config_options(config: dict) -> None:
config.get("parquet_options", {})
)

# Validate executor_options
executor = config.get("executor", "pylibcudf")
if executor == "dask-experimental":
unsupported = config.get("executor_options", {}).keys() - {"num_rows_threshold"}
else:
unsupported = config.get("executor_options", {}).keys()
if unsupported:
raise ValueError(f"Unsupported executor_options for {executor}: {unsupported}")


def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
"""
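For reference, the options validated above arrive through polars' `GPUEngine` configuration. A minimal usage sketch, assuming the `dask-experimental` executor added in this PR (the LazyFrame is illustrative only):

```python
import polars as pl

# Any LazyFrame works here; this one is just a stand-in.
ldf = pl.LazyFrame({"x": range(3_000_000)})

engine = pl.GPUEngine(
    raise_on_fail=True,
    executor="dask-experimental",  # opt in to the multi-partition executor
    executor_options={"num_rows_threshold": 1_000_000},  # target rows per partition
)

result = ldf.collect(engine=engine)
```

With any other executor, a non-empty `executor_options` triggers the `ValueError` above.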
17 changes: 14 additions & 3 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -688,26 +688,30 @@ class DataFrameScan(IR):
This typically arises from ``q.collect().lazy()``
"""

__slots__ = ("df", "projection", "predicate")
_non_child = ("schema", "df", "projection", "predicate")
__slots__ = ("df", "projection", "predicate", "config_options")
_non_child = ("schema", "df", "projection", "predicate", "config_options")
df: Any
"""Polars LazyFrame object."""
projection: tuple[str, ...] | None
"""List of columns to project out."""
predicate: expr.NamedExpr | None
"""Mask to apply."""
config_options: dict[str, Any]
"""GPU-specific configuration options."""

def __init__(
self,
schema: Schema,
df: Any,
projection: Sequence[str] | None,
predicate: expr.NamedExpr | None,
config_options: dict[str, Any],
):
self.schema = schema
self.df = df
self.projection = tuple(projection) if projection is not None else None
self.predicate = predicate
self.config_options = config_options
self._non_child_args = (schema, df, self.projection, predicate)
self.children = ()

@@ -719,7 +723,14 @@ def get_hashable(self) -> Hashable:
not stable across runs, or repeat instances of the same equal dataframes.
"""
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, id(self.df), self.projection, self.predicate)
return (
type(self),
schema_hash,
id(self.df),
self.projection,
self.predicate,
json.dumps(self.config_options),
)

@classmethod
def do_evaluate(
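One detail worth noting in `get_hashable`: `config_options` is a dict, which Python cannot hash directly, so it is folded into the key as a JSON string (the matching `import json` sits above the visible hunk). A small sketch of the idea:

```python
import json

config_options = {
    "executor": "dask-experimental",
    "executor_options": {"num_rows_threshold": 1_000_000},
}

# hash(config_options) would raise TypeError: unhashable type: 'dict'
key = json.dumps(config_options)  # a plain str, which is hashable
assert hash(key) == hash(json.dumps(config_options))
```

Since `json.dumps` is not called with `sort_keys=True`, the key is sensitive to dict insertion order; that is fine here because every `DataFrameScan` receives its options from the same source (`translator.config.config.copy()`).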
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/translate.py
@@ -263,6 +263,7 @@ def _(
translate_named_expr(translator, n=node.selection)
if node.selection is not None
else None,
translator.config.config.copy(),
)


43 changes: 43 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/base.py
@@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition base classes."""

from __future__ import annotations

from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import Union

if TYPE_CHECKING:
from collections.abc import Iterator, Sequence

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.nodebase import Node


class PartitionInfo:
"""
Partitioning information.

This class only tracks the partition count (for now).
"""

__slots__ = ("count",)

def __init__(self, count: int):
self.count = count

def keys(self, node: Node) -> Iterator[tuple[str, int]]:
"""Return the partitioned keys for a given node."""
name = get_key_name(node)
yield from ((name, i) for i in range(self.count))


def get_key_name(node: Node) -> str:
"""Generate the key name for a Node."""
return f"{type(node).__name__.lower()}-{hash(node)}"


def _concat(dfs: Sequence[DataFrame]) -> DataFrame:
# Concatenate a sequence of DataFrames vertically
return Union.do_evaluate(None, *dfs)
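The `(name, index)` pairs yielded by `keys` are the addresses the task graphs use to refer to individual partitions. A quick sketch, using a hypothetical stand-in for a real IR node:

```python
from cudf_polars.experimental.base import PartitionInfo, get_key_name

class FakeNode:
    """Hypothetical stand-in; any hashable object works with get_key_name."""

node = FakeNode()
pi = PartitionInfo(count=3)
print(get_key_name(node))   # e.g. "fakenode-8791624857123"
print(list(pi.keys(node)))  # [("fakenode-...", 0), ("fakenode-...", 1), ("fakenode-...", 2)]
```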
84 changes: 84 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/dispatch.py
@@ -0,0 +1,84 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition dispatch functions."""

from __future__ import annotations

from functools import singledispatch
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import TypeAlias

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.typing import GenericTransformer


LowerIRTransformer: TypeAlias = (
"GenericTransformer[IR, tuple[IR, MutableMapping[IR, PartitionInfo]]]"
)
"""Protocol for Lowering IR nodes."""


@singledispatch
def lower_ir_node(
ir: IR, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Rewrite an IR node and extract partitioning information.

Parameters
----------
ir
IR node to rewrite.
rec
Recursive LowerIRTransformer callable.

Returns
-------
new_ir, partition_info
The rewritten node, and a mapping from unique nodes in
the full IR graph to associated partitioning information.

Notes
-----
This function is used by `lower_ir_graph`.

See Also
--------
lower_ir_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover


@singledispatch
def generate_ir_tasks(
ir: IR, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
"""
Generate a task graph for evaluation of an IR node.

Parameters
----------
ir
IR node to generate tasks for.
partition_info
Partitioning information, obtained from :func:`lower_ir_graph`.

Returns
-------
mapping
A (partial) dask task graph for the evaluation of an ir node.

Notes
-----
Task generation should only produce the tasks for the current node,
referring to child tasks by name.

See Also
--------
task_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover
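To make the `generate_ir_tasks` contract concrete: a registered implementation returns a plain dict in dask task-graph form, keyed by the `(key_name, partition_index)` pairs from `PartitionInfo.keys`, and it refers to child partitions by key rather than generating their tasks. The node names and callables below are illustrative only, not part of this diff:

```python
# Sketch: tasks for a two-partition node "union-abc" that each consume the
# matching partition of a child node "dataframescan-xyz" by key reference.
graph = {
    ("union-abc", 0): (lambda df: df, ("dataframescan-xyz", 0)),
    ("union-abc", 1): (lambda df: df, ("dataframescan-xyz", 1)),
}
```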
54 changes: 54 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/io.py
@@ -0,0 +1,54 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition IO Logic."""

from __future__ import annotations

import math
from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import DataFrameScan, Union
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.dispatch import lower_ir_node

if TYPE_CHECKING:
from collections.abc import MutableMapping

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.dispatch import LowerIRTransformer


##
## DataFrameScan
##


@lower_ir_node.register(DataFrameScan)
def _(
ir: DataFrameScan, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
rows_per_partition = ir.config_options.get("executor_options", {}).get(
"num_rows_threshold", 1_000_000
)

nrows = max(ir.df.shape()[0], 1)
count = math.ceil(nrows / rows_per_partition)

if count > 1:
length = math.ceil(nrows / count)
slices = [
DataFrameScan(
ir.schema,
ir.df.slice(offset, length),
ir.projection,
ir.predicate,
ir.config_options,
)
for offset in range(0, nrows, length)
]
new_node = Union(ir.schema, None, *slices)
return new_node, {slice: PartitionInfo(count=1) for slice in slices} | {
new_node: PartitionInfo(count=count)
}

return rec.state["default_mapper"](ir)
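The slicing arithmetic above is easiest to see with concrete numbers. With the default `num_rows_threshold` of 1,000,000 and a hypothetical 2,500,000-row frame:

```python
import math

nrows, rows_per_partition = 2_500_000, 1_000_000
count = math.ceil(nrows / rows_per_partition)  # 3 partitions
length = math.ceil(nrows / count)              # 833_334 rows per slice
offsets = list(range(0, nrows, length))        # [0, 833334, 1666668]
# The last slice is shorter (833_332 rows); df.slice simply clamps it.
```

Each slice becomes a single-partition `DataFrameScan`, and the wrapping `Union` carries `count=3`; frames at or below the threshold fall through to the default single-partition mapping.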