[DO NOT MERGE] chunked reader + kvikio fixes #17297

Closed
Changes from all commits (22 commits)
146ac45
access and config chunked parquet reader
brandon-b-miller Sep 10, 2024
0242495
do not early return df
brandon-b-miller Sep 16, 2024
e257242
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Oct 9, 2024
95ebf4d
fix nrows
brandon-b-miller Oct 9, 2024
7533ed3
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Oct 22, 2024
43acc47
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Oct 28, 2024
6ddf128
updates, set defaults
brandon-b-miller Oct 29, 2024
fea77d7
pass config through evaluate
brandon-b-miller Oct 30, 2024
53b0b2a
a trial commit to test a different concatenation strategy
brandon-b-miller Oct 31, 2024
ec298d3
merge/resolve
brandon-b-miller Nov 5, 2024
310f8c2
adjust for IR changes / pass tests
brandon-b-miller Nov 6, 2024
62eed59
remove threshold for kvikIO
vuule Nov 6, 2024
62c277b
address reviews
brandon-b-miller Nov 7, 2024
13df5aa
revert translate.py changes
brandon-b-miller Nov 7, 2024
4aee59f
Revert "a trial commit to test a different concatenation strategy"
brandon-b-miller Nov 7, 2024
76f1f62
Merge branch 'branch-24.12' into impr-prefer-kvikio-device-read
vuule Nov 7, 2024
50add3a
Merge branch 'branch-24.12' into cudf-polars-chunked-parquet-reader
brandon-b-miller Nov 8, 2024
a113737
add docs
brandon-b-miller Nov 8, 2024
55afaf6
Merge remote-tracking branch 'vuule/impr-prefer-kvikio-device-read' i…
brandon-b-miller Nov 11, 2024
828a3bb
merge/resolve
brandon-b-miller Nov 12, 2024
b1a7dea
Merge branch 'cudf-polars-chunked-parquet-reader' into chunked-reader…
brandon-b-miller Nov 12, 2024
10dc679
Merge branch 'branch-24.12' into chunked-reader-kvikio-fixes
galipremsagar Nov 12, 2024
8 changes: 6 additions & 2 deletions cpp/src/io/utilities/datasource.cpp
@@ -95,8 +95,12 @@ class file_source : public datasource {

[[nodiscard]] bool is_device_read_preferred(size_t size) const override
{
if (size < _gds_read_preferred_threshold) { return false; }
return supports_device_read();
if (!supports_device_read()) { return false; }

// Always prefer device reads if kvikio is enabled
if (!_kvikio_file.closed()) { return true; }

return size >= _gds_read_preferred_threshold;
}

std::future<size_t> device_read_async(size_t offset,
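As a reading aid (editorial sketch, not code from this PR), the revised `is_device_read_preferred` logic can be paraphrased as: device reads are preferred whenever a kvikIO file handle is open, and otherwise only for reads at or above the GDS threshold. All names below are hypothetical.

```python
def prefers_device_read(size: int, *, supports_device_read: bool,
                        kvikio_file_open: bool, gds_threshold: int) -> bool:
    # Paraphrase of file_source::is_device_read_preferred after this change.
    if not supports_device_read:
        return False
    # With kvikIO enabled, device reads are preferred regardless of size.
    if kvikio_file_open:
        return True
    # Otherwise fall back to the size threshold, as before.
    return size >= gds_threshold


# A small 4 KiB read now takes the device path when kvikIO is available,
# whereas previously it fell below the threshold and used a host read.
assert prefers_device_read(4096, supports_device_read=True,
                           kvikio_file_open=True, gds_threshold=128 * 1024)
```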
24 changes: 24 additions & 0 deletions docs/cudf/source/cudf_polars/engine_options.md
@@ -0,0 +1,24 @@
# GPUEngine Configuration Options

The `polars.GPUEngine` object may be configured in several different ways.

## Parquet Reader Options

Chunked reading is controlled by passing a dictionary of options to the `GPUEngine` object. Details may be found by following the links to the underlying `libcudf` reader.
- `parquet_chunked` indicates whether chunked parquet reading is to be used, default True.
- [chunk_read_limit](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited.
- [pass_read_limit](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB.

For example, one would pass these parameters as follows:
```python
engine = GPUEngine(
    raise_on_fail=True,
    parquet_options={
        'parquet_chunked': True,
        'chunk_read_limit': int(1e9),
        'pass_read_limit': int(4e9)
    }
)
result = query.collect(engine=engine)
```
Note that passing `parquet_chunked: False` disables chunked reading entirely, and thus `chunk_read_limit` and `pass_read_limit` will have no effect.
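For completeness, a sketch of a configuration with chunked reading disabled (illustrative only; `query` stands in for any Polars LazyFrame query):

```python
engine = GPUEngine(
    raise_on_fail=True,
    # With chunked reading off, chunk_read_limit and pass_read_limit are ignored.
    parquet_options={'parquet_chunked': False},
)
result = query.collect(engine=engine)
```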
6 changes: 6 additions & 0 deletions docs/cudf/source/cudf_polars/index.rst
@@ -39,3 +39,9 @@ Launch on Google Colab
:target: https://colab.research.google.com/github/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/polars_gpu_engine_demo.ipynb

Try out the GPU engine for Polars in a free GPU notebook environment. Sign in with your Google account and `launch the demo on Colab <https://colab.research.google.com/github/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/polars_gpu_engine_demo.ipynb>`__.

.. toctree::
   :maxdepth: 1
   :caption: Engine Config Options:

   engine_options
11 changes: 8 additions & 3 deletions python/cudf_polars/cudf_polars/callback.py
@@ -129,6 +129,7 @@ def set_device(device: int | None) -> Generator[int, None, None]:

def _callback(
ir: IR,
config: GPUEngine,
with_columns: list[str] | None,
pyarrow_predicate: str | None,
n_rows: int | None,
@@ -145,7 +146,7 @@ def _callback(
set_device(device),
set_memory_resource(memory_resource),
):
return ir.evaluate(cache={}).to_polars()
return ir.evaluate(cache={}, config=config).to_polars()


def execute_with_cudf(
@@ -174,7 +175,7 @@
device = config.device
memory_resource = config.memory_resource
raise_on_fail = config.config.get("raise_on_fail", False)
if unsupported := (config.config.keys() - {"raise_on_fail"}):
if unsupported := (config.config.keys() - {"raise_on_fail", "parquet_options"}):
raise ValueError(
f"Engine configuration contains unsupported settings {unsupported}"
)
@@ -201,7 +202,11 @@
else:
nt.set_udf(
partial(
_callback, ir, device=device, memory_resource=memory_resource
_callback,
ir,
config,
device=device,
memory_resource=memory_resource,
)
)
except exception as e:
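For orientation (editorial note, not part of the diff): the net effect of the `callback.py` and `ir.py` changes is that the `GPUEngine` object is threaded from the Polars callback down to every node's `do_evaluate`. A minimal sketch of the call chain, using the names introduced in this PR:

```python
from polars import GPUEngine

# The engine configuration carries the parquet reader options ...
config = GPUEngine(
    raise_on_fail=True,
    parquet_options={"parquet_chunked": True, "pass_read_limit": int(4e9)},
)

# ... execute_with_cudf binds it into the callback:
#     partial(_callback, ir, config, device=device, memory_resource=memory_resource)
# ... _callback evaluates the translated IR with it:
#     ir.evaluate(cache={}, config=config).to_polars()
# ... and IR.evaluate forwards it to each node and its children:
#     cls.do_evaluate(config, *non_child_args, *child_results)
```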
130 changes: 99 additions & 31 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -37,6 +37,8 @@
from collections.abc import Callable, Hashable, MutableMapping, Sequence
from typing import Literal

from polars import GPUEngine

from cudf_polars.typing import Schema


@@ -180,7 +182,9 @@ def get_hashable(self) -> Hashable:
translation phase should fail earlier.
"""

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""
Evaluate the node (recursively) and return a dataframe.

@@ -189,6 +193,8 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
cache
Mapping from cached node ids to constructed DataFrames.
Used to implement evaluation of the `Cache` node.
config
GPU engine configuration.

Notes
-----
@@ -208,8 +214,9 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
translation phase should fail earlier.
"""
return self.do_evaluate(
config,
*self._non_child_args,
*(child.evaluate(cache=cache) for child in self.children),
*(child.evaluate(cache=cache, config=config) for child in self.children),
)


@@ -293,6 +300,9 @@ class Scan(IR):
predicate: expr.NamedExpr | None
"""Mask to apply to the read dataframe."""

PARQUET_DEFAULT_CHUNK_SIZE: int = 0
PARQUET_DEFAULT_PASS_LIMIT: int = 17179869184 # 16GiB

def __init__(
self,
schema: Schema,
@@ -412,6 +422,7 @@ def get_hashable(self) -> Hashable:
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
schema: Schema,
typ: str,
reader_options: dict[str, Any],
@@ -497,25 +508,59 @@ def do_evaluate(
colnames[0],
)
elif typ == "parquet":
filters = None
if predicate is not None and row_index is None:
# Can't apply filters during read if we have a row index.
filters = to_parquet_filter(predicate.value)
tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(paths),
columns=with_columns,
filters=filters,
nrows=n_rows,
skip_rows=skip_rows,
)
df = DataFrame.from_table(
tbl_w_meta.tbl,
# TODO: consider nested column names?
tbl_w_meta.column_names(include_children=False),
)
if filters is not None:
# Mask must have been applied.
return df
parquet_options = config.config.get("parquet_options", {})
if parquet_options.get("chunked", False):
reader = plc.io.parquet.ChunkedParquetReader(
plc.io.SourceInfo(paths),
columns=with_columns,
nrows=n_rows,
skip_rows=skip_rows,
chunk_read_limit=parquet_options.get(
"chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
),
pass_read_limit=parquet_options.get(
"pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT
),
)
chk = reader.read_chunk()
tbl = chk.tbl
names = chk.column_names()
concatenated_columns = tbl.columns()
while reader.has_next():
tbl = reader.read_chunk().tbl

for i in range(tbl.num_columns()):
concatenated_columns[i] = plc.concatenate.concatenate(
[concatenated_columns[i], tbl._columns[i]]
)
# Drop residual columns to save memory
tbl._columns[i] = None

df = DataFrame.from_table(
plc.Table(concatenated_columns),
names=names,
)
else:
filters = None
if predicate is not None and row_index is None:
# Can't apply filters during read if we have a row index.
filters = to_parquet_filter(predicate.value)
tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(paths),
columns=with_columns,
filters=filters,
nrows=n_rows,
skip_rows=skip_rows,
)
df = DataFrame.from_table(
tbl_w_meta.tbl,
# TODO: consider nested column names?
tbl_w_meta.column_names(include_children=False),
)
if filters is not None:
# Mask must have been applied.
return df

elif typ == "ndjson":
json_schema: list[plc.io.json.NameAndType] = [
(name, typ, []) for name, typ in schema.items()
@@ -590,22 +635,26 @@ def __init__(self, schema: Schema, key: int, value: IR):

@classmethod
def do_evaluate(
cls, key: int, df: DataFrame
cls, config: GPUEngine, key: int, df: DataFrame
) -> DataFrame: # pragma: no cover; basic evaluation never calls this
"""Evaluate and return a dataframe."""
# Our value has already been computed for us, so let's just
# return it.
return df

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
def evaluate(
self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
) -> DataFrame:
"""Evaluate and return a dataframe."""
# We must override the recursion scheme because we don't want
# to recurse if we're in the cache.
try:
return cache[self.key]
except KeyError:
(value,) = self.children
return cache.setdefault(self.key, value.evaluate(cache=cache))
return cache.setdefault(
self.key, value.evaluate(cache=cache, config=config)
)


class DataFrameScan(IR):
Expand Down Expand Up @@ -651,6 +700,7 @@ def get_hashable(self) -> Hashable:
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
schema: Schema,
df: Any,
projection: tuple[str, ...] | None,
@@ -698,6 +748,7 @@ def __init__(
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
exprs: tuple[expr.NamedExpr, ...],
should_broadcast: bool, # noqa: FBT001
df: DataFrame,
@@ -732,7 +783,10 @@ def __init__(

@classmethod
def do_evaluate(
cls, exprs: tuple[expr.NamedExpr, ...], df: DataFrame
cls,
config: GPUEngine,
exprs: tuple[expr.NamedExpr, ...],
df: DataFrame,
) -> DataFrame: # pragma: no cover; not exposed by polars yet
"""Evaluate and return a dataframe."""
columns = broadcast(*(e.evaluate(df) for e in exprs))
@@ -823,6 +877,7 @@ def check_agg(agg: expr.Expr) -> int:
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
keys_in: Sequence[expr.NamedExpr],
agg_requests: Sequence[expr.NamedExpr],
maintain_order: bool, # noqa: FBT001
@@ -944,6 +999,7 @@ def __init__(
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
predicate: plc.expressions.Expression,
zlice: tuple[int, int] | None,
suffix: str,
@@ -1116,6 +1172,7 @@ def _reorder_maps(
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
left_on_exprs: Sequence[expr.NamedExpr],
right_on_exprs: Sequence[expr.NamedExpr],
options: tuple[
@@ -1239,6 +1296,7 @@ def __init__(
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
exprs: Sequence[expr.NamedExpr],
should_broadcast: bool, # noqa: FBT001
df: DataFrame,
@@ -1301,6 +1359,7 @@ def __init__(
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
keep: plc.stream_compaction.DuplicateKeepOption,
subset: frozenset[str] | None,
zlice: tuple[int, int] | None,
@@ -1390,6 +1449,7 @@ def __init__(
@classmethod
def do_evaluate(
cls,
config: GPUEngine,
by: Sequence[expr.NamedExpr],
order: Sequence[plc.types.Order],
null_order: Sequence[plc.types.NullOrder],
@@ -1445,7 +1505,9 @@ def __init__(self, schema: Schema, offset: int, length: int, df: IR):
self.children = (df,)

@classmethod
def do_evaluate(cls, offset: int, length: int, df: DataFrame) -> DataFrame:
def do_evaluate(
cls, config: GPUEngine, offset: int, length: int, df: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
return df.slice((offset, length))

@@ -1465,7 +1527,9 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR):
self.children = (df,)

@classmethod
def do_evaluate(cls, mask_expr: expr.NamedExpr, df: DataFrame) -> DataFrame:
def do_evaluate(
cls, config: GPUEngine, mask_expr: expr.NamedExpr, df: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
(mask,) = broadcast(mask_expr.evaluate(df), target_length=df.num_rows)
return df.filter(mask)
@@ -1483,7 +1547,7 @@ def __init__(self, schema: Schema, df: IR):
self.children = (df,)

@classmethod
def do_evaluate(cls, schema: Schema, df: DataFrame) -> DataFrame:
def do_evaluate(cls, config: GPUEngine, schema: Schema, df: DataFrame) -> DataFrame:
"""Evaluate and return a dataframe."""
# This can reorder things.
columns = broadcast(
@@ -1559,7 +1623,9 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
self._non_child_args = (name, self.options)

@classmethod
def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
def do_evaluate(
cls, config: GPUEngine, name: str, options: Any, df: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
if name == "rechunk":
# No-op in our data model
@@ -1638,7 +1704,9 @@ def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR)
raise NotImplementedError("Schema mismatch")

@classmethod
def do_evaluate(cls, zlice: tuple[int, int] | None, *dfs: DataFrame) -> DataFrame:
def do_evaluate(
cls, config: GPUEngine, zlice: tuple[int, int] | None, *dfs: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
# TODO: only evaluate what we need if we have a slice?
return DataFrame.from_table(
@@ -1687,7 +1755,7 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:
)

@classmethod
def do_evaluate(cls, *dfs: DataFrame) -> DataFrame:
def do_evaluate(cls, config: GPUEngine, *dfs: DataFrame) -> DataFrame:
"""Evaluate and return a dataframe."""
max_rows = max(df.num_rows for df in dfs)
# Horizontal concatenation extends shorter tables with nulls
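Taken together, a hypothetical end-to-end use of the options added by this PR might look as follows (editorial sketch; the file path and query are placeholders):

```python
import polars as pl

# Lazily scan a parquet file and collect it on the GPU with chunked reading.
q = pl.scan_parquet("data.parquet").filter(pl.col("a") > 0)

engine = pl.GPUEngine(
    raise_on_fail=True,
    parquet_options={
        "parquet_chunked": True,
        "chunk_read_limit": int(1e9),  # ~1 GB output per chunk
        "pass_read_limit": int(4e9),   # ~4 GB decompression budget per pass
    },
)
result = q.collect(engine=engine)
```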