Add multi-partition DataFrameScan support to cuDF-Polars #17441

Merged

Changes shown are from 2 of the 31 commits.

Commits (31):
f651515  add multi-partition DataFrameScan IR node (rjzamora, Nov 25, 2024)
17fa65a  add multi-partition DataFrameScan IR node (rjzamora, Nov 25, 2024)
e7e2a37  avoid redirection (rjzamora, Nov 25, 2024)
b587ea3  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 26, 2024)
7da3209  adjust coverage (rjzamora, Nov 26, 2024)
7acdee2  pull in _from_pydf change needed by 17364 (rjzamora, Nov 26, 2024)
dcede57  Merge branch 'branch-25.02' into cudf-polars-multi-dataframe-scan (rjzamora, Nov 26, 2024)
69a76ee  avoid reconstruction (future concern) (rjzamora, Nov 26, 2024)
a7be622  Merge branch 'cudf-polars-multi-dataframe-scan' of github.com:rjzamor… (rjzamora, Nov 26, 2024)
c311590  apply code review suggestion (rjzamora, Nov 26, 2024)
f6bb5d1  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 26, 2024)
c8af6fc  roll back unnecessary changes (rjzamora, Nov 26, 2024)
a765cbc  rename to executor_options (rjzamora, Nov 26, 2024)
b18121b  fix coverage (rjzamora, Nov 26, 2024)
325051b  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 27, 2024)
1be7228  improve test coverage (rjzamora, Nov 27, 2024)
36f59f1  split logic into dedicated io.py file (rjzamora, Nov 27, 2024)
0b63126  remove __init__.py additions (rjzamora, Nov 27, 2024)
c6eb1b8  Apply suggestions from code review (rjzamora, Nov 27, 2024)
35e5493  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Nov 27, 2024)
5cfef05  Merge branch 'branch-25.02' into cudf-polars-multi-dataframe-scan (rjzamora, Nov 27, 2024)
c031b01  Merge branch 'cudf-polars-multi-dataframe-scan' of github.com:rjzamor… (rjzamora, Nov 27, 2024)
93f6a86  fix code suggestions (rjzamora, Nov 27, 2024)
646ddba  refactor to avoid circular imports (rjzamora, Nov 27, 2024)
e0929e4  fix test coverage (rjzamora, Nov 27, 2024)
46531aa  Merge branch 'branch-25.02' into cudf-polars-multi-dataframe-scan (rjzamora, Dec 2, 2024)
8fb2833  remove problematic default mapping (rjzamora, Dec 2, 2024)
925cb47  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 2, 2024)
503ad59  improve comment (rjzamora, Dec 2, 2024)
043d268  move back lower_ir_node default (rjzamora, Dec 2, 2024)
332ced3  Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars… (rjzamora, Dec 3, 2024)
4 changes: 3 additions & 1 deletion python/cudf_polars/cudf_polars/callback.py
@@ -217,14 +217,16 @@ def validate_config_options(config: dict) -> None:
         If the configuration contains unsupported options.
     """
     if unsupported := (
-        config.keys() - {"raise_on_fail", "parquet_options", "executor"}
+        config.keys()
+        - {"raise_on_fail", "parquet_options", "parallel_options", "executor"}
     ):
         raise ValueError(
             f"Engine configuration contains unsupported settings: {unsupported}"
         )
     assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset(
         config.get("parquet_options", {})
     )
+    assert {"num_rows_threshold"}.issuperset(config.get("parallel_options", {}))
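
For reference, a minimal engine configuration that passes this validation, mirroring the new test at the bottom of this PR (at this snapshot the key is still "parallel_options"; a later commit renames it to "executor_options"):

import polars as pl

engine = pl.GPUEngine(
    raise_on_fail=True,
    executor="dask-experimental",
    parallel_options={"num_rows_threshold": 1_000},
)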

rjzamora (Member Author):
I'd like to nest all multi-GPU options within "parallel_options" moving forward (to avoid adding more top-level keys).


Contributor:
We might imagine that these options are executor-specific. Does it make sense to have a nesting such as:

executor: str | tuple[str, dict]

So the executor argument is either a name, or a ("name", name-specific-options) pair?


rjzamora (Member Author):
That seems fine to me. Any opinion on this, @pentschev?

I do think it's a good idea to consider how the number of these options will inevitably grow over time (and that they will probably be executor-specific).


rjzamora (Member Author):
Hmm. The str | tuple[str, dict] logic actually feels a bit clumsy when I think about how to implement it.

How about we just rename "parallel_options" to "executor_options" (to make it clear that the options are executor-specific)? This still allows us to validate that the specified arguments are actually supported by the "active" executor.


Member:
As much as I agree that it is indeed clumsy, it feels like we'll soon need nested options and will inevitably have to make "executor_options" accept str | tuple[str, dict], so we may as well just do that in executor and thereby allow as many levels of nested options as needed. I think a better alternative may be an abstract base class Executor that we can specialize with the options we need for each executor.


rjzamora (Member Author):
> I think a better alternative may be an abstract base class Executor that we can specialize with the options we need for each executor.

I do think this is the best long-term solution, but I also don't think it will be difficult to migrate from the "executor_options" approach currently used in this PR.

I don't think I understand why it is inevitable that "executor_options" would need to accept str | tuple[str, dict]. However, I do see why it would be useful to attach all executor-specific options to an Executor object. That said, I don't really want to deal with serialization/etc in this PR :)


Member:
> I don't think I understand why it is inevitable that "executor_options" would need to accept str | tuple[str, dict].

It's possible I'm overestimating the number of options we'll end up introducing here, but once we need nested options we'll need something more complex, like the tuple[str, dict] or the abstract base class. That's why I think it's inevitable.
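
To make the trade-off concrete, here is a rough sketch of the two alternatives discussed in this thread. All names are hypothetical; neither variant is implemented in this PR, which keeps a flat options dict (renamed to "executor_options" in a later commit):

from abc import ABC
from dataclasses import dataclass
from typing import Any

# Alternative 1: executor as a name or a (name, options) tuple.
ExecutorSpec = str | tuple[str, dict[str, Any]]  # Python 3.10+ union syntax

def normalize_executor(spec: ExecutorSpec) -> tuple[str, dict[str, Any]]:
    """Split an executor spec into a (name, options) pair."""
    if isinstance(spec, str):
        return spec, {}
    name, options = spec
    return name, options

# Alternative 2: an abstract Executor base class specialized per executor.
class Executor(ABC):
    """Base class for executor-specific configuration."""

@dataclass
class DaskExperimentalExecutor(Executor):
    num_rows_threshold: int = 1_000_000  # default used elsewhere in this PR

# Alternative 1 in use:
name, options = normalize_executor(("dask-experimental", {"num_rows_threshold": 10_000}))
assert name == "dask-experimental" and options["num_rows_threshold"] == 10_000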



def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
17 changes: 14 additions & 3 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -683,26 +683,30 @@ class DataFrameScan(IR):
     This typically arises from ``q.collect().lazy()``
     """

-    __slots__ = ("df", "projection", "predicate")
-    _non_child = ("schema", "df", "projection", "predicate")
+    __slots__ = ("df", "projection", "predicate", "config_options")
+    _non_child = ("schema", "df", "projection", "predicate", "config_options")
     df: Any
     """Polars LazyFrame object."""
     projection: tuple[str, ...] | None
     """List of columns to project out."""
     predicate: expr.NamedExpr | None
     """Mask to apply."""
+    config_options: dict[str, Any]
+    """GPU-specific configuration options."""

     def __init__(
         self,
         schema: Schema,
         df: Any,
         projection: Sequence[str] | None,
         predicate: expr.NamedExpr | None,
+        config_options: dict[str, Any],
     ):
         self.schema = schema
         self.df = df
         self.projection = tuple(projection) if projection is not None else None
         self.predicate = predicate
+        self.config_options = config_options
         self._non_child_args = (schema, df, self.projection, predicate)
         self.children = ()

@@ -714,7 +718,14 @@ def get_hashable(self) -> Hashable:
         not stable across runs, or repeat instances of the same equal dataframes.
         """
         schema_hash = tuple(self.schema.items())
-        return (type(self), schema_hash, id(self.df), self.projection, self.predicate)
+        return (
+            type(self),
+            schema_hash,
+            id(self.df),
+            self.projection,
+            self.predicate,
+            json.dumps(self.config_options),
+        )

     @classmethod
     def do_evaluate(
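
A quick aside on why get_hashable serializes the options dict: a dict cannot be an element of a hashable tuple, so it is converted to a deterministic string first. A minimal illustration (not from the PR):

import json

config_options = {"parallel_options": {"num_rows_threshold": 1_000_000}}
# hash((config_options,)) would raise TypeError: unhashable type: 'dict'
key = json.dumps(config_options)  # same dict always yields the same string
assert hash((key,)) == hash((json.dumps(config_options),))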
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/translate.py
@@ -263,6 +263,7 @@ def _(
         translate_named_expr(translator, n=node.selection)
         if node.selection is not None
         else None,
+        translator.config.config.copy(),
     )
83 changes: 83 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/io.py
@@ -0,0 +1,83 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Parallel IO Logic."""

from __future__ import annotations

import math
from functools import cached_property
from typing import TYPE_CHECKING, Any

from cudf_polars.dsl.ir import DataFrameScan
from cudf_polars.experimental.parallel import (
    PartitionInfo,
    generate_ir_tasks,
    get_key_name,
)

if TYPE_CHECKING:
    from collections.abc import MutableMapping

    from cudf_polars.dsl.ir import IR
    from cudf_polars.experimental.parallel import LowerIRTransformer


##
## DataFrameScan
##


class ParDataFrameScan(DataFrameScan):
    """Parallel DataFrameScan."""

    @property
    def _max_n_rows(self) -> int:
        """Row-count threshold for splitting a DataFrame."""
        parallel_options = self.config_options.get("parallel_options", {})
        return parallel_options.get("num_rows_threshold", 1_000_000)

    @cached_property
    def _count(self) -> int:
        """Partition count."""
        total_rows = max(self.df.shape()[0], 1)
        return math.ceil(total_rows / self._max_n_rows)

    def _tasks(self) -> MutableMapping[Any, Any]:
        """Task graph."""
        total_rows = max(self.df.shape()[0], 1)
        stride = math.ceil(total_rows / self._count)
        key_name = get_key_name(self)
        return {
            (key_name, i): (
                self.do_evaluate,
                self.schema,
                self.df.slice(offset, stride),
                self.projection,
                self.predicate,
            )
            for i, offset in enumerate(range(0, total_rows, stride))
        }


def lower_dataframescan_node(
    ir: DataFrameScan, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    """Rewrite a Scan node with proper partitioning."""
    new_node = ParDataFrameScan(
        ir.schema,
        ir.df,
        ir.projection,
        ir.predicate,
        ir.config_options,
    )
    return new_node, {new_node: PartitionInfo(count=new_node._count)}


@generate_ir_tasks.register(ParDataFrameScan)
def _(
    ir: ParDataFrameScan, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
    assert (
        partition_info[ir].count == ir._count
    ), "Inconsistent ParDataFrameScan partitioning."
    return ir._tasks()
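
To make the partitioning arithmetic above concrete, take the 30,000-row test frame added at the bottom of this PR with num_rows_threshold=1_000 (one of the values the new test exercises):

import math

total_rows = 30_000          # rows in the test LazyFrame below
num_rows_threshold = 1_000   # "parallel_options" value from the test
count = math.ceil(total_rows / num_rows_threshold)  # 30 partitions
stride = math.ceil(total_rows / count)              # 1_000 rows per slice
offsets = range(0, total_rows, stride)              # one task per offset
assert count == 30 and len(offsets) == count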
32 changes: 29 additions & 3 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -8,11 +8,11 @@
 from functools import reduce, singledispatch
 from typing import TYPE_CHECKING, Any

-from cudf_polars.dsl.ir import IR
+from cudf_polars.dsl.ir import IR, DataFrameScan, Union
 from cudf_polars.dsl.traversal import traversal

 if TYPE_CHECKING:
-    from collections.abc import MutableMapping
+    from collections.abc import MutableMapping, Sequence
     from typing import TypeAlias

     from cudf_polars.containers import DataFrame

@@ -223,7 +223,14 @@ def task_graph(
         operator.or_,
         (generate_ir_tasks(node, partition_info) for node in traversal(ir)),
     )
-    return graph, (get_key_name(ir), 0)
+
+    key_name = get_key_name(ir)
+    partition_count = partition_info[ir].count
+    if partition_count > 1:
+        graph[key_name] = (_concat, [(key_name, i) for i in range(partition_count)])
+        return graph, key_name
+    else:
+        return graph, (key_name, 0)

@@ -234,3 +241,22 @@ def evaluate_dask(ir: IR) -> DataFrame:

     graph, key = task_graph(ir, partition_info)
     return get(graph, key)
+
+
+def _concat(dfs: Sequence[DataFrame]) -> DataFrame:
+    # Concatenate a sequence of DataFrames vertically
+    return Union.do_evaluate(None, *dfs)
+
+
+##
+## DataFrameScan
+##
+
+
+@lower_ir_node.register(DataFrameScan)
+def _(
+    ir: DataFrameScan, rec: LowerIRTransformer
+) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
+    import cudf_polars.experimental.io as _io
+
+    return _io.lower_dataframescan_node(ir, rec)
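
For intuition, here is a toy, runnable reconstruction of the graph shape produced for a three-partition scan. Plain lists stand in for DataFrames, the key name is made up (real names come from get_key_name), and dask's synchronous scheduler plays the role of the get used by evaluate_dask:

from dask import get  # synchronous scheduler

def scan_slice(offset, stride):
    # Stand-in for ParDataFrameScan.do_evaluate on one slice of the frame
    return list(range(offset, offset + stride))

key = "dataframescan-deadbeef"  # hypothetical key name
graph = {(key, i): (scan_slice, offset, 10) for i, offset in enumerate(range(0, 30, 10))}
# task_graph adds a single concatenation task when partition_count > 1:
graph[key] = (lambda dfs: sum(dfs, []), [(key, i) for i in range(3)])

assert get(graph, key) == list(range(30))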
12 changes: 10 additions & 2 deletions python/cudf_polars/tests/dsl/test_traversal.py
@@ -116,7 +116,11 @@ def test_rewrite_ir_node():
     def replace_df(node, rec):
         if isinstance(node, ir.DataFrameScan):
             return ir.DataFrameScan(
-                node.schema, new_df._df, node.projection, node.predicate
+                node.schema,
+                new_df._df,
+                node.projection,
+                node.predicate,
+                node.config_options,
             )
         return reuse_if_unchanged(node, rec)

@@ -144,7 +148,11 @@ def test_rewrite_scan_node(tmp_path):
     def replace_scan(node, rec):
         if isinstance(node, ir.Scan):
             return ir.DataFrameScan(
-                node.schema, right._df, node.with_columns, node.predicate
+                node.schema,
+                right._df,
+                node.with_columns,
+                node.predicate,
+                node.config_options,
             )
         return reuse_if_unchanged(node, rec)
43 changes: 43 additions & 0 deletions python/cudf_polars/tests/experimental/test_dataframescan.py
@@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest

import polars as pl

from cudf_polars import Translator
from cudf_polars.experimental.parallel import lower_ir_graph
from cudf_polars.testing.asserts import assert_gpu_result_equal


@pytest.fixture(scope="module")
def df():
    return pl.LazyFrame(
        {
            "x": range(30_000),
            "y": ["cat", "dog", "fish"] * 10_000,
            "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 6_000,
        }
    )


@pytest.mark.parametrize("num_rows_threshold", [1_000, 1_000_000])
def test_parallel_dataframescan(df, num_rows_threshold):
    total_row_count = len(df.collect())
    engine = pl.GPUEngine(
        raise_on_fail=True,
        parallel_options={"num_rows_threshold": num_rows_threshold},
        executor="dask-experimental",
    )
    assert_gpu_result_equal(df, engine=engine)

    # Check partitioning
    qir = Translator(df._ldf.visit(), engine).translate_ir()

Contributor:
nit: we need to remember to check that we didn't get any errors. I will try to open a PR that does this automatically.

    ir, info = lower_ir_graph(qir)
    count = info[ir].count
    if num_rows_threshold < total_row_count:
        assert count > 1
    else:
        assert count == 1