From 36007fa379d46a8bb786c77d9ebbb41dcdbfec19 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 26 Oct 2022 10:04:14 -0700 Subject: [PATCH 01/11] Rust: Add method to retrieve fetch rows during sort --- dask_planner/src/sql/logical/sort.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dask_planner/src/sql/logical/sort.rs b/dask_planner/src/sql/logical/sort.rs index 0bdd67e23..a3663ca7e 100644 --- a/dask_planner/src/sql/logical/sort.rs +++ b/dask_planner/src/sql/logical/sort.rs @@ -19,6 +19,11 @@ impl PySort { pub fn sort_expressions(&self) -> PyResult> { py_expr_list(&self.sort.input, &self.sort.expr) } + + #[pyo3(name = "getNumRows")] + pub fn get_fetch_val(&self) -> Option { + self.sort.fetch + } } impl TryFrom for PySort { From 262d0186714a21870c6b722e63390d1cfce992c3 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 26 Oct 2022 10:20:11 -0700 Subject: [PATCH 02/11] Update sort plugin to use nsmallest/largest if applicable --- dask_sql/physical/rel/logical/sort.py | 13 ++++++++++--- dask_sql/physical/utils/sort.py | 7 +++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py index 9800df978..6d3d36907 100644 --- a/dask_sql/physical/rel/logical/sort.py +++ b/dask_sql/physical/rel/logical/sort.py @@ -27,9 +27,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai ] sort_ascending = [expr.isSortAscending() for expr in sort_expressions] sort_null_first = [expr.isSortNullsFirst() for expr in sort_expressions] - - df = df.persist() - df = apply_sort(df, sort_columns, sort_ascending, sort_null_first) + sort_num_rows = rel.sort().getNumRows() + if ( + sort_num_rows is None + or any(sort_null_first) + or len(set(sort_ascending)) != 1 + ): + df = df.persist() + df = apply_sort( + df, sort_columns, sort_ascending, sort_null_first, sort_num_rows + ) cc = self.fix_column_to_row_type(cc, rel.getRowType()) # No column type has 
changed, so no need to cast again diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index c2ccce3c1..157e79013 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -12,6 +12,7 @@ def apply_sort( sort_columns: List[str], sort_ascending: List[bool], sort_null_first: List[bool], + sort_num_rows: int = None, ) -> dd.DataFrame: # when sort_values doesn't support lists of ascending / null # position booleans, we can still do the sort provided that @@ -19,6 +20,12 @@ single_ascending = len(set(sort_ascending)) == 1 single_null_first = len(set(sort_null_first)) == 1 + if sort_num_rows is not None and single_ascending and not any(sort_null_first): + if sort_ascending[0]: + return df.nsmallest(n=sort_num_rows, columns=sort_columns) + else: + return df.nlargest(n=sort_num_rows, columns=sort_columns) + # pandas / cudf don't support lists of null positions if df.npartitions == 1 and single_null_first: return df.map_partitions( From 0b42662f5f3740ecc1e000e6af86a9b8bd358f76 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Thu, 27 Oct 2022 12:24:58 -0700 Subject: [PATCH 03/11] move topk optimization checks to its own function --- dask_sql/physical/rel/logical/sort.py | 7 +--- dask_sql/physical/utils/sort.py | 53 ++++++++++++++++++++++++--- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py index 6d3d36907..959b84232 100644 --- a/dask_sql/physical/rel/logical/sort.py +++ b/dask_sql/physical/rel/logical/sort.py @@ -28,12 +28,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai sort_ascending = [expr.isSortAscending() for expr in sort_expressions] sort_null_first = [expr.isSortNullsFirst() for expr in sort_expressions] sort_num_rows = rel.sort().getNumRows() - if ( - sort_num_rows is None - or any(sort_null_first) - or len(set(sort_ascending)) != 1 - ): - df = df.persist() + df = 
apply_sort( df, sort_columns, sort_ascending, sort_null_first, sort_num_rows ) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 157e79013..c94170160 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -20,11 +20,23 @@ def apply_sort( single_ascending = len(set(sort_ascending)) == 1 single_null_first = len(set(sort_null_first)) == 1 - if sort_num_rows is not None and single_ascending and not any(sort_null_first): - if sort_ascending[0]: - return df.nsmallest(n=sort_num_rows, columns=sort_columns) - else: - return df.nlargest(n=sort_num_rows, columns=sort_columns) + if is_topk_optimizable( + df=df, + sort_columns=sort_columns, + single_ascending=single_ascending, + sort_null_first=sort_null_first, + sort_num_rows=sort_num_rows, + ): + return topk_sort( + df=df, + sort_columns=sort_columns, + sort_ascending=sort_ascending, + sort_num_rows=sort_num_rows, + ) + + else: + # Pre persist before sort to avoid duplicate compute + df = df.persist() # pandas / cudf don't support lists of null positions if df.npartitions == 1 and single_null_first: @@ -64,6 +76,18 @@ def apply_sort( ).persist() +def topk_sort( + df: dd.DataFrame, + sort_columns: List[str], + sort_ascending: List[bool], + sort_num_rows: int = None, +): + if sort_ascending[0]: + return df.nsmallest(n=sort_num_rows, columns=sort_columns) + else: + return df.nlargest(n=sort_num_rows, columns=sort_columns) + + def sort_partition_func( partition: pd.DataFrame, sort_columns: List[str], @@ -92,3 +116,22 @@ def sort_partition_func( ) return partition + + +def is_topk_optimizable( + df: dd.DataFrame, + sort_columns: List[str], + single_ascending: bool, + sort_null_first: List[bool], + sort_num_rows: int = None, +): + if ( + sort_num_rows is None + or not single_ascending + or any(sort_null_first) + # pandas doesnt support nsmallest/nlargest with object dtypes + or ("pandas" in str(df._partition_type) and any(df[sort_columns] == "object")) + ): + return 
False + + return True From 7c91bf48942e11e40342c797473efe6741558fba Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Mon, 31 Oct 2022 16:07:54 -0700 Subject: [PATCH 04/11] Fix check for is_topk_optimizable --- dask_sql/physical/utils/sort.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index c94170160..bdcd89341 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -130,7 +130,10 @@ def is_topk_optimizable( or not single_ascending or any(sort_null_first) # pandas doesnt support nsmallest/nlargest with object dtypes - or ("pandas" in str(df._partition_type) and any(df[sort_columns] == "object")) + or ( + "pandas" in str(df._partition_type) + and any(df[sort_columns].dtypes == "object") + ) ): return False From 40e14e45fd3a4fc687df2ea262e316728ee1932b Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Mon, 31 Oct 2022 17:10:57 -0700 Subject: [PATCH 05/11] Add topk tests --- tests/integration/test_sort.py | 69 ++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py index 8b1d125a9..b3aaeb673 100644 --- a/tests/integration/test_sort.py +++ b/tests/integration/test_sort.py @@ -351,3 +351,72 @@ def test_sort_by_old_alias(c, input_table_1, request): df_expected = user_table_1.sort_values(["b", "user_id"], ascending=[True, False])[ ["b"] ] + + +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_sort_topk(gpu): + c = Context() + df = pd.DataFrame( + { + "a": [float("nan"), 1] * 30, + "b": [1, 2, 3] * 20, + "c": ["a", "b", "c"] * 20, + } + ) + c.create_table("df", dd.from_pandas(df, npartitions=10), gpu=gpu) + + df_result = c.sql("""SELECT * FROM df ORDER BY a LIMIT 10""") + assert any(["nsmallest" in key for key in df_result.dask.layers.keys()]) + assert_eq( + df_result, + pd.DataFrame( + { + "a": [1.0] * 10, + "b": 
([2, 1, 3] * 4)[:10], + "c": (["b", "a", "c"] * 4)[:10], + } + ), + check_index=False, + ) + + df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 10""") + assert any(["nsmallest" in key for key in df_result.dask.layers.keys()]) + assert_eq( + df_result, + pd.DataFrame({"a": [1.0] * 10, "b": [1] * 10, "c": ["a"] * 10}), + check_index=False, + ) + + df_result = c.sql( + """SELECT * FROM df ORDER BY a DESC NULLS LAST, b DESC NULLS LAST LIMIT 10""" + ) + assert any(["nlargest" in key for key in df_result.dask.layers.keys()]) + assert_eq( + df_result, + pd.DataFrame({"a": [1.0] * 10, "b": [3] * 10, "c": ["c"] * 10}), + check_index=False, + ) + + # String column nlargest/smallest not supported for pandas + df_result = c.sql("""SELECT * FROM df ORDER BY c LIMIT 10""") + if not gpu: + assert all(["nlargest" not in key for key in df_result.dask.layers.keys()]) + assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()]) + else: + assert_eq( + df_result, + pd.DataFrame({"a": [float("nan"), 1] * 5, "b": [1] * 10, "c": ["a"] * 10}), + check_index=False, + ) + + # Assert that the optimization isn't applied when there is any nulls first + df_result = c.sql( + """SELECT * FROM df ORDER BY a DESC, b DESC NULLS LAST LIMIT 10""" + ) + assert all(["nlargest" not in key for key in df_result.dask.layers.keys()]) + assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()]) + + # Assert optimization isn't applied for mixed asc + desc sort + df_result = c.sql("""SELECT * FROM df ORDER BY a, b DESC NULLS LAST LIMIT 10""") + assert all(["nlargest" not in key for key in df_result.dask.layers.keys()]) + assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()]) From de0355b037aeddffca8ec82e99c607a03ae4faa4 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 1 Nov 2022 10:16:57 -0700 Subject: [PATCH 06/11] Un-xfail q4 --- tests/unit/test_queries.py | 1 - 1 file changed, 1 
deletion(-) diff --git a/tests/unit/test_queries.py b/tests/unit/test_queries.py index 012f84e2f..f35bd5750 100644 --- a/tests/unit/test_queries.py +++ b/tests/unit/test_queries.py @@ -3,7 +3,6 @@ import pytest XFAIL_QUERIES = ( - 4, 5, 6, 8, From 36a79f8227d156f60b21fa2420d468c574e2251c Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Thu, 3 Nov 2022 16:47:46 -0700 Subject: [PATCH 07/11] Add sort topk-nelem-limit as config option --- dask_sql/sql-schema.yaml | 20 ++++++++++++++++---- dask_sql/sql.yaml | 5 ++++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index c6d5bd3c0..52fe0536c 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -40,12 +40,24 @@ properties: queries, but can signicantly reduce memory usage when querying a small subset of a large table. Default is ``true``. - predicate_pushdown: + optimize: type: boolean description: | - Whether to try pushing down filter predicates into IO (when possible). + Whether the first generated logical plan should be further optimized or used as is. - optimize: + predicate_pushdown: type: boolean description: | - Whether the first generated logical plan should be further optimized or used as is. + Whether to try pushing down filter predicates into IO (when possible). + + sort: + type: object + properties: + + topk-nelem-limit: + type: integer + description: | + Configure the total number of elements below which dask should attempt to apply the top-k + optimization if possible. nelem is defined as the limit or ``k`` value times the number of + columns. Default is ``1000000`` corresponding to a LIMIT clause of 1 million in a 1 column + table. 
diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index 5c175320d..22ff68f70 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -9,6 +9,9 @@ sql: limit: check-first-partition: True + optimize: True + predicate_pushdown: True - optimize: True + sort: + topk-nelem-limit: 1000000 From 7eb095ef85e2d01ff0ee2a8a6418a23b15bbbeee Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Thu, 3 Nov 2022 16:49:34 -0700 Subject: [PATCH 08/11] Add check for topk config option to is_topk_optimizable --- dask_sql/physical/utils/sort.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index bdcd89341..e53134d44 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -2,6 +2,7 @@ import dask.dataframe as dd import pandas as pd +from dask import config as dask_config from dask.utils import M from dask_sql.utils import make_pickable_without_dask_sql @@ -134,6 +135,10 @@ def is_topk_optimizable( "pandas" in str(df._partition_type) and any(df[sort_columns].dtypes == "object") ) + or ( + sort_num_rows * len(df.columns) + > dask_config.get("sql.sort.topk-nelem-limit") + ) ): return False From 7d16e6d8423fd2d52439462be445079a6297a203 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Thu, 3 Nov 2022 16:49:53 -0700 Subject: [PATCH 09/11] Add more topk sort tests --- tests/integration/test_sort.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py index b3aaeb673..1956a3bce 100644 --- a/tests/integration/test_sort.py +++ b/tests/integration/test_sort.py @@ -420,3 +420,17 @@ def test_sort_topk(gpu): df_result = c.sql("""SELECT * FROM df ORDER BY a, b DESC NULLS LAST LIMIT 10""") assert all(["nlargest" not in key for key in df_result.dask.layers.keys()]) assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()]) + + # Assert optimization isn't applied when the number of requested elements 
+ # exceed topk-nelem-limit config value + # Default topk-nelem-limit is 1M and 334k*3columns takes it above this limit + df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 333334""") + assert all(["nlargest" not in key for key in df_result.dask.layers.keys()]) + assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()]) + + df_result = c.sql( + """SELECT * FROM df ORDER BY a, b LIMIT 10""", + config_options={"sql.sort.topk-nelem-limit": 29}, + ) + assert all(["nlargest" not in key for key in df_result.dask.layers.keys()]) + assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()]) From e87f3c90ef130ebe80d1cfc2e9e81e53fa4f9399 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Fri, 4 Nov 2022 12:43:15 -0700 Subject: [PATCH 10/11] use common variable for rel.sort plan --- dask_sql/physical/rel/logical/sort.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py index 959b84232..2e1376d41 100644 --- a/dask_sql/physical/rel/logical/sort.py +++ b/dask_sql/physical/rel/logical/sort.py @@ -20,14 +20,15 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai (dc,) = self.assert_inputs(rel, 1, context) df = dc.df cc = dc.column_container - sort_expressions = rel.sort().getCollation() + sort_plan = rel.sort() + sort_expressions = sort_plan.getCollation() sort_columns = [ cc.get_backend_by_frontend_name(expr.column_name(rel)) for expr in sort_expressions ] sort_ascending = [expr.isSortAscending() for expr in sort_expressions] sort_null_first = [expr.isSortNullsFirst() for expr in sort_expressions] - sort_num_rows = rel.sort().getNumRows() + sort_num_rows = sort_plan.getNumRows() df = apply_sort( df, sort_columns, sort_ascending, sort_null_first, sort_num_rows From 8e9c2c50ec2369c729f8abf27781b3ffd5765ffc Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Fri, 4 Nov 2022 12:45:02 -0700 Subject: [PATCH 
11/11] Apply suggestions from code review Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> --- dask_planner/src/sql/logical/sort.rs | 4 ++-- dask_sql/sql-schema.yaml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dask_planner/src/sql/logical/sort.rs b/dask_planner/src/sql/logical/sort.rs index a3663ca7e..06d35a28f 100644 --- a/dask_planner/src/sql/logical/sort.rs +++ b/dask_planner/src/sql/logical/sort.rs @@ -21,8 +21,8 @@ impl PySort { } #[pyo3(name = "getNumRows")] - pub fn get_fetch_val(&self) -> Option { - self.sort.fetch + pub fn get_fetch_val(&self) -> PyResult> { + Ok(self.sort.fetch) } } diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index 52fe0536c..993bf0031 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -57,7 +57,7 @@ properties: topk-nelem-limit: type: integer description: | - Configure the total number of elements below which dask should attempt to apply the top-k - optimization if possible. nelem is defined as the limit or ``k`` value times the number of - columns. Default is ``1000000`` corresponding to a LIMIT clause of 1 million in a 1 column - table. + Total number of elements below which dask-sql should attempt to apply the top-k + optimization (when possible). ``nelem`` is defined as the limit or ``k`` value times the + number of columns. Default is 1000000, corresponding to a LIMIT clause of 1 million in a + 1 column table.