Sort + limit topk optimization (initial) (#893)
* Rust: Add method to retrieve fetch rows during sort

* Update sort plugin to use nsmallest/nlargest if applicable

* Move topk optimization checks into their own function

* Fix check for is_topk_optimizable

* Add topk tests

* Un-xfail q4

* Add sort topk-nelem-limit as config option

* Add check for topk config option to is_topk_optimizable

* Add more topk sort tests

* Use a common variable for the rel.sort plan

* Apply suggestions from code review

Co-authored-by: Charles Blackmon-Luca <[email protected]>

Co-authored-by: Charles Blackmon-Luca <[email protected]>
ayushdg and charlesbluca authored Nov 7, 2022
1 parent 10de5ef commit 9bb37a7
Showing 7 changed files with 172 additions and 9 deletions.
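
At a high level, the change detects plans of the form ORDER BY ... LIMIT k and replaces the full global sort with Dask's nsmallest/nlargest reductions. A minimal standalone sketch of the idea, not dask-sql code (the frame and column names here are illustrative):

    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({"a": range(100), "b": range(100)}), npartitions=4)

    # A global sort followed by head() has to order every row...
    full = ddf.sort_values("a").head(10)

    # ...while nsmallest keeps only the k best rows per partition and
    # reduces them, which is much cheaper when k is small.
    topk = ddf.nsmallest(n=10, columns=["a"])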
5 changes: 5 additions & 0 deletions dask_planner/src/sql/logical/sort.rs
@@ -19,6 +19,11 @@ impl PySort {
     pub fn sort_expressions(&self) -> PyResult<Vec<PyExpr>> {
         py_expr_list(&self.sort.input, &self.sort.expr)
     }
+
+    #[pyo3(name = "getNumRows")]
+    pub fn get_fetch_val(&self) -> PyResult<Option<usize>> {
+        Ok(self.sort.fetch)
+    }
 }
 
 impl TryFrom<LogicalPlan> for PySort {
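
The new getNumRows binding is a thin accessor: DataFusion's Sort node already stores an optional fetch count (the LIMIT pushed into the sort), and get_fetch_val just returns it, with Rust's Option<usize> arriving in Python as Optional[int]. A rough Python analogue of the semantics, using a hypothetical stand-in class:

    from typing import Optional

    class FakeSortPlan:
        # Hypothetical stand-in for the PySort wrapper above.
        def __init__(self, fetch: Optional[int] = None):
            self.fetch = fetch

        def getNumRows(self) -> Optional[int]:
            return self.fetch

    assert FakeSortPlan(10).getNumRows() == 10  # ORDER BY ... LIMIT 10
    assert FakeSortPlan().getNumRows() is None  # plain ORDER BY, no LIMIT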
9 changes: 6 additions & 3 deletions dask_sql/physical/rel/logical/sort.py
@@ -20,16 +20,19 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
         (dc,) = self.assert_inputs(rel, 1, context)
         df = dc.df
         cc = dc.column_container
-        sort_expressions = rel.sort().getCollation()
+        sort_plan = rel.sort()
+        sort_expressions = sort_plan.getCollation()
         sort_columns = [
             cc.get_backend_by_frontend_name(expr.column_name(rel))
             for expr in sort_expressions
         ]
         sort_ascending = [expr.isSortAscending() for expr in sort_expressions]
         sort_null_first = [expr.isSortNullsFirst() for expr in sort_expressions]
+        sort_num_rows = sort_plan.getNumRows()
 
-        df = df.persist()
-        df = apply_sort(df, sort_columns, sort_ascending, sort_null_first)
+        df = apply_sort(
+            df, sort_columns, sort_ascending, sort_null_first, sort_num_rows
+        )
 
         cc = self.fix_column_to_row_type(cc, rel.getRowType())
         # No column type has changed, so no need to cast again
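
With the fetch value threaded through, apply_sort (below) can choose between a top-k reduction and the usual persist-and-sort path. A small usage sketch, assuming dask-sql at this commit is installed:

    import dask.dataframe as dd
    import pandas as pd

    from dask_sql.physical.utils.sort import apply_sort

    ddf = dd.from_pandas(pd.DataFrame({"x": [3.0, 1.0, 2.0, 5.0, 4.0]}), npartitions=2)

    # Roughly what the plugin now does for ORDER BY x DESC LIMIT 2:
    # a single descending sort column with no nulls-first flags
    # dispatches to ddf.nlargest(n=2, columns=["x"]) under the hood.
    out = apply_sort(ddf, ["x"], [False], [False], sort_num_rows=2)
    print(out.compute())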
58 changes: 58 additions & 0 deletions dask_sql/physical/utils/sort.py
@@ -2,6 +2,7 @@
 
 import dask.dataframe as dd
 import pandas as pd
+from dask import config as dask_config
 from dask.utils import M
 
 from dask_sql.utils import make_pickable_without_dask_sql
@@ -12,13 +13,32 @@ def apply_sort(
     sort_columns: List[str],
     sort_ascending: List[bool],
     sort_null_first: List[bool],
+    sort_num_rows: int = None,
 ) -> dd.DataFrame:
     # when sort_values doesn't support lists of ascending / null
     # position booleans, we can still do the sort provided that
     # the list(s) are homogeneous:
     single_ascending = len(set(sort_ascending)) == 1
     single_null_first = len(set(sort_null_first)) == 1
 
+    if is_topk_optimizable(
+        df=df,
+        sort_columns=sort_columns,
+        single_ascending=single_ascending,
+        sort_null_first=sort_null_first,
+        sort_num_rows=sort_num_rows,
+    ):
+        return topk_sort(
+            df=df,
+            sort_columns=sort_columns,
+            sort_ascending=sort_ascending,
+            sort_num_rows=sort_num_rows,
+        )
+
+    else:
+        # Pre-persist before the sort to avoid duplicate compute
+        df = df.persist()
+
     # pandas / cudf don't support lists of null positions
     if df.npartitions == 1 and single_null_first:
         return df.map_partitions(
@@ -57,6 +77,18 @@
         ).persist()
 
 
+def topk_sort(
+    df: dd.DataFrame,
+    sort_columns: List[str],
+    sort_ascending: List[bool],
+    sort_num_rows: int = None,
+):
+    if sort_ascending[0]:
+        return df.nsmallest(n=sort_num_rows, columns=sort_columns)
+    else:
+        return df.nlargest(n=sort_num_rows, columns=sort_columns)
+
+
 def sort_partition_func(
     partition: pd.DataFrame,
     sort_columns: List[str],
@@ -85,3 +117,29 @@ def sort_partition_func(
         )
 
     return partition
+
+
+def is_topk_optimizable(
+    df: dd.DataFrame,
+    sort_columns: List[str],
+    single_ascending: bool,
+    sort_null_first: List[bool],
+    sort_num_rows: int = None,
+):
+    if (
+        sort_num_rows is None
+        or not single_ascending
+        or any(sort_null_first)
+        # pandas doesn't support nsmallest/nlargest with object dtypes
+        or (
+            "pandas" in str(df._partition_type)
+            and any(df[sort_columns].dtypes == "object")
+        )
+        or (
+            sort_num_rows * len(df.columns)
+            > dask_config.get("sql.sort.topk-nelem-limit")
+        )
+    ):
+        return False
+
+    return True
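
Restating the guards: the rewrite only fires when there is an actual LIMIT, all columns sort in the same direction, nothing sorts nulls first, the sort columns are not object-dtype on pandas-backed frames, and k times the column count stays within the configured element limit. A quick sanity check of a few of these, assuming dask-sql at this commit (importing it loads the sql.* config defaults):

    import dask.dataframe as dd
    import pandas as pd

    from dask_sql.physical.utils.sort import is_topk_optimizable

    ddf = dd.from_pandas(
        pd.DataFrame({"a": [1.0, 2.0], "s": ["x", "y"]}), npartitions=1
    )

    # Numeric sort column with a LIMIT: eligible.
    assert is_topk_optimizable(ddf, ["a"], True, [False], sort_num_rows=1)

    # No LIMIT at all: never eligible.
    assert not is_topk_optimizable(ddf, ["a"], True, [False], sort_num_rows=None)

    # Object (string) sort column on a pandas-backed frame: not eligible.
    assert not is_topk_optimizable(ddf, ["s"], True, [False], sort_num_rows=1)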
20 changes: 16 additions & 4 deletions dask_sql/sql-schema.yaml
@@ -40,12 +40,24 @@ properties:
           queries, but can significantly reduce memory usage when querying a small subset of a large
           table. Default is ``true``.
-      predicate_pushdown:
+      optimize:
         type: boolean
         description: |
-          Whether to try pushing down filter predicates into IO (when possible).
+          Whether the first generated logical plan should be further optimized or used as is.
-      optimize:
+      predicate_pushdown:
         type: boolean
         description: |
-          Whether the first generated logical plan should be further optimized or used as is.
+          Whether to try pushing down filter predicates into IO (when possible).
+      sort:
+        type: object
+        properties:
+
+          topk-nelem-limit:
+            type: integer
+            description: |
+              Total number of elements below which dask-sql should attempt to apply the top-k
+              optimization (when possible). ``nelem`` is defined as the limit or ``k`` value times
+              the number of columns. Default is 1000000, corresponding to a LIMIT of one million
+              rows on a one-column table.
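
The arithmetic behind that default: nelem is k times the number of columns, so on the 3-column table used in the new tests the cutoff sits at LIMIT 333,333:

    limit = 1_000_000  # default sql.sort.topk-nelem-limit
    ncols = 3
    assert 333_333 * ncols <= limit  # top-k optimization applies
    assert 333_334 * ncols > limit   # falls back to a full sort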
5 changes: 4 additions & 1 deletion dask_sql/sql.yaml
@@ -9,6 +9,9 @@ sql:
   limit:
     check-first-partition: True
 
+  optimize: True
+
   predicate_pushdown: True
 
-  optimize: True
+  sort:
+    topk-nelem-limit: 1000000
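
The option can also be overridden per query through Context.sql's config_options, exactly as the new tests do. A usage sketch, assuming dask-sql at this commit:

    import dask.dataframe as dd
    import pandas as pd

    from dask_sql import Context

    c = Context()
    df = pd.DataFrame({"a": [3.0, 1.0, 2.0], "b": [1, 2, 3], "c": ["x", "y", "z"]})
    c.create_table("df", dd.from_pandas(df, npartitions=2))

    # 10 rows * 3 columns = 30 elements, which exceeds the lowered limit
    # of 29, so this query falls back to the regular sort path.
    result = c.sql(
        "SELECT * FROM df ORDER BY a LIMIT 10",
        config_options={"sql.sort.topk-nelem-limit": 29},
    )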
83 changes: 83 additions & 0 deletions tests/integration/test_sort.py
@@ -351,3 +351,86 @@ def test_sort_by_old_alias(c, input_table_1, request):
     df_expected = user_table_1.sort_values(["b", "user_id"], ascending=[True, False])[
         ["b"]
     ]
+
+
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_topk(gpu):
+    c = Context()
+    df = pd.DataFrame(
+        {
+            "a": [float("nan"), 1] * 30,
+            "b": [1, 2, 3] * 20,
+            "c": ["a", "b", "c"] * 20,
+        }
+    )
+    c.create_table("df", dd.from_pandas(df, npartitions=10), gpu=gpu)
+
+    df_result = c.sql("""SELECT * FROM df ORDER BY a LIMIT 10""")
+    assert any(["nsmallest" in key for key in df_result.dask.layers.keys()])
+    assert_eq(
+        df_result,
+        pd.DataFrame(
+            {
+                "a": [1.0] * 10,
+                "b": ([2, 1, 3] * 4)[:10],
+                "c": (["b", "a", "c"] * 4)[:10],
+            }
+        ),
+        check_index=False,
+    )
+
+    df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 10""")
+    assert any(["nsmallest" in key for key in df_result.dask.layers.keys()])
+    assert_eq(
+        df_result,
+        pd.DataFrame({"a": [1.0] * 10, "b": [1] * 10, "c": ["a"] * 10}),
+        check_index=False,
+    )
+
+    df_result = c.sql(
+        """SELECT * FROM df ORDER BY a DESC NULLS LAST, b DESC NULLS LAST LIMIT 10"""
+    )
+    assert any(["nlargest" in key for key in df_result.dask.layers.keys()])
+    assert_eq(
+        df_result,
+        pd.DataFrame({"a": [1.0] * 10, "b": [3] * 10, "c": ["c"] * 10}),
+        check_index=False,
+    )
+
+    # String column nlargest/nsmallest not supported for pandas
+    df_result = c.sql("""SELECT * FROM df ORDER BY c LIMIT 10""")
+    if not gpu:
+        assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+        assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+    else:
+        assert_eq(
+            df_result,
+            pd.DataFrame({"a": [float("nan"), 1] * 5, "b": [1] * 10, "c": ["a"] * 10}),
+            check_index=False,
+        )
+
+    # Assert that the optimization isn't applied when any column sorts nulls first
+    df_result = c.sql(
+        """SELECT * FROM df ORDER BY a DESC, b DESC NULLS LAST LIMIT 10"""
+    )
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+
+    # Assert optimization isn't applied for mixed asc + desc sort
+    df_result = c.sql("""SELECT * FROM df ORDER BY a, b DESC NULLS LAST LIMIT 10""")
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+
+    # Assert optimization isn't applied when the number of requested elements
+    # exceeds the topk-nelem-limit config value
+    # Default topk-nelem-limit is 1M; 333334 rows * 3 columns takes it above this limit
+    df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 333334""")
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
+
+    df_result = c.sql(
+        """SELECT * FROM df ORDER BY a, b LIMIT 10""",
+        config_options={"sql.sort.topk-nelem-limit": 29},
+    )
+    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
+    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
1 change: 0 additions & 1 deletion tests/unit/test_queries.py
@@ -3,7 +3,6 @@
 import pytest
 
 XFAIL_QUERIES = (
-    4,
     5,
     6,
     8,
