From a0e0e434d180031ec579bdb5fe33fe354f2583e3 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 26 Apr 2024 10:31:00 -0700 Subject: [PATCH 1/8] fix subtle issue with renaming before a merge --- dask_expr/_expr.py | 8 +++++++- dask_expr/tests/test_merge.py | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index ff9d5964..f015cf42 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1613,8 +1613,14 @@ class ToFrame(Elemwise): _keyword_only = ["name"] operation = M.to_frame _filter_passthrough = True - _preserves_partitioning_information = True + @functools.cached_property + def unique_partition_mapping_columns_from_shuffle(self): + unique_mapping = self.frame.unique_partition_mapping_columns_from_shuffle + if set(self.frame.columns) == unique_mapping: + # Account for default name conversion + return set(self.columns) + return unique_mapping class ToFrameIndex(Elemwise): _parameters = ["frame", "index", "name"] diff --git a/dask_expr/tests/test_merge.py b/dask_expr/tests/test_merge.py index 0bf078bd..24b3181f 100644 --- a/dask_expr/tests/test_merge.py +++ b/dask_expr/tests/test_merge.py @@ -1011,3 +1011,19 @@ def test_merge_suffix_projections(): expected = df.merge(df, on="a") expected = expected[expected["c_x"] == "A"]["c_y"] assert_eq(result, expected) + + +def test_merge_after_rename(): + pleft = pd.Series(range(10)) + pleft = pleft.drop_duplicates().to_frame() + pleft.columns = ["a"] + + left = from_pandas(pd.Series(range(10)), npartitions=2) + left = left.drop_duplicates().to_frame() + left.columns = ["a"] + + right = pd.DataFrame({"a": [1, 2] * 5}) + + expected = pleft.merge(right, how="inner") + result = left.merge(right, how="inner") + assert_eq(result, expected) From b73938d7fb7f1c5c9840df98ef4174475f055f6f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 26 Apr 2024 10:32:13 -0700 Subject: [PATCH 2/8] check formatting --- dask_expr/_expr.py | 1 + dask_expr/tests/test_merge.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index f015cf42..5d42d087 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1622,6 +1622,7 @@ def unique_partition_mapping_columns_from_shuffle(self): return set(self.columns) return unique_mapping + class ToFrameIndex(Elemwise): _parameters = ["frame", "index", "name"] _defaults = {"name": no_default, "index": True} diff --git a/dask_expr/tests/test_merge.py b/dask_expr/tests/test_merge.py index 24b3181f..b4530342 100644 --- a/dask_expr/tests/test_merge.py +++ b/dask_expr/tests/test_merge.py @@ -1023,7 +1023,6 @@ def test_merge_after_rename(): left.columns = ["a"] right = pd.DataFrame({"a": [1, 2] * 5}) - expected = pleft.merge(right, how="inner") result = left.merge(right, how="inner") assert_eq(result, expected) From aea6002deffcf37205770101278d331cd7447652 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Fri, 26 Apr 2024 12:59:48 -0500 Subject: [PATCH 3/8] Update dask_expr/tests/test_merge.py --- dask_expr/tests/test_merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/tests/test_merge.py b/dask_expr/tests/test_merge.py index b4530342..d4d6134d 100644 --- a/dask_expr/tests/test_merge.py +++ b/dask_expr/tests/test_merge.py @@ -1025,4 +1025,4 @@ def test_merge_after_rename(): right = pd.DataFrame({"a": [1, 2] * 5}) expected = pleft.merge(right, how="inner") result = left.merge(right, how="inner") - assert_eq(result, expected) + assert_eq(result, expected, check_index=False) From d46447da7a1af63359147ffa64494dc3c737ba46 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 29 Apr 2024 09:27:44 -0700 Subject: [PATCH 4/8] test index case --- dask_expr/_expr.py | 9 ++++++++- dask_expr/tests/test_merge.py | 5 ++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 5d42d087..e7287069 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1629,7 +1629,14 @@ class ToFrameIndex(Elemwise): _keyword_only = ["name", "index"] operation = M.to_frame _filter_passthrough = True - _preserves_partitioning_information = True + + @functools.cached_property + def unique_partition_mapping_columns_from_shuffle(self): + unique_mapping = self.frame.unique_partition_mapping_columns_from_shuffle + if set(self.frame.columns) == unique_mapping: + # Account for default name conversion + return set(self.columns) + return unique_mapping class ToSeriesIndex(ToFrameIndex): diff --git a/dask_expr/tests/test_merge.py b/dask_expr/tests/test_merge.py index d4d6134d..d16035df 100644 --- a/dask_expr/tests/test_merge.py +++ b/dask_expr/tests/test_merge.py @@ -1013,12 +1013,15 @@ def test_merge_suffix_projections(): assert_eq(result, expected) -def test_merge_after_rename(): +@pytest.mark.parametrize("index", [True, False]) +def test_merge_after_rename(index): pleft = pd.Series(range(10)) + pleft = pleft.index if index else pleft pleft = pleft.drop_duplicates().to_frame() pleft.columns = ["a"] left = from_pandas(pd.Series(range(10)), npartitions=2) + left = left.index if index else left left = left.drop_duplicates().to_frame() left.columns = ["a"] From c15e5d7e39a92f86ad0f935bc618edd8b7e304d0 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 1 May 2024 14:25:27 -0700 Subject: [PATCH 5/8] handle tuples --- dask_expr/_expr.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 179b254f..e74fa9a1 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1616,28 +1616,23 @@ class ToFrame(Elemwise): @functools.cached_property def unique_partition_mapping_columns_from_shuffle(self): - unique_mapping = self.frame.unique_partition_mapping_columns_from_shuffle - if set(self.frame.columns) == unique_mapping: - # Account for default name conversion - return set(self.columns) - return unique_mapping + result = set() + name_mapping = dict(zip(self.frame.columns, self.columns)) + for elem in self.frame.unique_partition_mapping_columns_from_shuffle: + if isinstance(elem, tuple): + result.add(tuple(name_mapping.get(v, v) for v in elem)) + else: + result.add(name_mapping.get(elem, elem)) + return result -class ToFrameIndex(Elemwise): +class ToFrameIndex(ToFrame): _parameters = ["frame", "index", "name"] _defaults = {"name": no_default, "index": True} _keyword_only = ["name", "index"] operation = M.to_frame _filter_passthrough = True - @functools.cached_property - def unique_partition_mapping_columns_from_shuffle(self): - unique_mapping = self.frame.unique_partition_mapping_columns_from_shuffle - if set(self.frame.columns) == unique_mapping: - # Account for default name conversion - return set(self.columns) - return unique_mapping - class ToSeriesIndex(ToFrameIndex): _defaults = {"name": no_default, "index": None} From 9a63d2962ba7ee4512946581e90bdb9252d80c15 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 2 May 2024 12:46:13 +0200 Subject: [PATCH 6/8] Add test --- .../tests/test_partitioning_knowledge.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dask_expr/tests/test_partitioning_knowledge.py b/dask_expr/tests/test_partitioning_knowledge.py index d7df9ec3..67dd8260 100644 --- a/dask_expr/tests/test_partitioning_knowledge.py +++ b/dask_expr/tests/test_partitioning_knowledge.py @@ -205,3 +205,29 @@ def test_avoid_shuffle_on_top_of_lowered_shuffle(): assert ( len(list(node for node in result.walk() if isinstance(node, DiskShuffle))) == 2 ) + + +def test_merge_to_frame(): + pdf = pd.DataFrame( + {"a": np.random.randint(1, 100, (10,)), "b": np.random.randint(1, 100, (10,))} + ) + + df = from_pandas(pdf, npartitions=4) + + pdf2 = pd.DataFrame( + {"a": np.random.randint(1, 100, (10,)), "c": np.random.randint(1, 100, (10,))} + ) + + df2 = from_pandas(pdf2, npartitions=3) + res = df.merge(df2) + result = res.a.to_frame() + assert result.unique_partition_mapping_columns_from_shuffle == {("a",)} + assert_eq(result, pdf.merge(pdf2).a.to_frame()) + + result = res.a.to_frame(name="x") + assert result.unique_partition_mapping_columns_from_shuffle == {("x",)} + assert_eq(result, pdf.merge(pdf2).a.to_frame(name="x")) + + result = res.index.to_frame() + assert result.unique_partition_mapping_columns_from_shuffle == set() + assert_eq(result, pdf.merge(pdf2).index.to_frame()) From de1e678a70d574be11eb860f28882d45bde51310 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 2 May 2024 12:53:59 +0200 Subject: [PATCH 7/8] Add test --- dask_expr/_expr.py | 10 ++++++++++ dask_expr/tests/test_partitioning_knowledge.py | 13 +++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index e74fa9a1..fad6bb72 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -2114,6 +2114,16 @@ def _task(self, index: int): "index", ) + @functools.cached_property + def unique_partition_mapping_columns_from_shuffle(self): + name = self.frame._meta.index.name + if name in self.frame.unique_partition_mapping_columns_from_shuffle: + return {name} + elif (name,) in self.frame.unique_partition_mapping_columns_from_shuffle: + return {(name,)} + else: + return set() + def _return_input(df, divisions=None): return df diff --git a/dask_expr/tests/test_partitioning_knowledge.py b/dask_expr/tests/test_partitioning_knowledge.py index 67dd8260..ef95b12d 100644 --- a/dask_expr/tests/test_partitioning_knowledge.py +++ b/dask_expr/tests/test_partitioning_knowledge.py @@ -207,7 +207,7 @@ def test_avoid_shuffle_on_top_of_lowered_shuffle(): ) -def test_merge_to_frame(): +def test_merge_groupby_to_frame(): pdf = pd.DataFrame( {"a": np.random.randint(1, 100, (10,)), "b": np.random.randint(1, 100, (10,))} ) @@ -222,7 +222,7 @@ def test_merge_to_frame(): res = df.merge(df2) result = res.a.to_frame() assert result.unique_partition_mapping_columns_from_shuffle == {("a",)} - assert_eq(result, pdf.merge(pdf2).a.to_frame()) + assert_eq(result, pdf.merge(pdf2).a.to_frame(), check_index=False) result = res.a.to_frame(name="x") assert result.unique_partition_mapping_columns_from_shuffle == {("x",)} @@ -231,3 +231,12 @@ def test_merge_to_frame(): result = res.index.to_frame() assert result.unique_partition_mapping_columns_from_shuffle == set() assert_eq(result, pdf.merge(pdf2).index.to_frame()) + + res = df.groupby("a").count(split_out=True) + result = res.index.to_frame() + assert result.unique_partition_mapping_columns_from_shuffle == {("a",)} + assert_eq(result, pdf.groupby("a").count().index.to_frame()) + + result = res.index.to_frame(name="x") + assert result.unique_partition_mapping_columns_from_shuffle == {("x",)} + assert_eq(result, pdf.groupby("a").count().index.to_frame(name="x")) From 4977083514b72d6b5a8e16420ade0c3f4f784491 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 2 May 2024 13:55:39 +0200 Subject: [PATCH 8/8] Update --- dask_expr/tests/test_partitioning_knowledge.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_expr/tests/test_partitioning_knowledge.py b/dask_expr/tests/test_partitioning_knowledge.py index ef95b12d..a2de78d6 100644 --- a/dask_expr/tests/test_partitioning_knowledge.py +++ b/dask_expr/tests/test_partitioning_knowledge.py @@ -226,11 +226,11 @@ def test_merge_groupby_to_frame(): result = res.a.to_frame(name="x") assert result.unique_partition_mapping_columns_from_shuffle == {("x",)} - assert_eq(result, pdf.merge(pdf2).a.to_frame(name="x")) + assert_eq(result, pdf.merge(pdf2).a.to_frame(name="x"), check_index=False) result = res.index.to_frame() assert result.unique_partition_mapping_columns_from_shuffle == set() - assert_eq(result, pdf.merge(pdf2).index.to_frame()) + assert_eq(result, pdf.merge(pdf2).index.to_frame(), check_index=False) res = df.groupby("a").count(split_out=True) result = res.index.to_frame()