Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort + limit topk optimization (initial) #893

Merged
merged 14 commits into from
Nov 7, 2022

Conversation

ayushdg
Copy link
Collaborator

@ayushdg ayushdg commented Oct 26, 2022

This pr attempts to optimize sql queries that have a combination of ORDER BY followed by LIMIT to use the smallest/nlargest api which does a partition wise sort+limit+combine instead of a full shuffle and can lead to significant performance improvements.

The implementation is currently only limited to cases where all sort columns are either ASCENDING NULLS LAST(default) or DESCENDING NULLS LAST, and does not work when sorting on object columns such as strings for pandas backed dataframes.

Todo:

  • Add tests
  • Benchmark perf in different scenarios (cpu, gpu varying limit rows)
  • Explore cpu perf for higher limit values
    • Seems like the transfer time on the cpu based tests is significantly slower than a gpu dataframe using tcp protocol.
    • Here's a gist with some tests.
    • Cluprit may be string columns. Delta in xfer time without strings is much lower.

Here are some benchmarks carried out on my workstation on a sample timeseries dataset with 14.4M rows X 4 columns sorting on an int column

Values represent wall time in seconds

This PR

CPU GPU
LIMIT
100 0.5 0.08
100k 0.86 0.11
7.2M (nrows/2) 8.8 0.64
14.4M (nrows) 10.8 0.66
No Limit 3.4 0.65

Main

CPU GPU
LIMIT
100 2.55 0.55
100k 2.83 0.59
7.2M (nrows/2) 6.13 0.96
14.4M (nrows) 6.18 0.97
No Limit 3.5 0.63

@codecov-commenter
Copy link

codecov-commenter commented Oct 27, 2022

Codecov Report

Merging #893 (8e9c2c5) into main (10de5ef) will increase coverage by 0.21%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #893      +/-   ##
==========================================
+ Coverage   75.25%   75.46%   +0.21%     
==========================================
  Files          72       72              
  Lines        3786     3799      +13     
  Branches      675      678       +3     
==========================================
+ Hits         2849     2867      +18     
+ Misses        804      795       -9     
- Partials      133      137       +4     
Impacted Files Coverage Δ
dask_sql/physical/rel/logical/sort.py 86.36% <100.00%> (+0.64%) ⬆️
dask_sql/physical/utils/sort.py 94.59% <100.00%> (+2.59%) ⬆️
dask_sql/_version.py 35.31% <0.00%> (+1.41%) ⬆️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@ayushdg ayushdg marked this pull request as ready for review November 1, 2022 00:43
@ayushdg
Copy link
Collaborator Author

ayushdg commented Nov 2, 2022

Based on the perf benchmarks above and the fact that nsmallest/nlargest brings things down to a single partition, it might make sense to cap using this optimization for smaller k.

I'm not quite sure on how we can decide on this k value, for now are we okay adding it as a config with some fixed arbitrary number like 500k for the number of elements? (k * num_columns)

cc: @charlesbluca

@charlesbluca
Copy link
Collaborator

Yeah, I think it makes sense to make this a Dask config option for now, that we can revisit if we're able to find a heuristic that reasonably decides an optimal k value

Copy link
Collaborator

@charlesbluca charlesbluca left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ayushdg! A couple small comments:

dask_planner/src/sql/logical/sort.rs Outdated Show resolved Hide resolved
dask_sql/physical/rel/logical/sort.py Outdated Show resolved Hide resolved
dask_sql/sql-schema.yaml Outdated Show resolved Hide resolved
@@ -3,7 +3,6 @@
import pytest

XFAIL_QUERIES = (
4,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of interest, do we know what in particular in this PR caused q4 to start passing?

Copy link
Collaborator Author

@ayushdg ayushdg Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually looked into this a bit the errors from this query come from one of the rows having a non standard char C�TE D'IVOIRE that arrow cannot render.

It impacts the Dask Dataframe version and only impacts the dask-cudf version if we try to print/repr it.
For whatever reason dask-cudf sort_values ended up invoking the repr method in cudf which is a bit confusing. The nsmallest api doesn't causing the repr function to be invoked allowing the query to pass

../../dask_sql/physical/utils/sort.py:36: in apply_sort
    return df.sort_values(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask_cudf/core.py:249: in sort_values
    df = sorting.sort_values(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask_cudf/sorting.py:277: in sort_values
    partitions = df[by].map_partitions(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/dataframe/core.py:872: in map_partitions
    return map_partitions(func, self, *args, **kwargs)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/dataframe/core.py:6610: in map_partitions
    token = tokenize(func, meta, *args, **kwargs)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/base.py:933: in tokenize
    hasher.update(str(normalize_token(kwargs)).encode())
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/utils.py:640: in __call__
    return meth(arg, *args, **kwargs)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/dask/base.py:961: in normalize_dict
    return normalize_token(sorted(d.items(), key=str))
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/dataframe.py:1880: in __repr__
    return self._clean_renderable_dataframe(output)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/dataframe.py:1758: in _clean_renderable_dataframe
    output = output.to_pandas().to_string(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/dataframe.py:4813: in to_pandas
    out_data[i] = self._data[col_key].to_pandas(
/datasets/adattagupta/miniconda3/envs/dask-sql-rust2/lib/python3.9/site-packages/cudf/core/column/string.py:5475: in to_pandas
    pd_series = self.to_arrow().to_pandas(**kwargs)
pyarrow/array.pxi:823: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/array.pxi:1396: in pyarrow.lib.Array._to_pandas
    ???
pyarrow/array.pxi:1597: in pyarrow.lib._array_like_to_pandas
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   pyarrow.lib.ArrowException: Unknown error: Wrapping CTE D'IVOIRE failed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay that makes sense - looks like what's happening here is that as part of Dask's sorting algorithm, we pass a dataframe of quantile division to map_partitions, which Dask then attempts to tokenize using a string representation of the frame.

Copy link
Collaborator

@charlesbluca charlesbluca left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ayushdg! LGTM

@ayushdg ayushdg merged commit 9bb37a7 into dask-contrib:main Nov 7, 2022
@ayushdg ayushdg deleted the sort-topk-optimization branch December 12, 2022 13:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants