diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index b94f8f583f4..20f5b7989eb 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -1,11 +1,13 @@ # Copyright (c) 2020-2023, NVIDIA CORPORATION. from __future__ import annotations +import itertools import warnings from typing import Any, ClassVar, List, Optional import cudf from cudf import _lib as libcudf +from cudf._lib.types import size_type_dtype from cudf.core.copy_types import GatherMap from cudf.core.join._join_helpers import ( _coerce_to_tuple, @@ -94,7 +96,44 @@ def __init__( self.lhs = lhs.copy(deep=False) self.rhs = rhs.copy(deep=False) self.how = how - self.sort = sort + # If the user requests that the result is sorted or we're in + # pandas-compatible mode we have various obligations on the + # output order: + # + # compat-> | False | True + # sort | | + # ---------+--------------------------+------------------------------- + # False| no obligation | ordering as per pandas docs(*) + # True | sorted lexicographically | sorted lexicographically(*) + # + # (*) If two keys are equal, tiebreak is to use input table order. + # + # In pandas-compat mode, we have obligations on the order to + # match pandas (even if sort=False), see + # pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html. 
+ # The ordering requirements differ depending on which join + # type is specified: + # + # - left: preserve key order (only keeping left keys) + # - right: preserve key order (only keeping right keys) + # - inner: preserve key order (of left keys) + # - outer: sort keys lexicographically + # - cross (not supported): preserve key order (of left keys) + # + # Moreover, in all cases, whenever there is a tiebreak + # situation (for sorting or otherwise), the deciding order is + # "input table order". + self.sort = sort or ( + cudf.get_option("mode.pandas_compatible") and how == "outer" + ) + self.preserve_key_order = cudf.get_option( + "mode.pandas_compatible" + ) and how in { + "inner", + "outer", + "left", + "right", + } self.lsuffix, self.rsuffix = suffixes # At this point validation guarantees that if on is not None we @@ -160,6 +199,55 @@ def __init__( } ) + def _gather_maps(self, left_cols, right_cols): + # Produce gather maps for the join, optionally reordering to + # match pandas-order in compat mode. + maps = self._joiner( + left_cols, + right_cols, + how=self.how, + ) + if not self.preserve_key_order: + return maps + # We should only get here if we're in a join on which + # pandas-compat places some ordering obligation (which + # precludes a semi-join). + # We must perform this reordering even if sort=True since the + # obligation to ensure tiebreaks appear in input table order + # means that the gather maps must be permuted into an original + # order. + assert self.how in {"inner", "outer", "left", "right"} + # And hence both maps returned from the libcudf join should be + # non-None. + assert all(m is not None for m in maps) + lengths = [len(left_cols[0]), len(right_cols[0])] + # Only nullify those maps that need it. 
+ nullify = [ + self.how not in {"inner", "left"}, + self.how not in {"inner", "right"}, + ] + # To reorder maps so that they are in order of the input + # tables, we gather from iota on both right and left, and then + # sort the gather maps with those two columns as key. + key_order = list( + itertools.chain.from_iterable( + libcudf.copying.gather( + [cudf.core.column.arange(n, dtype=size_type_dtype)], + map_, + nullify=null, + ) + for map_, n, null in zip(maps, lengths, nullify) + ) + ) + return libcudf.sort.sort_by_key( + list(maps), + # If how is right, right map is primary sort key. + key_order[:: -1 if self.how == "right" else 1], + [True] * len(key_order), + ["last"] * len(key_order), + stable=True, + ) + def perform_merge(self) -> cudf.DataFrame: left_join_cols = [] right_join_cols = [] @@ -184,12 +272,9 @@ def perform_merge(self) -> cudf.DataFrame: left_key.set(self.lhs, lcol_casted, validate=False) right_key.set(self.rhs, rcol_casted, validate=False) - left_rows, right_rows = self._joiner( - left_join_cols, - right_join_cols, - how=self.how, + left_rows, right_rows = self._gather_maps( + left_join_cols, right_join_cols ) - gather_kwargs = { "keep_index": self._using_left_index or self._using_right_index, } @@ -305,6 +390,11 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame: # same order as given in 'on'. If the indices are used as # keys, the index will be sorted. If one index is specified, # the key columns on the other side will be used to sort. + # In pandas-compatible mode, tie-breaking for multiple equal + # sort keys is to produce output in input dataframe order. + # This is taken care of by using a stable sort here, and (in + # pandas-compat mode) reordering the gather maps before + # producing the result passed to this method. 
by: List[Any] = [] if self._using_left_index and self._using_right_index: by.extend(result._index._data.columns) @@ -313,15 +403,22 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame: if not self._using_right_index: by.extend([result._data[col.name] for col in self._right_keys]) if by: - to_sort = cudf.DataFrame._from_data(dict(enumerate(by))) - sort_order = GatherMap.from_column_unchecked( - cudf.core.column.as_column(to_sort.argsort()), - len(result), - nullify=False, + keep_index = self._using_left_index or self._using_right_index + if keep_index: + to_sort = [*result._index._columns, *result._columns] + index_names = result._index.names + else: + to_sort = [*result._columns] + index_names = None + result_columns = libcudf.sort.sort_by_key( + to_sort, + by, + [True] * len(by), + ["last"] * len(by), + stable=True, ) - result = result._gather( - sort_order, - keep_index=self._using_left_index or self._using_right_index, + result = result._from_columns_like_self( + result_columns, result._column_names, index_names ) return result