From 146ac4592fa776c2f68f51fe989a38043c95d1ce Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 10 Sep 2024 14:20:34 -0700 Subject: [PATCH 01/24] access and config chunked parquet reader --- python/cudf_polars/cudf_polars/callback.py | 5 ++- python/cudf_polars/cudf_polars/dsl/ir.py | 52 +++++++++++++++++----- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 76816ee0a61..798896d30c2 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -18,6 +18,7 @@ import rmm from rmm._cuda import gpu +import cudf_polars.dsl.translate from cudf_polars.dsl.translate import translate_ir if TYPE_CHECKING: @@ -174,7 +175,9 @@ def execute_with_cudf( device = config.device memory_resource = config.memory_resource raise_on_fail = config.config.get("raise_on_fail", False) - if unsupported := (config.config.keys() - {"raise_on_fail"}): + parquet_options = config.config.get("parquet_options", {}) + cudf_polars.dsl.translate.ir.parquet_options = parquet_options + if unsupported := (config.config.keys() - {"raise_on_fail", "parquet_options"}): raise ValueError( f"Engine configuration contains unsupported settings {unsupported}" ) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 8cd56c8ee3a..cd23272d3b4 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -35,6 +35,7 @@ from cudf_polars.typing import Schema +parquet_options: dict[str, Any] = {} __all__ = [ "IR", @@ -339,17 +340,46 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: colnames[0], ) elif self.typ == "parquet": - tbl_w_meta = plc.io.parquet.read_parquet( - plc.io.SourceInfo(self.paths), - columns=with_columns, - nrows=n_rows, - skip_rows=self.skip_rows, - ) - df = DataFrame.from_table( - tbl_w_meta.tbl, - # TODO: consider nested column names? - tbl_w_meta.column_names(include_children=False), - ) + if parquet_options.get("chunked", True): + reader = plc.io.parquet.ChunkedParquetReader( + plc.io.SourceInfo(self.paths), + columns=with_columns, + num_rows=n_rows, + skip_rows=self.skip_rows, + chunk_read_limit=parquet_options.get("chunk_read_limit", 0), + pass_read_limit=parquet_options.get("pass_read_limit", 1024000000), + ) + chk = reader.read_chunk() + tbl = chk.tbl + names = chk.column_names() + concatenated_columns = tbl.columns() + while reader.has_next(): + tbl = reader.read_chunk().tbl + + for i in range(tbl.num_columns()): + concatenated_columns[i] = plc.concatenate.concatenate( + [concatenated_columns[i], tbl._columns[i]] + ) + # Drop residual columns to save memory + tbl._columns[i] = None + + return DataFrame.from_table( + plc.Table(concatenated_columns), + names=names, + ) + else: + tbl_w_meta = plc.io.parquet.read_parquet( + plc.io.SourceInfo(self.paths), + columns=with_columns, + num_rows=n_rows, + skip_rows=self.skip_rows, + ) + return DataFrame.from_table( + tbl_w_meta.tbl, + # TODO: consider nested column names? 
+ tbl_w_meta.column_names(include_children=False), + ) + elif self.typ == "ndjson": json_schema: list[tuple[str, str, list]] = [ (name, typ, []) for name, typ in self.schema.items() From 02424955c2f231e4ce96a40ec0274a71208eb65d Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 16 Sep 2024 06:02:04 -0700 Subject: [PATCH 02/24] do not early return df --- python/cudf_polars/cudf_polars/dsl/ir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index cd23272d3b4..c74c1e42597 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -363,7 +363,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: # Drop residual columns to save memory tbl._columns[i] = None - return DataFrame.from_table( + df = DataFrame.from_table( plc.Table(concatenated_columns), names=names, ) @@ -374,7 +374,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: num_rows=n_rows, skip_rows=self.skip_rows, ) - return DataFrame.from_table( + df = DataFrame.from_table( tbl_w_meta.tbl, # TODO: consider nested column names? tbl_w_meta.column_names(include_children=False), From 95ebf4d059a20f03929c1aedad907ee668f9074d Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 9 Oct 2024 07:02:11 -0700 Subject: [PATCH 03/24] fix nrows --- python/cudf_polars/cudf_polars/dsl/ir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 6116c6a79d3..6e97bab79a0 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -342,7 +342,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: reader = plc.io.parquet.ChunkedParquetReader( plc.io.SourceInfo(self.paths), columns=with_columns, - num_rows=n_rows, + nrows=n_rows, skip_rows=self.skip_rows, chunk_read_limit=parquet_options.get("chunk_read_limit", 0), pass_read_limit=parquet_options.get("pass_read_limit", 1024000000), @@ -369,7 +369,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: tbl_w_meta = plc.io.parquet.read_parquet( plc.io.SourceInfo(self.paths), columns=with_columns, - num_rows=n_rows, + nrows=n_rows, skip_rows=self.skip_rows, ) df = DataFrame.from_table( From 6ddf12825e1f7ad15586740abc83e26a3dac5187 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 28 Oct 2024 19:30:02 -0700 Subject: [PATCH 04/24] updates, set defaults --- python/cudf_polars/cudf_polars/callback.py | 5 +---- .../cudf_polars/dsl/expressions/base.py | 7 ++++++- python/cudf_polars/cudf_polars/dsl/ir.py | 21 ++++++++++++++----- .../cudf_polars/cudf_polars/dsl/translate.py | 16 +++++++++++++- 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 798896d30c2..d9a061d1336 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -18,7 +18,6 @@ import rmm from rmm._cuda import gpu -import cudf_polars.dsl.translate from cudf_polars.dsl.translate import translate_ir if TYPE_CHECKING: @@ -175,8 +174,6 @@ def execute_with_cudf( device = config.device memory_resource = config.memory_resource raise_on_fail = config.config.get("raise_on_fail", False) - parquet_options = config.config.get("parquet_options", {}) - cudf_polars.dsl.translate.ir.parquet_options 
= parquet_options if unsupported := (config.config.keys() - {"raise_on_fail", "parquet_options"}): raise ValueError( f"Engine configuration contains unsupported settings {unsupported}" @@ -186,7 +183,7 @@ def execute_with_cudf( nt.set_udf( partial( _callback, - translate_ir(nt), + translate_ir(nt, config), device=device, memory_resource=memory_resource, ) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/base.py b/python/cudf_polars/cudf_polars/dsl/expressions/base.py index effe8cb2378..8f43491aee5 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/base.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/base.py @@ -18,6 +18,8 @@ if TYPE_CHECKING: from collections.abc import Mapping + from polars import GPUEngine + from cudf_polars.containers import Column, DataFrame __all__ = ["Expr", "NamedExpr", "Col", "AggInfo", "ExecutionContext"] @@ -36,13 +38,16 @@ class ExecutionContext(IntEnum): class Expr(Node["Expr"]): """An abstract expression object.""" - __slots__ = ("dtype",) + __slots__ = ("dtype", "_config") dtype: plc.DataType """Data type of the expression.""" # This annotation is needed because of https://github.com/python/mypy/issues/17981 _non_child: ClassVar[tuple[str, ...]] = ("dtype",) """Names of non-child data (not Exprs) for reconstruction.""" + _config: GPUEngine + """GPU engine configuration.""" + def do_evaluate( self, df: DataFrame, diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 9cfc6e3d44a..970274be35a 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -34,9 +34,12 @@ from collections.abc import Callable, Hashable, MutableMapping, Sequence from typing import Literal + from polars import GPUEngine + from cudf_polars.typing import Schema -parquet_options: dict[str, Any] = {} +PARQUET_DEFAULT_CHUNK_SIZE = 0 +PARQUET_DEFAULT_PASS_LIMIT = 17179869184 # 16GiB __all__ = [ "IR", @@ -126,12 +129,15 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column class IR(Node["IR"]): """Abstract plan node, representing an unevaluated dataframe.""" - __slots__ = ("schema",) + __slots__ = ("schema", "_config") # This annotation is needed because of https://github.com/python/mypy/issues/17981 _non_child: ClassVar[tuple[str, ...]] = ("schema",) schema: Schema """Mapping from column names to their data types.""" + _config: GPUEngine + """GPU engine configuration.""" + def get_hashable(self) -> Hashable: """ Hashable representation of node, treating schema dictionary. 
@@ -419,14 +425,19 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: colnames[0], ) elif self.typ == "parquet": - if parquet_options.get("chunked", True): + parquet_options = self._config.config.get("parquet_options", {}) + if parquet_options.get("chunked", False): reader = plc.io.parquet.ChunkedParquetReader( plc.io.SourceInfo(self.paths), columns=with_columns, nrows=n_rows, skip_rows=self.skip_rows, - chunk_read_limit=parquet_options.get("chunk_read_limit", 0), - pass_read_limit=parquet_options.get("pass_read_limit", 1024000000), + chunk_read_limit=parquet_options.get( + "chunk_read_limit", PARQUET_DEFAULT_CHUNK_SIZE + ), + pass_read_limit=parquet_options.get( + "pass_read_limit", PARQUET_DEFAULT_PASS_LIMIT + ), ) chk = reader.read_chunk() tbl = chk.tbl diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index c28f2c2651a..a11417d1fb6 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -17,9 +17,12 @@ import polars as pl import polars.polars as plrs +from polars import GPUEngine from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir from cudf_polars.dsl import expr, ir +from cudf_polars.dsl.expr import Expr +from cudf_polars.dsl.ir import IR from cudf_polars.dsl.traversal import make_recursive, reuse_if_unchanged from cudf_polars.typing import NodeTraverser from cudf_polars.utils import dtypes, sorting @@ -27,6 +30,7 @@ if TYPE_CHECKING: from cudf_polars.typing import ExprTransformer + __all__ = ["translate_ir", "translate_named_expr"] @@ -361,7 +365,9 @@ def _( return ir.HConcat(schema, *(translate_ir(visitor, n=n) for n in node.inputs)) -def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR: +def translate_ir( + visitor: NodeTraverser, config: GPUEngine = None, *, n: int | None = None +) -> ir.IR: """ Translate a polars-internal IR node to our representation. @@ -369,6 +375,8 @@ def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR: ---------- visitor Polars NodeTraverser object + config + GPUEngine configuration object n Optional node to start traversing from, if not provided uses current polars-internal node. @@ -382,6 +390,9 @@ def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR: NotImplementedError If we can't translate the nodes due to unsupported functionality. """ + if not config: + config = GPUEngine() + ctx: AbstractContextManager[None] = ( set_node(visitor, n) if n is not None else noop_context ) @@ -393,6 +404,9 @@ def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR: f"No support for polars IR {version=}" ) # pragma: no cover; no such version for now. 
+ IR._config = config + Expr._config = config + with ctx: polars_schema = visitor.get_schema() node = visitor.view_current_node() From fea77d79dd4d369dda7e0a734373783972d0bc6c Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 29 Oct 2024 20:22:59 -0700 Subject: [PATCH 05/24] pass config through evaluate --- python/cudf_polars/cudf_polars/callback.py | 6 +- .../cudf_polars/dsl/expressions/base.py | 7 +- python/cudf_polars/cudf_polars/dsl/ir.py | 111 ++++++++++++------ .../cudf_polars/cudf_polars/dsl/translate.py | 5 - .../cudf_polars/tests/dsl/test_traversal.py | 6 +- .../tests/expressions/test_sort.py | 2 +- 6 files changed, 81 insertions(+), 56 deletions(-) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index d9a061d1336..aa8cc3de70f 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -129,6 +129,7 @@ def set_device(device: int | None) -> Generator[int, None, None]: def _callback( ir: IR, + config: GPUEngine, with_columns: list[str] | None, pyarrow_predicate: str | None, n_rows: int | None, @@ -145,7 +146,7 @@ def _callback( set_device(device), set_memory_resource(memory_resource), ): - return ir.evaluate(cache={}).to_polars() + return ir.evaluate(cache={}, config=config).to_polars() def execute_with_cudf( @@ -183,7 +184,8 @@ def execute_with_cudf( nt.set_udf( partial( _callback, - translate_ir(nt, config), + translate_ir(nt), + config, device=device, memory_resource=memory_resource, ) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/base.py b/python/cudf_polars/cudf_polars/dsl/expressions/base.py index 8f43491aee5..effe8cb2378 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/base.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/base.py @@ -18,8 +18,6 @@ if TYPE_CHECKING: from collections.abc import Mapping - from polars import GPUEngine - from cudf_polars.containers import Column, DataFrame __all__ = ["Expr", "NamedExpr", "Col", "AggInfo", "ExecutionContext"] @@ -38,16 +36,13 @@ class ExecutionContext(IntEnum): class Expr(Node["Expr"]): """An abstract expression object.""" - __slots__ = ("dtype", "_config") + __slots__ = ("dtype",) dtype: plc.DataType """Data type of the expression.""" # This annotation is needed because of https://github.com/python/mypy/issues/17981 _non_child: ClassVar[tuple[str, ...]] = ("dtype",) """Names of non-child data (not Exprs) for reconstruction.""" - _config: GPUEngine - """GPU engine configuration.""" - def do_evaluate( self, df: DataFrame, diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 970274be35a..a116d167d68 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -129,15 +129,12 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column class IR(Node["IR"]): """Abstract plan node, representing an unevaluated dataframe.""" - __slots__ = ("schema", "_config") + __slots__ = ("schema",) # This annotation is needed because of https://github.com/python/mypy/issues/17981 _non_child: ClassVar[tuple[str, ...]] = ("schema",) schema: Schema """Mapping from column names to their data types.""" - _config: GPUEngine - """GPU engine configuration.""" - def get_hashable(self) -> Hashable: """ Hashable representation of node, treating schema dictionary. 
@@ -151,7 +148,9 @@ def get_hashable(self) -> Hashable: schema_hash = tuple(self.schema.items()) return (type(self), schema_hash, args) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """ Evaluate the node and return a dataframe. @@ -160,6 +159,8 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: cache Mapping from cached node ids to constructed DataFrames. Used to implement evaluation of the `Cache` node. + config + GPU engine configuration. Returns ------- @@ -346,7 +347,9 @@ def get_hashable(self) -> Hashable: self.predicate, ) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" with_columns = self.with_columns row_index = self.row_index @@ -425,7 +428,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: colnames[0], ) elif self.typ == "parquet": - parquet_options = self._config.config.get("parquet_options", {}) + parquet_options = config.config.get("parquet_options", {}) if parquet_options.get("chunked", False): reader = plc.io.parquet.ChunkedParquetReader( plc.io.SourceInfo(self.paths), @@ -540,13 +543,17 @@ def __init__(self, schema: Schema, key: int, value: IR): self.key = key self.children = (value,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" try: return cache[self.key] except KeyError: (value,) = self.children - return cache.setdefault(self.key, value.evaluate(cache=cache)) + return cache.setdefault( + self.key, value.evaluate(cache=cache, config=config) + ) class DataFrameScan(IR): @@ -588,7 +595,9 @@ def get_hashable(self) -> Hashable: schema_hash = tuple(self.schema.items()) return (type(self), schema_hash, id(self.df), self.projection, self.predicate) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" pdf = pl.DataFrame._from_pydf(self.df) if self.projection is not None: @@ -627,10 +636,12 @@ def __init__( self.should_broadcast = should_broadcast self.children = (df,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) # Handle any broadcasting columns = [e.evaluate(df) for e in self.exprs] if self.should_broadcast: @@ -658,11 +669,11 @@ def __init__( self.children = (df,) def evaluate( - self, *, cache: MutableMapping[int, DataFrame] + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine ) -> DataFrame: # pragma: no cover; polars doesn't emit this node yet """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) columns = broadcast(*(e.evaluate(df) for e in self.exprs)) assert all(column.obj.size() == 1 for column in columns) return DataFrame(columns) @@ -741,10 +752,12 @@ def check_agg(agg: expr.Expr) -> int: else: raise 
NotImplementedError(f"No handler for {agg=}") - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) keys = broadcast( *(k.evaluate(df) for k in self.keys), target_length=df.num_rows ) @@ -970,9 +983,11 @@ def _reorder_maps( [plc.types.NullOrder.AFTER, plc.types.NullOrder.AFTER], ).columns() - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" - left, right = (c.evaluate(cache=cache) for c in self.children) + left, right = (c.evaluate(cache=cache, config=config) for c in self.children) how, join_nulls, zlice, suffix, coalesce = self.options if how == "cross": # Separate implementation, since cross_join returns the @@ -1079,10 +1094,12 @@ def __init__( self.should_broadcast = should_broadcast self.children = (df,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) columns = [c.evaluate(df) for c in self.columns] if self.should_broadcast: columns = broadcast(*columns, target_length=df.num_rows) @@ -1136,10 +1153,12 @@ def __init__( "any": plc.stream_compaction.DuplicateKeepOption.KEEP_ANY, } - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) if self.subset is None: indices = list(range(df.num_columns)) keys_sorted = all(c.is_sorted for c in df.column_map.values()) @@ -1212,10 +1231,12 @@ def __init__( self.zlice = zlice self.children = (df,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) sort_keys = broadcast( *(k.evaluate(df) for k in self.by), target_length=df.num_rows ) @@ -1265,10 +1286,12 @@ def __init__(self, schema: Schema, offset: int, length: int, df: IR): self.length = length self.children = (df,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) return df.slice((self.offset, self.length)) @@ -1285,10 +1308,12 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR): self.mask = mask self.children = (df,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = 
child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) (mask,) = broadcast(self.mask.evaluate(df), target_length=df.num_rows) return df.filter(mask) @@ -1303,10 +1328,12 @@ def __init__(self, schema: Schema, df: IR): self.schema = schema self.children = (df,) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) # This can reorder things. columns = broadcast( *(df.column_map[name] for name in self.schema), target_length=df.num_rows @@ -1374,21 +1401,23 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): ) self.options = (tuple(indices), tuple(pivotees), variable_name, value_name) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" (child,) = self.children if self.name == "rechunk": # No-op in our data model # Don't think this appears in a plan tree from python - return child.evaluate(cache=cache) # pragma: no cover + return child.evaluate(cache=cache, config=config) # pragma: no cover elif self.name == "rename": - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) # final tag is "swapping" which is useful for the # optimiser (it blocks some pushdown operations) old, new, _ = self.options return df.rename_columns(dict(zip(old, new, strict=True))) elif self.name == "explode": - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) ((to_explode,),) = self.options index = df.column_names.index(to_explode) subset = df.column_names_set - {to_explode} @@ -1398,7 +1427,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: elif self.name == "unpivot": indices, pivotees, variable_name, value_name = self.options npiv = len(pivotees) - df = child.evaluate(cache=cache) + df = child.evaluate(cache=cache, config=config) index_columns = [ Column(col, name=name) for col, name in zip( @@ -1453,10 +1482,12 @@ def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR) if not all(s.schema == schema for s in self.children[1:]): raise NotImplementedError("Schema mismatch") - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" # TODO: only evaluate what we need if we have a slice - dfs = [df.evaluate(cache=cache) for df in self.children] + dfs = [df.evaluate(cache=cache, config=config) for df in self.children] return DataFrame.from_table( plc.concatenate.concatenate([df.table for df in dfs]), dfs[0].column_names ).slice(self.zlice) @@ -1500,9 +1531,11 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table: ] ) - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" - dfs = [df.evaluate(cache=cache) for df in self.children] + dfs = [df.evaluate(cache=cache, config=config) for df in self.children] max_rows = max(df.num_rows for df in dfs) # Horizontal concatenation extends shorter tables with nulls 
dfs = [ diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index a11417d1fb6..412a20e3db2 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -21,8 +21,6 @@ from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir from cudf_polars.dsl import expr, ir -from cudf_polars.dsl.expr import Expr -from cudf_polars.dsl.ir import IR from cudf_polars.dsl.traversal import make_recursive, reuse_if_unchanged from cudf_polars.typing import NodeTraverser from cudf_polars.utils import dtypes, sorting @@ -404,9 +402,6 @@ def translate_ir( f"No support for polars IR {version=}" ) # pragma: no cover; no such version for now. - IR._config = config - Expr._config = config - with ctx: polars_schema = visitor.get_schema() node = visitor.view_current_node() diff --git a/python/cudf_polars/tests/dsl/test_traversal.py b/python/cudf_polars/tests/dsl/test_traversal.py index 6505a786855..56f425acdf4 100644 --- a/python/cudf_polars/tests/dsl/test_traversal.py +++ b/python/cudf_polars/tests/dsl/test_traversal.py @@ -124,7 +124,7 @@ def replace_df(node, rec): new = mapper(orig) - result = new.evaluate(cache={}).to_polars() + result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars() expect = pl.DataFrame({"a": [2, 1], "b": [-4, -3]}) @@ -153,7 +153,7 @@ def replace_scan(node, rec): orig = translate_ir(q._ldf.visit()) new = mapper(orig) - result = new.evaluate(cache={}).to_polars() + result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars() expect = q.collect() @@ -224,6 +224,6 @@ def _(node: ir.Select, fn: IRTransformer): new_ir = rewriter(qir) - got = new_ir.evaluate(cache={}).to_polars() + got = new_ir.evaluate(cache={}, config=pl.GPUEngine()).to_polars() assert_frame_equal(expect, got) diff --git a/python/cudf_polars/tests/expressions/test_sort.py b/python/cudf_polars/tests/expressions/test_sort.py index 2a37683478b..67c14c731d4 100644 --- a/python/cudf_polars/tests/expressions/test_sort.py +++ b/python/cudf_polars/tests/expressions/test_sort.py @@ -67,7 +67,7 @@ def test_setsorted(descending, nulls_last, with_nulls): assert_gpu_result_equal(q) - df = translate_ir(q._ldf.visit()).evaluate(cache={}) + df = translate_ir(q._ldf.visit()).evaluate(cache={}, config=pl.GPUEngine()) a = df.column_map["a"] From 53b0b2a08e719130b232df1f6a083a3f05973482 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 30 Oct 2024 18:44:33 -0700 Subject: [PATCH 06/24] a trial commit to test a different concatenation strategy --- python/cudf_polars/cudf_polars/dsl/ir.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index a116d167d68..c4c3de4623e 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -442,22 +442,18 @@ def evaluate( "pass_read_limit", PARQUET_DEFAULT_PASS_LIMIT ), ) + + chunks = [] # type: ignore chk = reader.read_chunk() tbl = chk.tbl + chunks.append(tbl) names = chk.column_names() - concatenated_columns = tbl.columns() while reader.has_next(): - tbl = reader.read_chunk().tbl - - for i in range(tbl.num_columns()): - concatenated_columns[i] = plc.concatenate.concatenate( - [concatenated_columns[i], tbl._columns[i]] - ) - # Drop residual columns to save memory - tbl._columns[i] = None + chunks.append(reader.read_chunk().tbl) + chunks = plc.concatenate.concatenate(chunks) df = DataFrame.from_table( - 
plc.Table(concatenated_columns), + chunks, names=names, ) else: From 62c277bd64b7b608dabd3a4bb23ef6b34150449a Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Thu, 7 Nov 2024 07:15:09 -0800 Subject: [PATCH 07/24] address reviews --- python/cudf_polars/cudf_polars/dsl/ir.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 49477e0c0a8..5ba6654e400 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -40,8 +40,6 @@ from cudf_polars.typing import Schema -PARQUET_DEFAULT_CHUNK_SIZE = 0 -PARQUET_DEFAULT_PASS_LIMIT = 17179869184 # 16GiB __all__ = [ "IR", @@ -283,6 +281,9 @@ class Scan(IR): predicate: expr.NamedExpr | None """Mask to apply to the read dataframe.""" + PARQUET_DEFAULT_CHUNK_SIZE: int = 0 + PARQUET_DEFAULT_PASS_LIMIT: int = 17179869184 # 16GiB + def __init__( self, schema: Schema, @@ -496,10 +497,10 @@ def do_evaluate( nrows=n_rows, skip_rows=skip_rows, chunk_read_limit=parquet_options.get( - "chunk_read_limit", PARQUET_DEFAULT_CHUNK_SIZE + "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE ), pass_read_limit=parquet_options.get( - "pass_read_limit", PARQUET_DEFAULT_PASS_LIMIT + "pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT ), ) @@ -512,10 +513,7 @@ def do_evaluate( chunks.append(reader.read_chunk().tbl) chunks = plc.concatenate.concatenate(chunks) - df = DataFrame.from_table( - chunks, - names=names, - ) + df = DataFrame.from_table(chunks, names=names) else: filters = None if predicate is not None and row_index is None: From 13df5aaea2f6847488c81398861872d68b6f1ac6 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Thu, 7 Nov 2024 07:17:36 -0800 Subject: [PATCH 08/24] revert translate.py changes --- python/cudf_polars/cudf_polars/dsl/translate.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index af6e48fb643..5181214819e 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -16,7 +16,6 @@ import polars as pl import polars.polars as plrs -from polars import GPUEngine from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir import pylibcudf as plc @@ -29,7 +28,6 @@ if TYPE_CHECKING: from cudf_polars.typing import ExprTransformer - __all__ = ["translate_ir", "translate_named_expr"] @@ -364,9 +362,7 @@ def _( return ir.HConcat(schema, *(translate_ir(visitor, n=n) for n in node.inputs)) -def translate_ir( - visitor: NodeTraverser, config: GPUEngine = None, *, n: int | None = None -) -> ir.IR: +def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR: """ Translate a polars-internal IR node to our representation. @@ -374,8 +370,6 @@ def translate_ir( ---------- visitor Polars NodeTraverser object - config - GPUEngine configuration object n Optional node to start traversing from, if not provided uses current polars-internal node. @@ -389,9 +383,6 @@ def translate_ir( NotImplementedError If we can't translate the nodes due to unsupported functionality. 
""" - if not config: - config = GPUEngine() - ctx: AbstractContextManager[None] = ( set_node(visitor, n) if n is not None else noop_context ) From 4aee59fd625a0aa854e9a02abdba0fcc9dcf6f91 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Thu, 7 Nov 2024 07:18:52 -0800 Subject: [PATCH 09/24] Revert "a trial commit to test a different concatenation strategy" This reverts commit 53b0b2a08e719130b232df1f6a083a3f05973482. --- python/cudf_polars/cudf_polars/dsl/ir.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 5ba6654e400..11c22371cbd 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -503,17 +503,24 @@ def do_evaluate( "pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT ), ) - - chunks = [] # type: ignore chk = reader.read_chunk() tbl = chk.tbl - chunks.append(tbl) names = chk.column_names() + concatenated_columns = tbl.columns() while reader.has_next(): - chunks.append(reader.read_chunk().tbl) + tbl = reader.read_chunk().tbl + + for i in range(tbl.num_columns()): + concatenated_columns[i] = plc.concatenate.concatenate( + [concatenated_columns[i], tbl._columns[i]] + ) + # Drop residual columns to save memory + tbl._columns[i] = None - chunks = plc.concatenate.concatenate(chunks) - df = DataFrame.from_table(chunks, names=names) + df = DataFrame.from_table( + plc.Table(concatenated_columns), + names=names, + ) else: filters = None if predicate is not None and row_index is None: From a113737374252bda2ca1c86a7e91fe2d2b469d25 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 8 Nov 2024 08:38:28 -0800 Subject: [PATCH 10/24] add docs --- .../cudf/source/cudf_polars/engine_options.md | 24 +++++++++++++++++++ docs/cudf/source/cudf_polars/index.rst | 6 +++++ 2 files changed, 30 insertions(+) create mode 100644 docs/cudf/source/cudf_polars/engine_options.md diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md new file mode 100644 index 00000000000..9447047123a --- /dev/null +++ b/docs/cudf/source/cudf_polars/engine_options.md @@ -0,0 +1,24 @@ +# GPUEngine Configuration Options + +The `polars.GPUEngine` object may be configured in several different ways. + +## Parquet Reader Options + +Chunked reading is controlled by passing a dictionary of options to the `GPUEngine` object. Details may be found following the links to the underlying `libcudf` reader. +- `parquet_chunked`, indicicates is chunked parquet reading is to be used, default True. +- [chunk_read_limit](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited. +- [pass_read_limit](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB. + +For example, one would pass these parameters as follows: +```python +engine = GPUEngine( + raise_on_fail=True, + parquet_options={ + 'parquet_chunked': True, + 'chunk_read_limit': int(1e9), + 'pass_read_limit': int(4e9) + } +) +result = query.collect(engine=engine) +``` +Note that passing `parquet_chunked: False` disables chunked reading entirely, and thus `chunk_read_limit` and `pass_read_limit` will have no effect. 
diff --git a/docs/cudf/source/cudf_polars/index.rst b/docs/cudf/source/cudf_polars/index.rst index 0a3a0d86b2c..6fd98a6b5da 100644 --- a/docs/cudf/source/cudf_polars/index.rst +++ b/docs/cudf/source/cudf_polars/index.rst @@ -39,3 +39,9 @@ Launch on Google Colab :target: https://colab.research.google.com/github/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/polars_gpu_engine_demo.ipynb Try out the GPU engine for Polars in a free GPU notebook environment. Sign in with your Google account and `launch the demo on Colab `__. + +.. toctree:: + :maxdepth: 1 + :caption: Engine Config Options: + + engine_options From 48d3edc922f725b11230fd7cc84b44e5255d220d Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 12 Nov 2024 12:38:32 -0800 Subject: [PATCH 11/24] add test coverage --- .../cudf_polars/cudf_polars/testing/asserts.py | 6 +++++- python/cudf_polars/tests/test_scan.py | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index 2207545aa60..63f9fa505d9 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -19,10 +19,13 @@ __all__: list[str] = ["assert_gpu_result_equal", "assert_ir_translation_raises"] +DEFAULT_GPU_ENGINE: GPUEngine = GPUEngine() + def assert_gpu_result_equal( lazydf: pl.LazyFrame, *, + engine: GPUEngine = DEFAULT_GPU_ENGINE, collect_kwargs: dict[OptimizationArgs, bool] | None = None, polars_collect_kwargs: dict[OptimizationArgs, bool] | None = None, cudf_collect_kwargs: dict[OptimizationArgs, bool] | None = None, @@ -41,6 +44,8 @@ def assert_gpu_result_equal( ---------- lazydf frame to collect. + engine + Custom GPU engine configuration. collect_kwargs Common keyword arguments to pass to collect for both polars CPU and cudf-polars. 
@@ -81,7 +86,6 @@ def assert_gpu_result_equal( ) expect = lazydf.collect(**final_polars_collect_kwargs) - engine = GPUEngine(raise_on_fail=True) got = lazydf.collect(**final_cudf_collect_kwargs, engine=engine) assert_frame_equal( expect, diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 792b136acd8..f379004f458 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -323,6 +323,24 @@ def test_scan_parquet_only_row_index_raises(df, tmp_path): assert_ir_translation_raises(q, NotImplementedError) +@pytest.mark.parametrize("chunk_read_limit", [0, 1, 2, 4, 8, 16]) +@pytest.mark.parametrize("pass_read_limit", [0, 1, 2, 4, 8, 16]) +def test_scan_parquet_chunked(df, tmp_path, chunk_read_limit, pass_read_limit): + df = pl.concat([df] * 1000) # makes something large enough to meaningfully chunk + make_source(df, tmp_path / "file", "parquet") + q = pl.scan_parquet(tmp_path / "file") + assert_gpu_result_equal( + q, + engine=pl.GPUEngine( + parquet_options={ + "chunked": True, + "chunk_read_limit": chunk_read_limit, + "pass_read_limit": pass_read_limit, + } + ), + ) + + def test_scan_hf_url_raises(): q = pl.scan_csv("hf://datasets/scikit-learn/iris/Iris.csv") assert_ir_translation_raises(q, NotImplementedError) From 9c4c1bfea4e26152221a01f9c64be2398bae9414 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 12 Nov 2024 17:47:15 -0800 Subject: [PATCH 12/24] raise on fail true for default testing engine --- python/cudf_polars/cudf_polars/testing/asserts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index 63f9fa505d9..8930a342182 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -19,7 +19,7 @@ __all__: list[str] = ["assert_gpu_result_equal", "assert_ir_translation_raises"] -DEFAULT_GPU_ENGINE: GPUEngine = GPUEngine() +DEFAULT_GPU_ENGINE: GPUEngine = GPUEngine(raise_on_fail=True) def assert_gpu_result_equal( From d6aa66895ddeb11724a963e7fb236f067bd45375 Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Wed, 13 Nov 2024 05:55:06 -0600 Subject: [PATCH 13/24] Apply suggestions from code review Co-authored-by: Lawrence Mitchell --- docs/cudf/source/cudf_polars/engine_options.md | 13 ++++++------- python/cudf_polars/cudf_polars/dsl/ir.py | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md index 9447047123a..95696099a97 100644 --- a/docs/cudf/source/cudf_polars/engine_options.md +++ b/docs/cudf/source/cudf_polars/engine_options.md @@ -5,20 +5,19 @@ The `polars.GPUEngine` object may be configured in several different ways. ## Parquet Reader Options Chunked reading is controlled by passing a dictionary of options to the `GPUEngine` object. Details may be found following the links to the underlying `libcudf` reader. -- `parquet_chunked`, indicicates is chunked parquet reading is to be used, default True. -- [chunk_read_limit](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited. 
-- [pass_read_limit](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB. +- `chunked`, indicates if chunked parquet reading is to be used, default False. +- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited. +- [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB. -For example, one would pass these parameters as follows: +For example, to select the chunked reader with custom values for `pass_read_limit` and `chunk_read_limit`: ```python engine = GPUEngine( - raise_on_fail=True, parquet_options={ - 'parquet_chunked': True, + 'chunked': True, 'chunk_read_limit': int(1e9), 'pass_read_limit': int(4e9) } ) result = query.collect(engine=engine) ``` -Note that passing `parquet_chunked: False` disables chunked reading entirely, and thus `chunk_read_limit` and `pass_read_limit` will have no effect. +Note that passing `chunked: False` disables chunked reading entirely, and thus `chunk_read_limit` and `pass_read_limit` will have no effect. diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 9657c05ec2c..e9bc1f35530 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -301,8 +301,8 @@ class Scan(IR): predicate: expr.NamedExpr | None """Mask to apply to the read dataframe.""" - PARQUET_DEFAULT_CHUNK_SIZE: int = 0 - PARQUET_DEFAULT_PASS_LIMIT: int = 17179869184 # 16GiB + PARQUET_DEFAULT_CHUNK_SIZE: int = 0 # unlimited + PARQUET_DEFAULT_PASS_LIMIT: int = 16 * 1024**3 # 16GiB def __init__( self, From a06f0ae3369419e6659c8d9c87f2c506c0853916 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 13 Nov 2024 03:55:21 -0800 Subject: [PATCH 14/24] reword Parquet Reader Options --- docs/cudf/source/cudf_polars/engine_options.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md index 95696099a97..b88713121e5 100644 --- a/docs/cudf/source/cudf_polars/engine_options.md +++ b/docs/cudf/source/cudf_polars/engine_options.md @@ -3,9 +3,11 @@ The `polars.GPUEngine` object may be configured in several different ways. ## Parquet Reader Options +Reading large parquet files incurs significant memory overhead, especially when the files are compressed. This may lead to out of memory errors for some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks with a limit on the amount of memory that is used, at the cost of a small drop in performance. -Chunked reading is controlled by passing a dictionary of options to the `GPUEngine` object. Details may be found following the links to the underlying `libcudf` reader. -- `chunked`, indicates if chunked parquet reading is to be used, default False. + +To configure the parquet reader, we provide a dictionary of options to the `parquet_options` keyword of the `GPUEngine` object. Valid keys and values are: +- `chunked`, indicicates is chunked parquet reading is to be used, default True. 
- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited. - [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB. From 9930d2eafac6965a9369d142543c591b7a0af5ee Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 13 Nov 2024 04:19:40 -0800 Subject: [PATCH 15/24] partially address reviews --- python/cudf_polars/cudf_polars/callback.py | 30 ++++++++++++++++--- .../cudf_polars/testing/asserts.py | 7 +++-- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 6b5484d8562..a742ed66ffd 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -149,6 +149,30 @@ def _callback( return ir.evaluate(cache={}, config=config).to_polars() +def validate_config_options(config: dict) -> None: + """ + Validate the configuration options for the GPU engine. + + Parameters + ---------- + config + Configuration options to validate. + + Raises + ------ + ValueError + If the configuration contains unsupported options. + """ + if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}): + raise ValueError( + f"Engine configuration contains unsupported settings {unsupported}" + ) + if parquet_options := config.get("parquet_options", {}): + assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset( + parquet_options + ) + + def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: """ A post optimization callback that attempts to execute the plan with cudf. @@ -175,10 +199,8 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: device = config.device memory_resource = config.memory_resource raise_on_fail = config.config.get("raise_on_fail", False) - if unsupported := (config.config.keys() - {"raise_on_fail", "parquet_options"}): - raise ValueError( - f"Engine configuration contains unsupported settings {unsupported}" - ) + validate_config_options(config.config) + with nvtx.annotate(message="ConvertIR", domain="cudf_polars"): translator = Translator(nt) ir = translator.translate_ir() diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index 8930a342182..1821cfedfb8 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -19,13 +19,11 @@ __all__: list[str] = ["assert_gpu_result_equal", "assert_ir_translation_raises"] -DEFAULT_GPU_ENGINE: GPUEngine = GPUEngine(raise_on_fail=True) - def assert_gpu_result_equal( lazydf: pl.LazyFrame, *, - engine: GPUEngine = DEFAULT_GPU_ENGINE, + engine: GPUEngine | None = None, collect_kwargs: dict[OptimizationArgs, bool] | None = None, polars_collect_kwargs: dict[OptimizationArgs, bool] | None = None, cudf_collect_kwargs: dict[OptimizationArgs, bool] | None = None, @@ -81,6 +79,9 @@ def assert_gpu_result_equal( NotImplementedError If GPU collection failed in some way. 
""" + if engine is None: + engine = GPUEngine(raise_on_fail=True) + final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs( collect_kwargs, polars_collect_kwargs, cudf_collect_kwargs ) From b2530a4e4ebd4c77cfab14404488c14dacf888b0 Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Wed, 13 Nov 2024 06:23:40 -0600 Subject: [PATCH 16/24] Apply suggestions from code review Co-authored-by: Lawrence Mitchell --- docs/cudf/source/cudf_polars/engine_options.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md index b88713121e5..10a1d915530 100644 --- a/docs/cudf/source/cudf_polars/engine_options.md +++ b/docs/cudf/source/cudf_polars/engine_options.md @@ -3,11 +3,11 @@ The `polars.GPUEngine` object may be configured in several different ways. ## Parquet Reader Options -Reading large parquet files incurs significant memory overhead, especially when the files are compressed. This may lead to out of memory errors for some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks with a limit on the amount of memory that is used, at the cost of a small drop in performance. +Reading large parquet files can use a large amount of memory, especially when the files are compressed. This may lead to out of memory errors for some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks with a limit on the amount of memory that is used, at the cost of a small drop in performance. To configure the parquet reader, we provide a dictionary of options to the `parquet_options` keyword of the `GPUEngine` object. Valid keys and values are: -- `chunked`, indicicates is chunked parquet reading is to be used, default True. +- `chunked`, indicates is chunked parquet reading is to be used, default True. - [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited. - [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB. 
From 9958fe947393a6c63566706f71c34c4d12498af8 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 13 Nov 2024 04:22:48 -0800 Subject: [PATCH 17/24] chunk on by default --- python/cudf_polars/cudf_polars/dsl/ir.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index e9bc1f35530..3d7b71081a3 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -510,7 +510,7 @@ def do_evaluate( ) elif typ == "parquet": parquet_options = config.config.get("parquet_options", {}) - if parquet_options.get("chunked", False): + if parquet_options.get("chunked", True): reader = plc.io.parquet.ChunkedParquetReader( plc.io.SourceInfo(paths), columns=with_columns, From d33ec5e0589dd04c94535a432eeff0887fb12130 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 13 Nov 2024 04:29:40 -0800 Subject: [PATCH 18/24] turn OFF chunking in existing parquet tests --- python/cudf_polars/tests/test_scan.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index f379004f458..0c19ce9a0eb 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -13,6 +13,8 @@ assert_ir_translation_raises, ) +NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False}) + @pytest.fixture( params=[(None, None), ("row-index", 0), ("index", 10)], @@ -117,7 +119,7 @@ def test_scan( q = q.filter(mask) if columns is not None: q = q.select(*columns) - assert_gpu_result_equal(q) + assert_gpu_result_equal(q, engine=NO_CHUNK_ENGINE) def test_negative_slice_pushdown_raises(tmp_path): @@ -153,7 +155,7 @@ def test_scan_row_index_projected_out(tmp_path): q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a")) - assert_gpu_result_equal(q) + assert_gpu_result_equal(q, engine=NO_CHUNK_ENGINE) def test_scan_csv_column_renames_projection_schema(tmp_path): @@ -332,11 +334,12 @@ def test_scan_parquet_chunked(df, tmp_path, chunk_read_limit, pass_read_limit): assert_gpu_result_equal( q, engine=pl.GPUEngine( + raise_on_fail=True, parquet_options={ "chunked": True, "chunk_read_limit": chunk_read_limit, "pass_read_limit": pass_read_limit, - } + }, ), ) From 2be2847bf95aaca2f8453ccfd3f9d840fe0d1078 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 13 Nov 2024 08:32:01 -0800 Subject: [PATCH 19/24] disable slice pushdown with parquet --- python/cudf_polars/cudf_polars/dsl/ir.py | 4 +-- python/cudf_polars/tests/test_scan.py | 39 ++++++++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 3d7b71081a3..8130b18333c 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -349,10 +349,10 @@ def __init__( # TODO: polars has this implemented for parquet, # maybe we can do this too? 
raise NotImplementedError("slice pushdown for negative slices") - if self.typ == "csv" and self.skip_rows != 0: # pragma: no cover + if self.typ in {"csv", "parquet"} and self.skip_rows != 0: # pragma: no cover # This comes from slice pushdown, but that # optimization doesn't happen right now - raise NotImplementedError("skipping rows in CSV reader") + raise NotImplementedError("skipping rows in CSV or Parquet reader") if self.cloud_options is not None and any( self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp") ): diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 0c19ce9a0eb..dbc7a0a0f51 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -16,6 +16,11 @@ NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False}) +@pytest.fixture(params=[True, False], ids=["chunked", "no-chunked"]) +def chunked(request): + return request.param + + @pytest.fixture( params=[(None, None), ("row-index", 0), ("index", 10)], ids=["no-row-index", "zero-offset-row-index", "offset-row-index"], @@ -97,7 +102,17 @@ def make_source(df, path, format): ], ) def test_scan( - tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request + tmp_path, + df, + format, + scan_fn, + row_index, + n_rows, + columns, + mask, + slice, + chunked, + request, ): name, offset = row_index make_source(df, tmp_path / "file", format) @@ -115,11 +130,23 @@ def test_scan( ) if slice is not None: q = q.slice(*slice) - if mask is not None: - q = q.filter(mask) - if columns is not None: - q = q.select(*columns) - assert_gpu_result_equal(q, engine=NO_CHUNK_ENGINE) + if scan_fn is pl.scan_parquet and slice[0] != 0: + # slicing a scan optimizes to a skip_rows which + # the chunked reader does not yet support + assert_ir_translation_raises(q, NotImplementedError) + else: + assert_gpu_result_equal(q) + else: + if mask is not None: + q = q.filter(mask) + if columns is not None: + q = q.select(*columns) + assert_gpu_result_equal( + q, + engine=pl.GPUEngine( + raise_on_fail=True, parquet_options={"chunked": chunked} + ), + ) def test_negative_slice_pushdown_raises(tmp_path): From c23afd9f42f2737a99e0b8379ab720bf3ad12e8a Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Thu, 14 Nov 2024 13:05:15 +0000 Subject: [PATCH 20/24] Test parquet filters with chunking off and on --- python/cudf_polars/tests/test_parquet_filters.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/python/cudf_polars/tests/test_parquet_filters.py b/python/cudf_polars/tests/test_parquet_filters.py index 545a89250fc..5c5f927e4f4 100644 --- a/python/cudf_polars/tests/test_parquet_filters.py +++ b/python/cudf_polars/tests/test_parquet_filters.py @@ -5,7 +5,8 @@ import pytest import polars as pl -from polars.testing import assert_frame_equal + +from cudf_polars.testing.asserts import assert_gpu_result_equal @pytest.fixture(scope="module") @@ -50,11 +51,9 @@ def pq_file(tmp_path_factory, df): ], ) @pytest.mark.parametrize("selection", [["c", "b"], ["a"], ["a", "c"], ["b"], "c"]) -def test_scan_by_hand(expr, selection, pq_file): - df = pq_file.collect() +@pytest.mark.parametrize("chunked", [False, True], ids=["unchunked", "chunked"]) +def test_scan_by_hand(expr, selection, pq_file, chunked): q = pq_file.filter(expr).select(*selection) - # Not using assert_gpu_result_equal because - # https://github.com/pola-rs/polars/issues/19238 - got = q.collect(engine=pl.GPUEngine(raise_on_fail=True)) - 
expect = df.filter(expr).select(*selection)
-    assert_frame_equal(got, expect)
+    assert_gpu_result_equal(
+        q, engine=pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": chunked})
+    )

From df341ea408c76d29a2456493be9644fbd71abecd Mon Sep 17 00:00:00 2001
From: Lawrence Mitchell
Date: Thu, 14 Nov 2024 13:05:45 +0000
Subject: [PATCH 21/24] Implement workaround for #16186

Rather than falling back to CPU for chunked read + skip_rows, just
read chunks and skip manually after the fact. Simplify the parquet
scan tests a bit and add better coverage of both chunked and unchunked
behaviour.
---
 python/cudf_polars/cudf_polars/dsl/ir.py |  32 ++++--
 python/cudf_polars/tests/test_scan.py    | 140 ++++++++++++++---------
 2 files changed, 113 insertions(+), 59 deletions(-)

diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index 8130b18333c..dd54c9202a4 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -349,10 +349,10 @@ def __init__(
             # TODO: polars has this implemented for parquet,
             # maybe we can do this too?
             raise NotImplementedError("slice pushdown for negative slices")
-        if self.typ in {"csv", "parquet"} and self.skip_rows != 0:  # pragma: no cover
+        if self.typ in {"csv"} and self.skip_rows != 0:  # pragma: no cover
             # This comes from slice pushdown, but that
             # optimization doesn't happen right now
-            raise NotImplementedError("skipping rows in CSV or Parquet reader")
+            raise NotImplementedError("skipping rows in CSV reader")
         if self.cloud_options is not None and any(
             self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp")
         ):
@@ -514,8 +514,14 @@ def do_evaluate(
                 reader = plc.io.parquet.ChunkedParquetReader(
                     plc.io.SourceInfo(paths),
                     columns=with_columns,
-                    nrows=n_rows,
-                    skip_rows=skip_rows,
+                    # We handle skip_rows != 0 by reading up to
+                    # n_rows + skip_rows rows and slicing off the
+                    # first skip_rows entries.
+ # TODO: Remove this workaround once + # https://github.com/rapidsai/cudf/issues/16186 + # is fixed + nrows=n_rows + skip_rows, + skip_rows=0, chunk_read_limit=parquet_options.get( "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE ), @@ -524,11 +530,23 @@ def do_evaluate( ), ) chk = reader.read_chunk() - tbl = chk.tbl - names = chk.column_names() + rows_left_to_skip = skip_rows + + def slice_skip(tbl: plc.Table): + nonlocal rows_left_to_skip + if rows_left_to_skip > 0: + table_rows = tbl.num_rows() + chunk_skip = min(rows_left_to_skip, table_rows) + (tbl,) = plc.copying.slice(tbl, [chunk_skip, table_rows]) + rows_left_to_skip -= chunk_skip + return tbl + + tbl = slice_skip(chk.tbl) + # TODO: Nested column names + names = chk.column_names(include_children=False) concatenated_columns = tbl.columns() while reader.has_next(): - tbl = reader.read_chunk().tbl + tbl = slice_skip(reader.read_chunk().tbl) for i in range(tbl.num_columns()): concatenated_columns[i] = plc.concatenate.concatenate( diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index dbc7a0a0f51..572905102d5 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -16,22 +16,17 @@ NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False}) -@pytest.fixture(params=[True, False], ids=["chunked", "no-chunked"]) -def chunked(request): - return request.param - - @pytest.fixture( params=[(None, None), ("row-index", 0), ("index", 10)], - ids=["no-row-index", "zero-offset-row-index", "offset-row-index"], + ids=["no_row_index", "zero_offset_row_index", "offset_row_index"], ) def row_index(request): return request.param @pytest.fixture( - params=[None, 2, 3], - ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], + params=[None, 3], + ids=["all_rows", "some_rows"], ) def n_rows(request): return request.param @@ -58,21 +53,15 @@ def columns(request, row_index): @pytest.fixture( - params=[None, pl.col("c").is_not_null()], ids=["no-mask", "c-is-not-null"] + params=[None, pl.col("c").is_not_null()], ids=["no_mask", "c_is_not_null"] ) def mask(request): return request.param @pytest.fixture( - params=[ - None, - (1, 1), - ], - ids=[ - "no-slice", - "slice-second", - ], + params=[None, (1, 1)], + ids=["no_slice", "slice_second"], ) def slice(request): # For use in testing that we handle @@ -99,22 +88,16 @@ def make_source(df, path, format): ("csv", pl.scan_csv), ("ndjson", pl.scan_ndjson), ("parquet", pl.scan_parquet), + ("chunked_parquet", pl.scan_parquet), ], ) def test_scan( - tmp_path, - df, - format, - scan_fn, - row_index, - n_rows, - columns, - mask, - slice, - chunked, - request, + tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request ): name, offset = row_index + is_chunked = format == "chunked_parquet" + if is_chunked: + format = "parquet" make_source(df, tmp_path / "file", format) request.applymarker( pytest.mark.xfail( @@ -128,25 +111,40 @@ def test_scan( row_index_offset=offset, n_rows=n_rows, ) + engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked}) + if ( + is_chunked + and (columns is None or columns[0] != "a") + and ( + # When we mask with the slice, it happens to remove the + # bad row + (mask is None and slice is not None) + # When we both slice and read a subset of rows it also + # removes the bad row + or (slice is None and n_rows is not None) + ) + ): + # slice read produces wrong result for string column + request.applymarker( + 
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") + ) + # if ( + # (mask is None and n_rows is not None) + # or (n_rows is not None and mask is not None and slice is None) + # or (n_rows is None and mask is None and slice is not None) + # ): + # request.applymarker( + # pytest.mark.xfail( + # reason="https://github.com/rapidsai/cudf/issues/17311" + # ) + # ) if slice is not None: q = q.slice(*slice) - if scan_fn is pl.scan_parquet and slice[0] != 0: - # slicing a scan optimizes to a skip_rows which - # the chunked reader does not yet support - assert_ir_translation_raises(q, NotImplementedError) - else: - assert_gpu_result_equal(q) - else: - if mask is not None: - q = q.filter(mask) - if columns is not None: - q = q.select(*columns) - assert_gpu_result_equal( - q, - engine=pl.GPUEngine( - raise_on_fail=True, parquet_options={"chunked": chunked} - ), - ) + if mask is not None: + q = q.filter(mask) + if columns is not None: + q = q.select(*columns) + assert_gpu_result_equal(q, engine=engine) def test_negative_slice_pushdown_raises(tmp_path): @@ -352,14 +350,52 @@ def test_scan_parquet_only_row_index_raises(df, tmp_path): assert_ir_translation_raises(q, NotImplementedError) -@pytest.mark.parametrize("chunk_read_limit", [0, 1, 2, 4, 8, 16]) -@pytest.mark.parametrize("pass_read_limit", [0, 1, 2, 4, 8, 16]) -def test_scan_parquet_chunked(df, tmp_path, chunk_read_limit, pass_read_limit): - df = pl.concat([df] * 1000) # makes something large enough to meaningfully chunk - make_source(df, tmp_path / "file", "parquet") - q = pl.scan_parquet(tmp_path / "file") +@pytest.fixture( + scope="module", params=["no_slice", "skip_to_end", "skip_partial", "partial"] +) +def chunked_slice(request): + return request.param + + +@pytest.fixture(scope="module") +def large_df(df, tmpdir_factory, chunked_slice): + # Something big enough that we get more than a single chunk, + # empirically determined + df = pl.concat([df] * 1000) + df = pl.concat([df] * 10) + df = pl.concat([df] * 10) + path = str(tmpdir_factory.mktemp("data") / "large.pq") + make_source(df, path, "parquet") + n_rows = len(df) + q = pl.scan_parquet(path) + if chunked_slice == "no_slice": + return q + elif chunked_slice == "skip_to_end": + return q.slice(int(n_rows * 0.6), n_rows) + elif chunked_slice == "skip_partial": + return q.slice(int(n_rows * 0.6), int(n_rows * 0.2)) + else: + return q.slice(0, int(n_rows * 0.6)) + + +@pytest.mark.parametrize( + "chunk_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"chunk_{x}" +) +@pytest.mark.parametrize( + "pass_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"pass_{x}" +) +def test_scan_parquet_chunked( + request, chunked_slice, large_df, chunk_read_limit, pass_read_limit +): + if chunked_slice in {"skip_partial", "partial"} and ( + chunk_read_limit == 0 and pass_read_limit != 0 + ): + request.applymarker( + pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") + ) + assert_gpu_result_equal( - q, + large_df, engine=pl.GPUEngine( raise_on_fail=True, parquet_options={ From beb24625bb67600619b105f4e4964af61cdfef9d Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Thu, 14 Nov 2024 14:01:33 +0000 Subject: [PATCH 22/24] xfail a polars test --- python/cudf_polars/cudf_polars/testing/plugin.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cudf_polars/cudf_polars/testing/plugin.py b/python/cudf_polars/cudf_polars/testing/plugin.py index 080a1af6e19..2a9104d8c82 100644 --- a/python/cudf_polars/cudf_polars/testing/plugin.py +++ 
b/python/cudf_polars/cudf_polars/testing/plugin.py @@ -64,6 +64,7 @@ def pytest_configure(config: pytest.Config) -> None: "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception", + "tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311", "tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394", From b398172003ef25d01200fd403e0efdaefc143fc2 Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:05:56 -0600 Subject: [PATCH 23/24] Apply suggestions from code review Co-authored-by: Vyas Ramasubramani --- docs/cudf/source/cudf_polars/engine_options.md | 8 ++++---- python/cudf_polars/cudf_polars/callback.py | 9 ++++----- python/cudf_polars/cudf_polars/dsl/ir.py | 3 +++ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md index 10a1d915530..4c930c7392d 100644 --- a/docs/cudf/source/cudf_polars/engine_options.md +++ b/docs/cudf/source/cudf_polars/engine_options.md @@ -3,13 +3,13 @@ The `polars.GPUEngine` object may be configured in several different ways. ## Parquet Reader Options -Reading large parquet files can use a large amount of memory, especially when the files are compressed. This may lead to out of memory errors for some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks with a limit on the amount of memory that is used, at the cost of a small drop in performance. +Reading large parquet files can use a large amount of memory, especially when the files are compressed. This may lead to out of memory errors for some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks, limiting the peak memory usage at the cost of a small drop in performance. To configure the parquet reader, we provide a dictionary of options to the `parquet_options` keyword of the `GPUEngine` object. Valid keys and values are: -- `chunked`, indicates is chunked parquet reading is to be used, default True. -- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk, default unlimited. -- [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression, default 16GiB. +- `chunked` indicates that chunked parquet reading is to be used. By default, chunked reading is turned on. 
+- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk. By default, the maximum chunk size is unlimited. +- [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression. The default pass read limit is 16GiB. For example, to select the chunked reader with custom values for `pass_read_limit` and `chunk_read_limit`: ```python diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index a742ed66ffd..c446ce0384e 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -165,12 +165,11 @@ def validate_config_options(config: dict) -> None: """ if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}): raise ValueError( - f"Engine configuration contains unsupported settings {unsupported}" - ) - if parquet_options := config.get("parquet_options", {}): - assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset( - parquet_options + f"Engine configuration contains unsupported settings: {unsupported}" ) + assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset( + config.get("parquet_options", {}) + ) def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index dd54c9202a4..e44a0e0857a 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -537,6 +537,9 @@ def slice_skip(tbl: plc.Table): if rows_left_to_skip > 0: table_rows = tbl.num_rows() chunk_skip = min(rows_left_to_skip, table_rows) + # TODO: Check performance impact of skipping this + # call and creating an empty table manually when the + # slice would be empty (chunk_skip == table_rows). (tbl,) = plc.copying.slice(tbl, [chunk_skip, table_rows]) rows_left_to_skip -= chunk_skip return tbl From e67614a850db1474743d1b0411a4a9207724274a Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 15 Nov 2024 10:28:02 +0000 Subject: [PATCH 24/24] Remove commented code --- python/cudf_polars/tests/test_scan.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 572905102d5..61925b21a97 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -128,16 +128,6 @@ def test_scan( request.applymarker( pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") ) - # if ( - # (mask is None and n_rows is not None) - # or (n_rows is not None and mask is not None and slice is None) - # or (n_rows is None and mask is None and slice is not None) - # ): - # request.applymarker( - # pytest.mark.xfail( - # reason="https://github.com/rapidsai/cudf/issues/17311" - # ) - # ) if slice is not None: q = q.slice(*slice) if mask is not None:
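
Taken together, the series exposes the chunked parquet reader through the
`parquet_options` dictionary accepted by `polars.GPUEngine`. The sketch below
shows roughly how a user would drive it once these patches land; the file name
and the two limit values are illustrative assumptions rather than values taken
from the patches (by default chunking is on, the chunk read limit is unlimited,
and the pass read limit is 16GiB):

```python
import polars as pl

# Any parquet dataset works here; "data.parquet" is a placeholder path.
q = pl.scan_parquet("data.parquet").filter(pl.col("c").is_not_null())

engine = pl.GPUEngine(
    raise_on_fail=True,
    parquet_options={
        "chunked": True,                 # already the default
        "chunk_read_limit": 1024**3,     # illustrative: cap each chunk at ~1 GiB
        "pass_read_limit": 4 * 1024**3,  # illustrative: cap decompression at ~4 GiB
    },
)

result = q.collect(engine=engine)
```

Passing `{"chunked": False}` instead selects the single-shot `read_parquet`
path, which the updated tests exercise via `NO_CHUNK_ENGINE`.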