Sort + limit topk optimization (initial) #893

Merged: 14 commits, Nov 7, 2022
5 changes: 5 additions & 0 deletions dask_planner/src/sql/logical/sort.rs
@@ -19,6 +19,11 @@ impl PySort {
    pub fn sort_expressions(&self) -> PyResult<Vec<PyExpr>> {
        py_expr_list(&self.sort.input, &self.sort.expr)
    }

    #[pyo3(name = "getNumRows")]
    pub fn get_fetch_val(&self) -> PyResult<Option<usize>> {
        Ok(self.sort.fetch)
    }
}

impl TryFrom<LogicalPlan> for PySort {
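For orientation (not part of the diff): the new `getNumRows` binding surfaces the Sort node's `fetch` value, i.e. the row limit attached to an ORDER BY, to the Python layer. A minimal sketch of how it can be read off a plan, assuming the planner has folded the query's LIMIT into the Sort node as the converter in dask_sql/physical/rel/logical/sort.py below does:

```python
# Sketch only: `rel` stands in for the LogicalPlan wrapper handed to the Sort
# converter plugin below; names mirror the diff, not a separately documented API.
sort_plan = rel.sort()              # PySort wrapper around the DataFusion Sort node
num_rows = sort_plan.getNumRows()   # Option<usize> on the Rust side -> int or None here

if num_rows is not None:
    # e.g. `SELECT * FROM t ORDER BY x LIMIT 10` would give num_rows == 10,
    # making the top-k fast path in apply_sort possible
    print(f"sort carries a fetch/limit of {num_rows} rows")
```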
9 changes: 6 additions & 3 deletions dask_sql/physical/rel/logical/sort.py
@@ -20,16 +20,19 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
        (dc,) = self.assert_inputs(rel, 1, context)
        df = dc.df
        cc = dc.column_container
-        sort_expressions = rel.sort().getCollation()
+        sort_plan = rel.sort()
+        sort_expressions = sort_plan.getCollation()
        sort_columns = [
            cc.get_backend_by_frontend_name(expr.column_name(rel))
            for expr in sort_expressions
        ]
        sort_ascending = [expr.isSortAscending() for expr in sort_expressions]
        sort_null_first = [expr.isSortNullsFirst() for expr in sort_expressions]
+        sort_num_rows = sort_plan.getNumRows()

-        df = df.persist()
-        df = apply_sort(df, sort_columns, sort_ascending, sort_null_first)
+        df = apply_sort(
+            df, sort_columns, sort_ascending, sort_null_first, sort_num_rows
+        )

        cc = self.fix_column_to_row_type(cc, rel.getRowType())
        # No column type has changed, so no need to cast again
58 changes: 58 additions & 0 deletions dask_sql/physical/utils/sort.py
@@ -2,6 +2,7 @@

import dask.dataframe as dd
import pandas as pd
from dask import config as dask_config
from dask.utils import M

from dask_sql.utils import make_pickable_without_dask_sql
@@ -12,13 +13,32 @@ def apply_sort(
    sort_columns: List[str],
    sort_ascending: List[bool],
    sort_null_first: List[bool],
    sort_num_rows: int = None,
) -> dd.DataFrame:
    # when sort_values doesn't support lists of ascending / null
    # position booleans, we can still do the sort provided that
    # the list(s) are homogeneous:
    single_ascending = len(set(sort_ascending)) == 1
    single_null_first = len(set(sort_null_first)) == 1

    if is_topk_optimizable(
        df=df,
        sort_columns=sort_columns,
        single_ascending=single_ascending,
        sort_null_first=sort_null_first,
        sort_num_rows=sort_num_rows,
    ):
        return topk_sort(
            df=df,
            sort_columns=sort_columns,
            sort_ascending=sort_ascending,
            sort_num_rows=sort_num_rows,
        )

    else:
        # Pre persist before sort to avoid duplicate compute
        df = df.persist()

    # pandas / cudf don't support lists of null positions
    if df.npartitions == 1 and single_null_first:
        return df.map_partitions(
@@ -57,6 +77,18 @@ def apply_sort(
    ).persist()


def topk_sort(
    df: dd.DataFrame,
    sort_columns: List[str],
    sort_ascending: List[bool],
    sort_num_rows: int = None,
):
    if sort_ascending[0]:
        return df.nsmallest(n=sort_num_rows, columns=sort_columns)
    else:
        return df.nlargest(n=sort_num_rows, columns=sort_columns)


def sort_partition_func(
    partition: pd.DataFrame,
    sort_columns: List[str],
@@ -85,3 +117,29 @@ def sort_partition_func(
)

return partition


def is_topk_optimizable(
    df: dd.DataFrame,
    sort_columns: List[str],
    single_ascending: bool,
    sort_null_first: List[bool],
    sort_num_rows: int = None,
):
    if (
        sort_num_rows is None
        or not single_ascending
        or any(sort_null_first)
        # pandas doesn't support nsmallest/nlargest with object dtypes
        or (
            "pandas" in str(df._partition_type)
            and any(df[sort_columns].dtypes == "object")
        )
        or (
            sort_num_rows * len(df.columns)
            > dask_config.get("sql.sort.topk-nelem-limit")
        )
    ):
        return False

    return True
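For context (not part of the diff): the fast path relies on the fact that, for an all-ascending (or all-descending) sort with nulls last and at least `k` non-null rows, `nsmallest`/`nlargest` return the same rows as a full sort followed by `head(k)`, while running as a per-partition reduction rather than a global shuffle. A small illustration of that equivalence under those assumptions:

```python
# Hedged illustration; column names and data are made up for the example.
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"a": [5, 3, 1, 4, 2], "b": list("vwxyz")})
ddf = dd.from_pandas(pdf, npartitions=2)

# Top-k path: a reduction over partitions, no global shuffle.
topk = ddf.nsmallest(n=2, columns=["a"]).compute().reset_index(drop=True)

# Full-sort path: global sort followed by a limit.
full = ddf.sort_values("a").compute().head(2).reset_index(drop=True)

assert topk.equals(full)  # same rows, same order, for a homogeneous ascending sort
```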
20 changes: 16 additions & 4 deletions dask_sql/sql-schema.yaml
@@ -40,12 +40,24 @@ properties:
      queries, but can significantly reduce memory usage when querying a small subset of a large
      table. Default is ``true``.

-  predicate_pushdown:
+  optimize:
    type: boolean
    description: |
-      Whether to try pushing down filter predicates into IO (when possible).
+      Whether the first generated logical plan should be further optimized or used as is.

-  optimize:
+  predicate_pushdown:
    type: boolean
    description: |
-      Whether the first generated logical plan should be further optimized or used as is.
+      Whether to try pushing down filter predicates into IO (when possible).
+
+  sort:
+    type: object
+    properties:
+
+      topk-nelem-limit:
+        type: integer
+        description: |
+          Total number of elements below which dask-sql should attempt to apply the top-k
+          optimization (when possible). ``nelem`` is defined as the limit or ``k`` value times the
+          number of columns. Default is 1000000, corresponding to a LIMIT clause of 1 million in a
+          1 column table.
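A worked example of the guard described above, mirroring the check in `is_topk_optimizable` (a sketch, not part of the diff; the `default=` fallback is only there so the snippet runs without dask-sql's config registered):

```python
from dask import config as dask_config

k = 10              # LIMIT value from the query
ncols = 3           # number of columns in the table being sorted
nelem = k * ncols   # 30

limit = dask_config.get("sql.sort.topk-nelem-limit", default=1_000_000)

use_topk = nelem <= limit  # 30 <= 1_000_000, so the top-k path stays eligible
```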
5 changes: 4 additions & 1 deletion dask_sql/sql.yaml
@@ -9,6 +9,9 @@ sql:
  limit:
    check-first-partition: True

+  optimize: True
+
  predicate_pushdown: True

-  optimize: True
+  sort:
+    topk-nelem-limit: 1000000
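Since the limit is read through Dask's config system when the sort is converted, it can be overridden per query (as the integration test below does via `config_options`) or scoped with `dask.config.set`; a sketch under that assumption:

```python
import dask

# Per-query override, as exercised in tests/integration/test_sort.py below:
#   c.sql("SELECT * FROM df ORDER BY a LIMIT 10",
#         config_options={"sql.sort.topk-nelem-limit": 29})

# Scoped override via Dask's standard config mechanism:
with dask.config.set({"sql.sort.topk-nelem-limit": 10_000}):
    ...  # queries converted inside this block see the lowered limit
```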
83 changes: 83 additions & 0 deletions tests/integration/test_sort.py
@@ -351,3 +351,86 @@ def test_sort_by_old_alias(c, input_table_1, request):
    df_expected = user_table_1.sort_values(["b", "user_id"], ascending=[True, False])[
        ["b"]
    ]


@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
def test_sort_topk(gpu):
    c = Context()
    df = pd.DataFrame(
        {
            "a": [float("nan"), 1] * 30,
            "b": [1, 2, 3] * 20,
            "c": ["a", "b", "c"] * 20,
        }
    )
    c.create_table("df", dd.from_pandas(df, npartitions=10), gpu=gpu)

    df_result = c.sql("""SELECT * FROM df ORDER BY a LIMIT 10""")
    assert any(["nsmallest" in key for key in df_result.dask.layers.keys()])
    assert_eq(
        df_result,
        pd.DataFrame(
            {
                "a": [1.0] * 10,
                "b": ([2, 1, 3] * 4)[:10],
                "c": (["b", "a", "c"] * 4)[:10],
            }
        ),
        check_index=False,
    )

    df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 10""")
    assert any(["nsmallest" in key for key in df_result.dask.layers.keys()])
    assert_eq(
        df_result,
        pd.DataFrame({"a": [1.0] * 10, "b": [1] * 10, "c": ["a"] * 10}),
        check_index=False,
    )

    df_result = c.sql(
        """SELECT * FROM df ORDER BY a DESC NULLS LAST, b DESC NULLS LAST LIMIT 10"""
    )
    assert any(["nlargest" in key for key in df_result.dask.layers.keys()])
    assert_eq(
        df_result,
        pd.DataFrame({"a": [1.0] * 10, "b": [3] * 10, "c": ["c"] * 10}),
        check_index=False,
    )

    # String column nlargest/smallest not supported for pandas
    df_result = c.sql("""SELECT * FROM df ORDER BY c LIMIT 10""")
    if not gpu:
        assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
        assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
    else:
        assert_eq(
            df_result,
            pd.DataFrame({"a": [float("nan"), 1] * 5, "b": [1] * 10, "c": ["a"] * 10}),
            check_index=False,
        )

    # Assert that the optimization isn't applied when there are any nulls-first columns
    df_result = c.sql(
        """SELECT * FROM df ORDER BY a DESC, b DESC NULLS LAST LIMIT 10"""
    )
    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])

    # Assert optimization isn't applied for mixed asc + desc sort
    df_result = c.sql("""SELECT * FROM df ORDER BY a, b DESC NULLS LAST LIMIT 10""")
    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])

    # Assert optimization isn't applied when the number of requested elements
    # exceeds the topk-nelem-limit config value
    # Default topk-nelem-limit is 1M and 334k * 3 columns takes it above this limit
    df_result = c.sql("""SELECT * FROM df ORDER BY a, b LIMIT 333334""")
    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])

    df_result = c.sql(
        """SELECT * FROM df ORDER BY a, b LIMIT 10""",
        config_options={"sql.sort.topk-nelem-limit": 29},
    )
    assert all(["nlargest" not in key for key in df_result.dask.layers.keys()])
    assert all(["nsmallest" not in key for key in df_result.dask.layers.keys()])
1 change: 0 additions & 1 deletion tests/unit/test_queries.py
@@ -3,7 +3,6 @@
import pytest

XFAIL_QUERIES = (
-    4,
Collaborator commented:
Out of interest, do we know what in particular in this PR caused q4 to start passing?

@ayushdg (Collaborator, Author) commented on Nov 4, 2022:
I actually looked into this a bit: the errors from this query come from one of the rows containing a non-standard character (C�TE D'IVOIRE) that Arrow cannot render.

It impacts the Dask DataFrame version, and only impacts the dask-cudf version if we try to print/repr it. For whatever reason, dask-cudf's sort_values ended up invoking the repr method in cudf, which is a bit confusing. The nsmallest API doesn't cause the repr function to be invoked, allowing the query to pass:

../../dask_sql/physical/utils/sort.py:36: in apply_sort
    return df.sort_values(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask_cudf/core.py:249: in sort_values
    df = sorting.sort_values(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask_cudf/sorting.py:277: in sort_values
    partitions = df[by].map_partitions(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/dataframe/core.py:872: in map_partitions
    return map_partitions(func, self, *args, **kwargs)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/dataframe/core.py:6610: in map_partitions
    token = tokenize(func, meta, *args, **kwargs)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/base.py:933: in tokenize
    hasher.update(str(normalize_token(kwargs)).encode())
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/utils.py:640: in __call__
    return meth(arg, *args, **kwargs)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/base.py:961: in normalize_dict
    return normalize_token(sorted(d.items(), key=str))
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/dataframe.py:1880: in __repr__
    return self._clean_renderable_dataframe(output)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/dataframe.py:1758: in _clean_renderable_dataframe
    output = output.to_pandas().to_string(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/dataframe.py:4813: in to_pandas
    out_data[i] = self._data[col_key].to_pandas(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/column/string.py:5475: in to_pandas
    pd_series = self.to_arrow().to_pandas(**kwargs)
pyarrow/array.pxi:823: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/array.pxi:1396: in pyarrow.lib.Array._to_pandas
    ???
pyarrow/array.pxi:1597: in pyarrow.lib._array_like_to_pandas
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   pyarrow.lib.ArrowException: Unknown error: Wrapping CTE D'IVOIRE failed

Collaborator replied:
Ah okay, that makes sense. Looks like what's happening here is that, as part of Dask's sorting algorithm, we pass a dataframe of quantile divisions to map_partitions, which Dask then attempts to tokenize using a string representation of the frame.

    5,
    6,
    8,