From 40a58618f6ab5398382dd0c8a9c44c386fb7bc61 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 23 Oct 2023 16:04:58 +0100 Subject: [PATCH] Match pandas ordering for joins in pandas-compat mode If we pass sort=True to merges we are on the hook to sort the result in order with respect to the key columns. If those key columns have repeated values there is still some space for ambiguity. Currently we get a result back whose order (for the repeated key values) is determined by the gather map that libcudf returns for the join. This does not come with any ordering guarantees. When sort=False, pandas has join-type dependent ordering guarantees which we also do not match. To fix this, in pandas-compatible mode only, reorder the gather maps according to the order of the input keys. When sort=False this means that our result matches pandas ordering. When sort=True, it ensures that (if we use a stable sort) the tie-break for equal sort keys is the input dataframe order. While we're here, switch from argsort + gather to sort_by_key when sorting results. - Closes #14001 --- python/cudf/cudf/core/join/join.py | 125 +++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 14 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index b94f8f583f4..20f5b7989eb 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -1,11 +1,13 @@ # Copyright (c) 2020-2023, NVIDIA CORPORATION. from __future__ import annotations +import itertools import warnings from typing import Any, ClassVar, List, Optional import cudf from cudf import _lib as libcudf +from cudf._lib.types import size_type_dtype from cudf.core.copy_types import GatherMap from cudf.core.join._join_helpers import ( _coerce_to_tuple, @@ -94,7 +96,44 @@ def __init__( self.lhs = lhs.copy(deep=False) self.rhs = rhs.copy(deep=False) self.how = how - self.sort = sort + # If the user requests that the result is sorted or we're in + # pandas-compatible mode we have various obligations on the + # output order: + # + # compat-> | False | True + # sort | | + # ---------+--------------------------+------------------------------- + # False| no obligation | ordering as per pandas docs(*) + # True | sorted lexicographically | sorted lexicographically(*) + # + # (*) If two keys are equal, tiebreak is to use input table order. + # + # In pandas-compat mode, we have obligations on the order to + # match pandas (even if sort=False), see + # pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html. + # The ordering requirements differ depending on which join + # type is specified: + # + # - left: preserve key order (only keeping left keys) + # - right: preserve key order (only keeping right keys) + # - inner: preserve key order (of left keys) + # - outer: sort keys lexicographically + # - cross (not supported): preserve key order (of left keys) + # + # Moreover, in all cases, whenever there is a tiebreak + # situation (for sorting or otherwise), the deciding order is + # "input table order" + self.sort = sort or ( + cudf.get_option("mode.pandas_compatible") and how == "outer" + ) + self.preserve_key_order = cudf.get_option( + "mode.pandas_compatible" + ) and how in { + "inner", + "outer", + "left", + "right", + } self.lsuffix, self.rsuffix = suffixes # At this point validation guarantees that if on is not None we @@ -160,6 +199,55 @@ def __init__( } ) + def _gather_maps(self, left_cols, right_cols): + # Produce gather maps for the join, optionally reordering to + # match pandas-order in compat mode. + maps = self._joiner( + left_cols, + right_cols, + how=self.how, + ) + if not self.preserve_key_order: + return maps + # We should only get here if we're in a join on which + # pandas-compat places some ordering obligation (which + # precludes a semi-join) + # We must perform this reordering even if sort=True since the + # obligation to ensure tiebreaks appear in input table order + # means that the gather maps must be permuted into an original + # order. + assert self.how in {"inner", "outer", "left", "right"} + # And hence both maps returned from the libcudf join should be + # non-None. + assert all(m is not None for m in maps) + lengths = [len(left_cols[0]), len(right_cols[0])] + # Only nullify those maps that need it. + nullify = [ + self.how not in {"inner", "left"}, + self.how not in {"inner", "right"}, + ] + # To reorder maps so that they are in order of the input + # tables, we gather from iota on both right and left, and then + # sort the gather maps with those two columns as key. + key_order = list( + itertools.chain.from_iterable( + libcudf.copying.gather( + [cudf.core.column.arange(n, dtype=size_type_dtype)], + map_, + nullify=null, + ) + for map_, n, null in zip(maps, lengths, nullify) + ) + ) + return libcudf.sort.sort_by_key( + list(maps), + # If how is right, right map is primary sort key. + key_order[:: -1 if self.how == "right" else 1], + [True] * len(key_order), + ["last"] * len(key_order), + stable=True, + ) + def perform_merge(self) -> cudf.DataFrame: left_join_cols = [] right_join_cols = [] @@ -184,12 +272,9 @@ def perform_merge(self) -> cudf.DataFrame: left_key.set(self.lhs, lcol_casted, validate=False) right_key.set(self.rhs, rcol_casted, validate=False) - left_rows, right_rows = self._joiner( - left_join_cols, - right_join_cols, - how=self.how, + left_rows, right_rows = self._gather_maps( + left_join_cols, right_join_cols ) - gather_kwargs = { "keep_index": self._using_left_index or self._using_right_index, } @@ -305,6 +390,11 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame: # same order as given in 'on'. If the indices are used as # keys, the index will be sorted. If one index is specified, # the key columns on the other side will be used to sort. + # In pandas-compatible mode, tie-breaking for multiple equal + # sort keys is to produce output in input dataframe order. + # This is taken care of by using a stable sort here, and (in + # pandas-compat mode) reordering the gather maps before + # producing the input result. by: List[Any] = [] if self._using_left_index and self._using_right_index: by.extend(result._index._data.columns) @@ -313,15 +403,22 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame: if not self._using_right_index: by.extend([result._data[col.name] for col in self._right_keys]) if by: - to_sort = cudf.DataFrame._from_data(dict(enumerate(by))) - sort_order = GatherMap.from_column_unchecked( - cudf.core.column.as_column(to_sort.argsort()), - len(result), - nullify=False, + keep_index = self._using_left_index or self._using_right_index + if keep_index: + to_sort = [*result._index._columns, *result._columns] + index_names = result._index.names + else: + to_sort = [*result._columns] + index_names = None + result_columns = libcudf.sort.sort_by_key( + to_sort, + by, + [True] * len(by), + ["last"] * len(by), + stable=True, ) - result = result._gather( - sort_order, - keep_index=self._using_left_index or self._using_right_index, + result = result._from_columns_like_self( + result_columns, result._column_names, index_names ) return result