Skip to content
/ cudf Public
forked from rapidsai/cudf

Commit

Permalink
Match pandas ordering for joins in pandas-compat mode
Browse files Browse the repository at this point in the history
If we pass sort=True to merges we are on the hook to sort the result
in order with respect to the key columns. If those key columns have
repeated values there is still some space for ambiguity. Currently we
get a result back whose order (for the repeated key values) is
determined by the gather map that libcudf returns for the join. This
does not come with any ordering guarantees.

When sort=False, pandas has join-type dependent ordering guarantees
which we also do not match. To fix this, in pandas-compatible mode
only, reorder the gather maps according to the order of the input
keys. When sort=False this means that our result matches pandas
ordering. When sort=True, it ensures that (if we use a stable sort)
the tie-break for equal sort keys is the input dataframe order.

While we're here, switch from argsort + gather to sort_by_key when
sorting results.

- Closes rapidsai#14001
  • Loading branch information
wence- committed Nov 16, 2023
1 parent 427390f commit 40a5861
Showing 1 changed file with 111 additions and 14 deletions.
125 changes: 111 additions & 14 deletions python/cudf/cudf/core/join/join.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
from __future__ import annotations

import itertools
import warnings
from typing import Any, ClassVar, List, Optional

import cudf
from cudf import _lib as libcudf
from cudf._lib.types import size_type_dtype
from cudf.core.copy_types import GatherMap
from cudf.core.join._join_helpers import (
_coerce_to_tuple,
Expand Down Expand Up @@ -94,7 +96,44 @@ def __init__(
self.lhs = lhs.copy(deep=False)
self.rhs = rhs.copy(deep=False)
self.how = how
self.sort = sort
# If the user requests that the result is sorted or we're in
# pandas-compatible mode we have various obligations on the
# output order:
#
# compat-> | False                    | True
# sort     |                          |
# ---------+--------------------------+-------------------------------
# False    | no obligation            | ordering as per pandas docs(*)
# True     | sorted lexicographically | sorted lexicographically(*)
#
# (*) If two keys are equal, tiebreak is to use input table order.
#
# In pandas-compat mode, we have obligations on the order to
# match pandas (even if sort=False), see
# pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html.
# The ordering requirements differ depending on which join
# type is specified:
#
# - left: preserve key order (only keeping left keys)
# - right: preserve key order (only keeping right keys)
# - inner: preserve key order (of left keys)
# - outer: sort keys lexicographically
# - cross (not supported): preserve key order (of left keys)
#
# Moreover, in all cases, whenever there is a tiebreak
# situation (for sorting or otherwise), the deciding order is
# "input table order"
self.sort = sort or (
cudf.get_option("mode.pandas_compatible") and how == "outer"
)
self.preserve_key_order = cudf.get_option(
"mode.pandas_compatible"
) and how in {
"inner",
"outer",
"left",
"right",
}
self.lsuffix, self.rsuffix = suffixes

# At this point validation guarantees that if on is not None we
Expand Down Expand Up @@ -160,6 +199,55 @@ def __init__(
}
)

def _gather_maps(self, left_cols, right_cols):
    """Return the (left, right) gather maps for this join.

    The maps are obtained from ``self._joiner``. When
    ``self.preserve_key_order`` is false they are returned exactly as
    the joiner produced them. Otherwise they are permuted so that the
    rows they select appear in input-table order, which is what
    pandas-compatible mode requires (including as the tiebreak when
    the result is subsequently sorted).

    Parameters
    ----------
    left_cols, right_cols
        Sequences of key columns for the left and right tables. Only
        the first column of each is inspected here (for its length);
        both sequences are forwarded unchanged to ``self._joiner``.

    Returns
    -------
    The pair of gather maps: either the joiner's own return value, or
    the result of ``libcudf.sort.sort_by_key`` applied to them.
    """
    gather_maps = self._joiner(left_cols, right_cols, how=self.how)
    if not self.preserve_key_order:
        # No ordering obligation: hand back whatever the join gave us.
        return gather_maps

    # Reaching this point means pandas-compat imposes an ordering
    # obligation on the join, which rules out semi-joins. The
    # reordering is needed even when sort=True, because ties between
    # equal keys must still be broken by input table order.
    assert self.how in {"inner", "outer", "left", "right"}
    # For these join types neither gather map should be None.
    assert all(m is not None for m in gather_maps)

    table_sizes = [len(left_cols[0]), len(right_cols[0])]
    # Request nullification only for the side(s) where this join type
    # needs it: the left map unless how is inner/left, the right map
    # unless how is inner/right.
    needs_nullify = [
        self.how not in {"inner", "left"},
        self.how not in {"inner", "right"},
    ]

    # Gather an iota column through each map. The gathered columns
    # record, per output row, the position of the source row in the
    # corresponding input table, and serve as the sort keys below.
    sort_keys = []
    for gather_map, nrows, null in zip(
        gather_maps, table_sizes, needs_nullify
    ):
        sort_keys.extend(
            libcudf.copying.gather(
                [cudf.core.column.arange(nrows, dtype=size_type_dtype)],
                gather_map,
                nullify=null,
            )
        )

    if self.how == "right":
        # For a right join the right table's order is the primary key.
        sort_keys.reverse()

    nkeys = len(sort_keys)
    # Stable sort of the maps by the position keys; the flag lists are
    # one entry per key (presumably ascending order, nulls last --
    # confirm against the libcudf sort_by_key signature).
    return libcudf.sort.sort_by_key(
        list(gather_maps),
        sort_keys,
        [True] * nkeys,
        ["last"] * nkeys,
        stable=True,
    )

def perform_merge(self) -> cudf.DataFrame:
left_join_cols = []
right_join_cols = []
Expand All @@ -184,12 +272,9 @@ def perform_merge(self) -> cudf.DataFrame:
left_key.set(self.lhs, lcol_casted, validate=False)
right_key.set(self.rhs, rcol_casted, validate=False)

left_rows, right_rows = self._joiner(
left_join_cols,
right_join_cols,
how=self.how,
left_rows, right_rows = self._gather_maps(
left_join_cols, right_join_cols
)

gather_kwargs = {
"keep_index": self._using_left_index or self._using_right_index,
}
Expand Down Expand Up @@ -305,6 +390,11 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame:
# same order as given in 'on'. If the indices are used as
# keys, the index will be sorted. If one index is specified,
# the key columns on the other side will be used to sort.
# In pandas-compatible mode, tie-breaking for multiple equal
# sort keys is to produce output in input dataframe order.
# This is taken care of by using a stable sort here, and (in
# pandas-compat mode) reordering the gather maps before
# producing the result that is passed in to this function.
by: List[Any] = []
if self._using_left_index and self._using_right_index:
by.extend(result._index._data.columns)
Expand All @@ -313,15 +403,22 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame:
if not self._using_right_index:
by.extend([result._data[col.name] for col in self._right_keys])
if by:
to_sort = cudf.DataFrame._from_data(dict(enumerate(by)))
sort_order = GatherMap.from_column_unchecked(
cudf.core.column.as_column(to_sort.argsort()),
len(result),
nullify=False,
keep_index = self._using_left_index or self._using_right_index
if keep_index:
to_sort = [*result._index._columns, *result._columns]
index_names = result._index.names
else:
to_sort = [*result._columns]
index_names = None
result_columns = libcudf.sort.sort_by_key(
to_sort,
by,
[True] * len(by),
["last"] * len(by),
stable=True,
)
result = result._gather(
sort_order,
keep_index=self._using_left_index or self._using_right_index,
result = result._from_columns_like_self(
result_columns, result._column_names, index_names
)
return result

Expand Down

0 comments on commit 40a5861

Please sign in to comment.