diff --git a/dask_planner/src/sql/logical/sort.rs b/dask_planner/src/sql/logical/sort.rs
index 0bdd67e23..06d35a28f 100644
--- a/dask_planner/src/sql/logical/sort.rs
+++ b/dask_planner/src/sql/logical/sort.rs
@@ -19,6 +19,11 @@ impl PySort {
     pub fn sort_expressions(&self) -> PyResult<Vec<PyExpr>> {
         py_expr_list(&self.sort.input, &self.sort.expr)
     }
+
+    #[pyo3(name = "getNumRows")]
+    pub fn get_fetch_val(&self) -> PyResult<Option<usize>> {
+        Ok(self.sort.fetch)
+    }
 }
 
 impl TryFrom<LogicalPlan> for PySort {
diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py
index 9800df978..2e1376d41 100644
--- a/dask_sql/physical/rel/logical/sort.py
+++ b/dask_sql/physical/rel/logical/sort.py
@@ -20,16 +20,19 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         (dc,) = self.assert_inputs(rel, 1, context)
         df = dc.df
         cc = dc.column_container
-        sort_expressions = rel.sort().getCollation()
+        sort_plan = rel.sort()
+        sort_expressions = sort_plan.getCollation()
         sort_columns = [
             cc.get_backend_by_frontend_name(expr.column_name(rel))
             for expr in sort_expressions
         ]
         sort_ascending = [expr.isSortAscending() for expr in sort_expressions]
         sort_null_first = [expr.isSortNullsFirst() for expr in sort_expressions]
+        sort_num_rows = sort_plan.getNumRows()
 
-        df = df.persist()
-        df = apply_sort(df, sort_columns, sort_ascending, sort_null_first)
+        df = apply_sort(
+            df, sort_columns, sort_ascending, sort_null_first, sort_num_rows
+        )
 
         cc = self.fix_column_to_row_type(cc, rel.getRowType())
         # No column type has changed, so no need to cast again
diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py
index c2ccce3c1..e53134d44 100644
--- a/dask_sql/physical/utils/sort.py
+++ b/dask_sql/physical/utils/sort.py
@@ -2,6 +2,7 @@
 
 import dask.dataframe as dd
 import pandas as pd
+from dask import config as dask_config
 from dask.utils import M
 
 from dask_sql.utils import make_pickable_without_dask_sql
@@ -12,6 +13,7 @@ def apply_sort(
     sort_columns: List[str],
     sort_ascending: List[bool],
     sort_null_first: List[bool],
+    sort_num_rows: int = None,
 ) -> dd.DataFrame:
     # when sort_values doesn't support lists of ascending / null
     # position booleans, we can still do the sort provided that
@@ -19,6 +21,24 @@ def apply_sort(
     single_ascending = len(set(sort_ascending)) == 1
     single_null_first = len(set(sort_null_first)) == 1
 
+    if is_topk_optimizable(
+        df=df,
+        sort_columns=sort_columns,
+        single_ascending=single_ascending,
+        sort_null_first=sort_null_first,
+        sort_num_rows=sort_num_rows,
+    ):
+        return topk_sort(
+            df=df,
+            sort_columns=sort_columns,
+            sort_ascending=sort_ascending,
+            sort_num_rows=sort_num_rows,
+        )
+
+    else:
+        # Pre persist before sort to avoid duplicate compute
+        df = df.persist()
+
     # pandas / cudf don't support lists of null positions
     if df.npartitions == 1 and single_null_first:
         return df.map_partitions(
@@ -57,6 +77,18 @@ def apply_sort(
     ).persist()
 
 
+def topk_sort(
+    df: dd.DataFrame,
+    sort_columns: List[str],
+    sort_ascending: List[bool],
+    sort_num_rows: int = None,
+):
+    if sort_ascending[0]:
+        return df.nsmallest(n=sort_num_rows, columns=sort_columns)
+    else:
+        return df.nlargest(n=sort_num_rows, columns=sort_columns)
+
+
 def sort_partition_func(
     partition: pd.DataFrame,
     sort_columns: List[str],
@@ -85,3 +117,29 @@ def sort_partition_func(
     )
 
     return partition
+
+
+def is_topk_optimizable(
+    df: dd.DataFrame,
+    sort_columns: List[str],
+    single_ascending: bool,
+    sort_null_first: List[bool],
+    sort_num_rows: int = None,
+):
+    if (
+        sort_num_rows is None
+        or not single_ascending
+        or any(sort_null_first)
+        # pandas doesnt support nsmallest/nlargest with object dtypes
+        or (
+            "pandas" in str(df._partition_type)
+            and any(df[sort_columns].dtypes == "object")
+        )
+        or (
+            sort_num_rows * len(df.columns)
+            > dask_config.get("sql.sort.topk-nelem-limit")
+        )
+    ):
+        return False
+
+    return True
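Note on the physical-plan change above (commentary, not part of the patch): when `is_topk_optimizable` passes, `apply_sort` routes `ORDER BY ... LIMIT k` through Dask's reduction-based `nsmallest`/`nlargest` instead of persisting and fully shuffle-sorting the frame. A minimal pandas-level sketch of the equivalence being relied on (illustrative only; assumes a single ascending sort key and no NULLS FIRST):

    import pandas as pd

    df = pd.DataFrame({"a": [5, 1, 4, 2, 3]})
    k = 3
    # For a single ascending key, top-k selection returns the same rows in
    # the same order as a full sort followed by a limit:
    assert df.nsmallest(k, columns=["a"]).equals(df.sort_values("a").head(k))

On a multi-partition Dask DataFrame the same `nsmallest`/`nlargest` call avoids a global shuffle: each partition keeps at most k rows and the partial results are combined in a reduction.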
diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml
index c6d5bd3c0..993bf0031 100644
--- a/dask_sql/sql-schema.yaml
+++ b/dask_sql/sql-schema.yaml
@@ -40,12 +40,24 @@ properties:
               queries, but can signicantly reduce memory usage when querying a small subset of a
               large table. Default is ``true``.
 
-      predicate_pushdown:
+      optimize:
         type: boolean
         description: |
-          Whether to try pushing down filter predicates into IO (when possible).
+          Whether the first generated logical plan should be further optimized or used as is.
 
-      optimize:
+      predicate_pushdown:
         type: boolean
         description: |
-          Whether the first generated logical plan should be further optimized or used as is.
+          Whether to try pushing down filter predicates into IO (when possible).
+
+      sort:
+        type: object
+        properties:
+
+          topk-nelem-limit:
+            type: integer
+            description: |
+              Total number of elements below which dask-sql should attempt to apply the top-k
+              optimization (when possible). ``nelem`` is defined as the limit or ``k`` value times the
+              number of columns. Default is 1000000, corresponding to a LIMIT clause of 1 million in a
+              1 column table.
diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml
index 5c175320d..22ff68f70 100644
--- a/dask_sql/sql.yaml
+++ b/dask_sql/sql.yaml
@@ -9,6 +9,9 @@ sql:
   limit:
     check-first-partition: True
 
+  optimize: True
+
   predicate_pushdown: True
 
-  optimize: True
+  sort:
+    topk-nelem-limit: 1000000
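The `sql.sort.topk-nelem-limit` entry added above (default 1000000 elements, where nelem is the limit k times the number of columns) caps how large a top-k result dask-sql will attempt to build this way. It can also be overridden per query; a short sketch, assuming a `Context` named `c` with a table `df` already registered:

    # Setting the limit to 0 makes every query fail the nelem check,
    # forcing the regular shuffle-based sort path:
    result = c.sql(
        "SELECT * FROM df ORDER BY a LIMIT 10",
        config_options={"sql.sort.topk-nelem-limit": 0},
    )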
diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py
index 8b1d125a9..1956a3bce 100644
--- a/tests/integration/test_sort.py
+++ b/tests/integration/test_sort.py
@@ -351,3 +351,86 @@ def test_sort_by_old_alias(c, input_table_1, request):
     df_expected = user_table_1.sort_values(["b", "user_id"], ascending=[True, False])[
         ["b"]
     ]
+
+
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_topk(gpu):
+    c = Context()
+    df = pd.DataFrame(
+        {
+            "a": [float("nan"), 1] * 30,
+            "b": [1, 2, 3] * 20,
+            "c": ["a", "b", "c"] * 20,
+        }
+    )
+    c.create_table("df", dd.from_pandas(df, npartitions=10), gpu=gpu)
+
+    df_result = c.sql("""SELECT * FROM df ORDER BY a LIMIT 10""")
+    assert any(["nsmallest" in key for key in df_result.dask.layers.keys()])
+    assert_eq(
+        df_result,
+        pd.DataFrame(
+            {
+                "a": [1.0] * 10,
+                "b": ([2, 1, 3] * 4)[:10],
+                "c": (["b", "a", "c"] * 4)[:10],
+            }
+        ),
+        check_index=False,
+    )
+
+    df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 10""")
+    assert any(["nsmallest" in key for key in df_result.dask.layers.keys()])
+    assert_eq(
+        df_result,
+        pd.DataFrame({"a": [1.0] * 10, "b": [1] * 10, "c": ["a"] * 10}),
+        check_index=False,
+    )
+
+    df_result = c.sql(
+        """SELECT * FROM df ORDER BY a DESC NULLS LAST, b DESC NULLS LAST LIMIT 10"""
+    )
+    assert any(["nlargest" in key for key in df_result.dask.layers.keys()])
+    assert_eq(
+        df_result,
+        pd.DataFrame({"a": [1.0] * 10, "b": [3] * 10, "c": ["c"] * 10}),
+        check_index=False,
+    )
+
+    # String column nlargest/smallest not supported for pandas
+    df_result = c.sql("""SELECT * FROM df ORDER BY c LIMIT 10""")
+    if not gpu:
+        assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+        assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+    else:
+        assert_eq(
+            df_result,
+            pd.DataFrame({"a": [float("nan"), 1] * 5, "b": [1] * 10, "c": ["a"] * 10}),
+            check_index=False,
+        )
+
+    # Assert that the optimization isn't applied when there is any nulls first
+    df_result = c.sql(
+        """SELECT * FROM df ORDER BY a DESC, b DESC NULLS LAST LIMIT 10"""
+    )
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+
+    # Assert optimization isn't applied for mixed asc + desc sort
+    df_result = c.sql("""SELECT * FROM df ORDER BY a, b DESC NULLS LAST LIMIT 10""")
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+
+    # Assert optimization isn't applied when the number of requested elements
+    # exceed topk-nelem-limit config value
+    # Default topk-nelem-limit is 1M and 334k*3columns takes it above this limit
+    df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 333334""")
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+
+    df_result = c.sql(
+        """SELECT * FROM df ORDER BY a, b LIMIT 10""",
+        config_options={"sql.sort.topk-nelem-limit": 29},
+    )
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
diff --git a/tests/unit/test_queries.py b/tests/unit/test_queries.py
index 012f84e2f..f35bd5750 100644
--- a/tests/unit/test_queries.py
+++ b/tests/unit/test_queries.py
@@ -3,7 +3,6 @@
 import pytest
 
 XFAIL_QUERIES = (
-    4,
     5,
     6,
     8,
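As the integration tests above do, one can verify which path a query took by scanning the task-graph layer names for `nsmallest`/`nlargest`. A self-contained usage sketch (table and column names here are illustrative, not from the patch):

    import dask.dataframe as dd
    import pandas as pd
    from dask_sql import Context

    c = Context()
    c.create_table(
        "df", dd.from_pandas(pd.DataFrame({"a": range(100)}), npartitions=4)
    )
    result = c.sql("SELECT * FROM df ORDER BY a LIMIT 5")
    # Prints True if the top-k optimization was applied:
    print(any("nsmallest" in key for key in result.dask.layers))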