From c2e0fec0eb1f4e5b8a9e7fa9d7d399e5ba13520c Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 29 Nov 2021 13:23:07 -0500 Subject: [PATCH 1/4] Allow custom sorting functions for dask-cudf sort_values --- python/dask_cudf/dask_cudf/core.py | 18 +++++++++++++++--- python/dask_cudf/dask_cudf/sorting.py | 7 +++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index bf063918c89..fd72aaa4ae8 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -235,6 +235,8 @@ def sort_values( set_divisions=False, ascending=True, na_position="last", + sort_function=None, + sort_function_kwargs=None, **kwargs, ): if kwargs: @@ -242,10 +244,18 @@ def sort_values( f"Unsupported input arguments passed : {list(kwargs.keys())}" ) + sort_kwargs = { + "by": by, + "ascending": ascending, + "na_position": na_position, + } + if sort_function is None: + sort_function = M.sort_values + if sort_function_kwargs is not None: + sort_kwargs.update(sort_function_kwargs) + if self.npartitions == 1: - df = self.map_partitions( - M.sort_values, by, ascending=ascending, na_position=na_position - ) + df = self.map_partitions(sort_function, **sort_kwargs) else: df = sorting.sort_values( self, @@ -256,6 +266,8 @@ def sort_values( ignore_index=ignore_index, ascending=ascending, na_position=na_position, + sort_function=sort_function, + sort_function_kwargs=sort_kwargs, ) if ignore_index: diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py index 5f2af445170..d604eab7111 100644 --- a/python/dask_cudf/dask_cudf/sorting.py +++ b/python/dask_cudf/dask_cudf/sorting.py @@ -10,7 +10,6 @@ from dask.dataframe.core import DataFrame, Index, Series from dask.dataframe.shuffle import rearrange_by_column from dask.highlevelgraph import HighLevelGraph -from dask.utils import M import cudf as gd from cudf.api.types import is_categorical_dtype @@ -222,6 +221,8 @@ def sort_values( ignore_index=False, ascending=True, na_position="last", + sort_function=None, + sort_function_kwargs=None, ): """Sort by the given list/tuple of column names.""" if na_position not in ("first", "last"): @@ -263,9 +264,7 @@ def sort_values( df3.divisions = (None,) * (df3.npartitions + 1) # Step 3 - Return final sorted df - df4 = df3.map_partitions( - M.sort_values, by, ascending=ascending, na_position=na_position - ) + df4 = df3.map_partitions(sort_function, **sort_function_kwargs) if not isinstance(divisions, gd.DataFrame) and set_divisions: # Can't have multi-column divisions elsewhere in dask (yet) df4.divisions = methods.tolist(divisions) From ca8e4970031cdb8ea9c8be8844dd5f1572db8918 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 29 Nov 2021 13:59:56 -0500 Subject: [PATCH 2/4] Add custom sort function test --- python/dask_cudf/dask_cudf/tests/test_sort.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index f4ae83245cb..0b258dd33e7 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -83,3 +83,22 @@ def test_sort_values_with_nulls(data, by, ascending, na_position): # cudf ordering for nulls is non-deterministic dd.assert_eq(got[by], expect[by], check_index=False) + + +@pytest.mark.parametrize("by", [["a", "b"], ["b", "a"]]) +@pytest.mark.parametrize("nparts", [1, 10]) +def test_sort_values_custom_function(by, nparts): + df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15}) + ddf = dd.from_pandas(df, npartitions=nparts) + + def f(partition, by_columns, ascending, na_position, **kwargs): + return partition.sort_values( + by_columns, ascending=ascending, na_position=na_position + ) + + with dask.config.set(scheduler="single-threaded"): + got = ddf.sort_values( + by=by[0], sort_function=f, sort_function_kwargs={"by_columns": by} + ) + expect = df.sort_values(by=by) + dd.assert_eq(got, expect, check_index=False) From e54f1bfecf086010a45e52677012dfb026dda956 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 10 Jan 2022 14:17:02 -0500 Subject: [PATCH 3/4] Move custom sort function logic to internal sort_values --- python/dask_cudf/dask_cudf/core.py | 37 +++++++++------------------ python/dask_cudf/dask_cudf/sorting.py | 16 ++++++++++++ 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index fd72aaa4ae8..e191873f82b 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -244,31 +244,18 @@ def sort_values( f"Unsupported input arguments passed : {list(kwargs.keys())}" ) - sort_kwargs = { - "by": by, - "ascending": ascending, - "na_position": na_position, - } - if sort_function is None: - sort_function = M.sort_values - if sort_function_kwargs is not None: - sort_kwargs.update(sort_function_kwargs) - - if self.npartitions == 1: - df = self.map_partitions(sort_function, **sort_kwargs) - else: - df = sorting.sort_values( - self, - by, - max_branch=max_branch, - divisions=divisions, - set_divisions=set_divisions, - ignore_index=ignore_index, - ascending=ascending, - na_position=na_position, - sort_function=sort_function, - sort_function_kwargs=sort_kwargs, - ) + df = sorting.sort_values( + self, + by, + max_branch=max_branch, + divisions=divisions, + set_divisions=set_divisions, + ignore_index=ignore_index, + ascending=ascending, + na_position=na_position, + sort_function=sort_function, + sort_function_kwargs=sort_function_kwargs, + ) if ignore_index: return df.reset_index(drop=True) diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py index fa3a32db0cc..a5007c69d18 100644 --- a/python/dask_cudf/dask_cudf/sorting.py +++ b/python/dask_cudf/dask_cudf/sorting.py @@ -10,6 +10,7 @@ from dask.dataframe.core import DataFrame, Index, Series from dask.dataframe.shuffle import rearrange_by_column from dask.highlevelgraph import HighLevelGraph +from dask.utils import M import cudf as gd from cudf.api.types import is_categorical_dtype @@ -236,6 +237,21 @@ def sort_values( elif not isinstance(by, list): by = [by] + # parse custom sort function / kwargs if provided + sort_kwargs = { + "by": by, + "ascending": ascending, + "na_position": na_position, + } + if sort_function is None: + sort_function = M.sort_values + if sort_function_kwargs is not None: + sort_kwargs.update(sort_function_kwargs) + + # handle single partition case + if npartitions == 1: + return df.map_partitions(sort_function, **sort_kwargs) + # Step 1 - Calculate new divisions (if necessary) if divisions is None: divisions = quantile_divisions(df, by, npartitions) From bc9291c15d17bb92e0a5874eb661b1fa5d0ea165 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 10 Jan 2022 17:01:18 -0500 Subject: [PATCH 4/4] Use correct sort kwargs for map_partitions call --- python/dask_cudf/dask_cudf/sorting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py index a5007c69d18..af40d9ca41b 100644 --- a/python/dask_cudf/dask_cudf/sorting.py +++ b/python/dask_cudf/dask_cudf/sorting.py @@ -282,7 +282,7 @@ def sort_values( df3.divisions = (None,) * (df3.npartitions + 1) # Step 3 - Return final sorted df - df4 = df3.map_partitions(sort_function, **sort_function_kwargs) + df4 = df3.map_partitions(sort_function, **sort_kwargs) if not isinstance(divisions, gd.DataFrame) and set_divisions: # Can't have multi-column divisions elsewhere in dask (yet) df4.divisions = methods.tolist(divisions)