Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cudf-polars chunked parquet reading #16944

Merged
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
146ac45
access and config chunked parquet reader
brandon-b-miller Sep 10, 2024
0242495
do not early return df
brandon-b-miller Sep 16, 2024
e257242
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Oct 9, 2024
95ebf4d
fix nrows
brandon-b-miller Oct 9, 2024
7533ed3
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Oct 22, 2024
43acc47
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Oct 28, 2024
6ddf128
updates, set defaults
brandon-b-miller Oct 29, 2024
fea77d7
pass config through evaluate
brandon-b-miller Oct 30, 2024
53b0b2a
a trial commit to test a different concatenation strategy
brandon-b-miller Oct 31, 2024
ec298d3
merge/resolve
brandon-b-miller Nov 5, 2024
310f8c2
adjust for IR changes / pass tests
brandon-b-miller Nov 6, 2024
62c277b
address reviews
brandon-b-miller Nov 7, 2024
13df5aa
revert translate.py changes
brandon-b-miller Nov 7, 2024
4aee59f
Revert "a trial commit to test a different concatenation strategy"
brandon-b-miller Nov 7, 2024
50add3a
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Nov 8, 2024
a113737
add docs
brandon-b-miller Nov 8, 2024
828a3bb
merge/resolve
brandon-b-miller Nov 12, 2024
48d3edc
add test coverage
brandon-b-miller Nov 12, 2024
eacc73d
merge/resolve
brandon-b-miller Nov 13, 2024
9c4c1bf
raise on fail true for default testing engine
brandon-b-miller Nov 13, 2024
d6aa668
Apply suggestions from code review
brandon-b-miller Nov 13, 2024
a06f0ae
reword Parquet Reader Options
brandon-b-miller Nov 13, 2024
9930d2e
partially address reviews
brandon-b-miller Nov 13, 2024
b2530a4
Apply suggestions from code review
brandon-b-miller Nov 13, 2024
9958fe9
chunk on by default
brandon-b-miller Nov 13, 2024
d33ec5e
turn OFF chunking in existing parquet tests
brandon-b-miller Nov 13, 2024
b69eaa6
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
galipremsagar Nov 13, 2024
2be2847
disable slice pushdown with parquet
brandon-b-miller Nov 13, 2024
e72215b
Merge branch 'branch-24.12' into HEAD
wence- Nov 14, 2024
c23afd9
Test parquet filters with chunking off and on
wence- Nov 14, 2024
df341ea
Implement workaround for #16186
wence- Nov 14, 2024
beb2462
xfail a polars test
wence- Nov 14, 2024
b398172
Apply suggestions from code review
brandon-b-miller Nov 15, 2024
2116d94
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
galipremsagar Nov 15, 2024
e67614a
Remove commented code
wence- Nov 15, 2024
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Previous commit
Next Next commit
pass config through evaluate
brandon-b-miller committed Oct 30, 2024

Verified

This commit was signed with the committer’s verified signature.
commit fea77d79dd4d369dda7e0a734373783972d0bc6c
6 changes: 4 additions & 2 deletions python/cudf_polars/cudf_polars/callback.py
Original file line number Diff line number Diff line change
@@ -129,6 +129,7 @@ def set_device(device: int | None) -> Generator[int, None, None]:

def _callback(
ir: IR,
config: GPUEngine,
with_columns: list[str] | None,
pyarrow_predicate: str | None,
n_rows: int | None,
@@ -145,7 +146,7 @@ def _callback(
set_device(device),
set_memory_resource(memory_resource),
):
return ir.evaluate(cache={}).to_polars()
return ir.evaluate(cache={}, config=config).to_polars()


def execute_with_cudf(
@@ -183,7 +184,8 @@ def execute_with_cudf(
nt.set_udf(
partial(
_callback,
translate_ir(nt, config),
translate_ir(nt),
config,
device=device,
memory_resource=memory_resource,
)
7 changes: 1 addition & 6 deletions python/cudf_polars/cudf_polars/dsl/expressions/base.py
Original file line number Diff line number Diff line change
@@ -18,8 +18,6 @@
if TYPE_CHECKING:
from collections.abc import Mapping

from polars import GPUEngine

from cudf_polars.containers import Column, DataFrame

__all__ = ["Expr", "NamedExpr", "Col", "AggInfo", "ExecutionContext"]
@@ -38,16 +36,13 @@ class ExecutionContext(IntEnum):
class Expr(Node["Expr"]):
"""An abstract expression object."""

__slots__ = ("dtype", "_config")
__slots__ = ("dtype",)
dtype: plc.DataType
"""Data type of the expression."""
# This annotation is needed because of https://github.com/python/mypy/issues/17981
_non_child: ClassVar[tuple[str, ...]] = ("dtype",)
"""Names of non-child data (not Exprs) for reconstruction."""

_config: GPUEngine
"""GPU engine configuration."""

def do_evaluate(
self,
df: DataFrame,
111 changes: 72 additions & 39 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
@@ -129,15 +129,12 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column
class IR(Node["IR"]):
"""Abstract plan node, representing an unevaluated dataframe."""

__slots__ = ("schema", "_config")
__slots__ = ("schema",)
# This annotation is needed because of https://github.com/python/mypy/issues/17981
_non_child: ClassVar[tuple[str, ...]] = ("schema",)
schema: Schema
"""Mapping from column names to their data types."""

_config: GPUEngine
"""GPU engine configuration."""

def get_hashable(self) -> Hashable:
"""
Hashable representation of node, treating schema dictionary.
@@ -151,7 +148,9 @@ def get_hashable(self) -> Hashable:
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, args)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""
Evaluate the node and return a dataframe.

@@ -160,6 +159,8 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
cache
Mapping from cached node ids to constructed DataFrames.
Used to implement evaluation of the `Cache` node.
config
GPU engine configuration.

Returns
-------
@@ -346,7 +347,9 @@ def get_hashable(self) -> Hashable:
self.predicate,
)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
with_columns = self.with_columns
row_index = self.row_index
@@ -425,7 +428,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
colnames[0],
)
elif self.typ == "parquet":
parquet_options = self._config.config.get("parquet_options", {})
parquet_options = config.config.get("parquet_options", {})
brandon-b-miller marked this conversation as resolved.
Show resolved Hide resolved
if parquet_options.get("chunked", False):
reader = plc.io.parquet.ChunkedParquetReader(
plc.io.SourceInfo(self.paths),
@@ -540,13 +543,17 @@ def __init__(self, schema: Schema, key: int, value: IR):
self.key = key
self.children = (value,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
try:
return cache[self.key]
except KeyError:
(value,) = self.children
return cache.setdefault(self.key, value.evaluate(cache=cache))
return cache.setdefault(
self.key, value.evaluate(cache=cache, config=config)
)


class DataFrameScan(IR):
@@ -588,7 +595,9 @@ def get_hashable(self) -> Hashable:
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, id(self.df), self.projection, self.predicate)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
pdf = pl.DataFrame._from_pydf(self.df)
if self.projection is not None:
@@ -627,10 +636,12 @@ def __init__(
self.should_broadcast = should_broadcast
self.children = (df,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
# Handle any broadcasting
columns = [e.evaluate(df) for e in self.exprs]
if self.should_broadcast:
@@ -658,11 +669,11 @@ def __init__(
self.children = (df,)

def evaluate(
self, *, cache: MutableMapping[int, DataFrame]
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame: # pragma: no cover; polars doesn't emit this node yet
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
columns = broadcast(*(e.evaluate(df) for e in self.exprs))
assert all(column.obj.size() == 1 for column in columns)
return DataFrame(columns)
@@ -741,10 +752,12 @@ def check_agg(agg: expr.Expr) -> int:
else:
raise NotImplementedError(f"No handler for {agg=}")

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
keys = broadcast(
*(k.evaluate(df) for k in self.keys), target_length=df.num_rows
)
@@ -970,9 +983,11 @@ def _reorder_maps(
[plc.types.NullOrder.AFTER, plc.types.NullOrder.AFTER],
).columns()

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
left, right = (c.evaluate(cache=cache) for c in self.children)
left, right = (c.evaluate(cache=cache, config=config) for c in self.children)
how, join_nulls, zlice, suffix, coalesce = self.options
if how == "cross":
# Separate implementation, since cross_join returns the
@@ -1079,10 +1094,12 @@ def __init__(
self.should_broadcast = should_broadcast
self.children = (df,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
columns = [c.evaluate(df) for c in self.columns]
if self.should_broadcast:
columns = broadcast(*columns, target_length=df.num_rows)
@@ -1136,10 +1153,12 @@ def __init__(
"any": plc.stream_compaction.DuplicateKeepOption.KEEP_ANY,
}

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
if self.subset is None:
indices = list(range(df.num_columns))
keys_sorted = all(c.is_sorted for c in df.column_map.values())
@@ -1212,10 +1231,12 @@ def __init__(
self.zlice = zlice
self.children = (df,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
sort_keys = broadcast(
*(k.evaluate(df) for k in self.by), target_length=df.num_rows
)
@@ -1265,10 +1286,12 @@ def __init__(self, schema: Schema, offset: int, length: int, df: IR):
self.length = length
self.children = (df,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
return df.slice((self.offset, self.length))


@@ -1285,10 +1308,12 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR):
self.mask = mask
self.children = (df,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
(mask,) = broadcast(self.mask.evaluate(df), target_length=df.num_rows)
return df.filter(mask)

@@ -1303,10 +1328,12 @@ def __init__(self, schema: Schema, df: IR):
self.schema = schema
self.children = (df,)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
# This can reorder things.
columns = broadcast(
*(df.column_map[name] for name in self.schema), target_length=df.num_rows
@@ -1374,21 +1401,23 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
)
self.options = (tuple(indices), tuple(pivotees), variable_name, value_name)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
(child,) = self.children
if self.name == "rechunk":
# No-op in our data model
# Don't think this appears in a plan tree from python
return child.evaluate(cache=cache) # pragma: no cover
return child.evaluate(cache=cache, config=config) # pragma: no cover
elif self.name == "rename":
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
# final tag is "swapping" which is useful for the
# optimiser (it blocks some pushdown operations)
old, new, _ = self.options
return df.rename_columns(dict(zip(old, new, strict=True)))
elif self.name == "explode":
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
((to_explode,),) = self.options
index = df.column_names.index(to_explode)
subset = df.column_names_set - {to_explode}
@@ -1398,7 +1427,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
elif self.name == "unpivot":
indices, pivotees, variable_name, value_name = self.options
npiv = len(pivotees)
df = child.evaluate(cache=cache)
df = child.evaluate(cache=cache, config=config)
index_columns = [
Column(col, name=name)
for col, name in zip(
@@ -1453,10 +1482,12 @@ def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR)
if not all(s.schema == schema for s in self.children[1:]):
raise NotImplementedError("Schema mismatch")

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
# TODO: only evaluate what we need if we have a slice
dfs = [df.evaluate(cache=cache) for df in self.children]
dfs = [df.evaluate(cache=cache, config=config) for df in self.children]
return DataFrame.from_table(
plc.concatenate.concatenate([df.table for df in dfs]), dfs[0].column_names
).slice(self.zlice)
@@ -1500,9 +1531,11 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:
]
)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
dfs = [df.evaluate(cache=cache) for df in self.children]
dfs = [df.evaluate(cache=cache, config=config) for df in self.children]
max_rows = max(df.num_rows for df in dfs)
# Horizontal concatenation extends shorter tables with nulls
dfs = [
5 changes: 0 additions & 5 deletions python/cudf_polars/cudf_polars/dsl/translate.py
brandon-b-miller marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -21,8 +21,6 @@
from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir

from cudf_polars.dsl import expr, ir
from cudf_polars.dsl.expr import Expr
from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.traversal import make_recursive, reuse_if_unchanged
from cudf_polars.typing import NodeTraverser
from cudf_polars.utils import dtypes, sorting
@@ -404,9 +402,6 @@ def translate_ir(
f"No support for polars IR {version=}"
) # pragma: no cover; no such version for now.

IR._config = config
Expr._config = config

with ctx:
polars_schema = visitor.get_schema()
node = visitor.view_current_node()
6 changes: 3 additions & 3 deletions python/cudf_polars/tests/dsl/test_traversal.py
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ def replace_df(node, rec):

new = mapper(orig)

result = new.evaluate(cache={}).to_polars()
result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars()

expect = pl.DataFrame({"a": [2, 1], "b": [-4, -3]})

@@ -153,7 +153,7 @@ def replace_scan(node, rec):
orig = translate_ir(q._ldf.visit())
new = mapper(orig)

result = new.evaluate(cache={}).to_polars()
result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars()

expect = q.collect()

@@ -224,6 +224,6 @@ def _(node: ir.Select, fn: IRTransformer):

new_ir = rewriter(qir)

got = new_ir.evaluate(cache={}).to_polars()
got = new_ir.evaluate(cache={}, config=pl.GPUEngine()).to_polars()

assert_frame_equal(expect, got)
2 changes: 1 addition & 1 deletion python/cudf_polars/tests/expressions/test_sort.py
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ def test_setsorted(descending, nulls_last, with_nulls):

assert_gpu_result_equal(q)

df = translate_ir(q._ldf.visit()).evaluate(cache={})
df = translate_ir(q._ldf.visit()).evaluate(cache={}, config=pl.GPUEngine())

a = df.column_map["a"]