From 26f02ae35b1d0f3c7c631d504274fb5a280dff3c Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 17 Oct 2021 09:28:34 -0700 Subject: [PATCH 01/37] Remove unnecessary trivial __init__ overrides. --- python/cudf/cudf/core/groupby/groupby.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 3be71cf17a8..713c80284b6 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1150,18 +1150,6 @@ class DataFrameGroupBy(GroupBy, GetAttrGetItemMixin): _PROTECTED_KEYS = frozenset(("obj",)) - def __init__( - self, obj, by=None, level=None, sort=False, as_index=True, dropna=True - ): - super().__init__( - obj=obj, - by=by, - level=level, - sort=sort, - as_index=as_index, - dropna=dropna, - ) - def __getitem__(self, key): return self.obj[key].groupby( self.grouping, dropna=self._dropna, sort=self._sort @@ -1234,18 +1222,6 @@ class SeriesGroupBy(GroupBy): Name: Max Speed, dtype: float64 """ - def __init__( - self, obj, by=None, level=None, sort=False, as_index=True, dropna=True - ): - super().__init__( - obj=obj, - by=by, - level=level, - sort=sort, - as_index=as_index, - dropna=dropna, - ) - def agg(self, func): result = super().agg(func) From 3e8bea5ee99c8de932df77b90223469ecdbbab0b Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Oct 2021 23:35:05 -0700 Subject: [PATCH 02/37] Some minor renames. --- python/cudf/cudf/core/dataframe.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 0baa4012570..dac097cd83d 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -3919,11 +3919,8 @@ def join( FutureWarning, ) - lhs = self - rhs = other - - df = lhs.merge( - rhs, + df = self.merge( + other, left_index=True, right_index=True, how=how, @@ -3931,7 +3928,7 @@ def join( sort=sort, ) df.index.name = ( - None if lhs.index.name != rhs.index.name else lhs.index.name + None if self.index.name != other.index.name else self.index.name ) return df From 39c400a52f5ed0bd1678fc9bd5ba4c1f7cd521f3 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Oct 2021 23:54:21 -0700 Subject: [PATCH 03/37] Inline merge call. --- python/cudf/cudf/core/frame.py | 12 ++++++++---- python/cudf/cudf/core/join/__init__.py | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 934682a1996..ff5af043096 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -45,7 +45,7 @@ serialize_columns, ) from cudf.core.column_accessor import ColumnAccessor -from cudf.core.join import merge +from cudf.core.join import Merge, MergeSemi from cudf.core.udf.pipeline import compile_or_get from cudf.core.window import Rolling from cudf.utils import ioutils @@ -3611,7 +3611,11 @@ def _merge( left_index, right_index = right_index, left_index suffixes = (suffixes[1], suffixes[0]) - return merge( + if how in {"leftsemi", "leftanti"}: + merge_cls = MergeSemi + else: + merge_cls = Merge + return merge_cls( lhs, rhs, on=on, @@ -3623,7 +3627,7 @@ def _merge( sort=sort, indicator=indicator, suffixes=suffixes, - ) + ).perform_merge() def _is_sorted(self, ascending=None, null_position=None): """ @@ -5508,7 +5512,7 @@ def multiply(self, other, axis, level=None, fill_value=None): def rmul(self, other, axis, level=None, fill_value=None): """ - Get Multiplication of dataframe or series and other, element-wise + Get Multiplication of dataframe or series and other, element-wise (binary operator `rmul`). Equivalent to ``other * frame``, but with support to substitute a diff --git a/python/cudf/cudf/core/join/__init__.py b/python/cudf/cudf/core/join/__init__.py index 0463b8f9df1..71a91c398ad 100644 --- a/python/cudf/cudf/core/join/__init__.py +++ b/python/cudf/cudf/core/join/__init__.py @@ -1,3 +1,3 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. -from cudf.core.join.join import merge +from cudf.core.join.join import Merge, MergeSemi From 477ab41c6273b0a2f668cf471129712834153899 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Oct 2021 23:57:45 -0700 Subject: [PATCH 04/37] More simplifications. --- python/cudf/cudf/core/frame.py | 7 ++- python/cudf/cudf/core/join/join.py | 87 +++++++++--------------------- 2 files changed, 28 insertions(+), 66 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index ff5af043096..e586a3ccabe 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -3603,6 +3603,7 @@ def _merge( suffixes=("_x", "_y"), ): lhs, rhs = self, right + merge_cls = Merge if how == "right": # Merge doesn't support right, so just swap how = "left" @@ -3610,11 +3611,9 @@ def _merge( left_on, right_on = right_on, left_on left_index, right_index = right_index, left_index suffixes = (suffixes[1], suffixes[0]) - - if how in {"leftsemi", "leftanti"}: + elif how in {"leftsemi", "leftanti"}: merge_cls = MergeSemi - else: - merge_cls = Merge + return merge_cls( lhs, rhs, diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 55540d362ac..69865eb96c9 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -18,44 +18,10 @@ from cudf.core.frame import Frame -def merge( - lhs, - rhs, - *, - on, - left_on, - right_on, - left_index, - right_index, - how, - sort, - indicator, - suffixes, -): - if how in {"leftsemi", "leftanti"}: - merge_cls = MergeSemi - else: - merge_cls = Merge - mergeobj = merge_cls( - lhs, - rhs, - on=on, - left_on=left_on, - right_on=right_on, - left_index=left_index, - right_index=right_index, - how=how, - sort=sort, - indicator=indicator, - suffixes=suffixes, - ) - return mergeobj.perform_merge() - - _JoinKeys = namedtuple("JoinKeys", ["left", "right"]) -class Merge(object): +class Merge: # A namedtuple of indexers representing the left and right keys _keys: _JoinKeys @@ -148,18 +114,14 @@ def __init__( self.lsuffix, self.rsuffix = suffixes self._compute_join_keys() - @property - def _out_class(self): - # type of the result - out_class = cudf.DataFrame - - if isinstance(self.lhs, cudf.MultiIndex) or isinstance( - self.rhs, cudf.MultiIndex + if isinstance(lhs, cudf.MultiIndex) or isinstance( + rhs, cudf.MultiIndex ): - out_class = cudf.MultiIndex - elif isinstance(self.lhs, cudf.BaseIndex): - out_class = self.lhs.__class__ - return out_class + self._out_class = cudf.MultiIndex + elif isinstance(lhs, cudf.BaseIndex): + self._out_class = lhs.__class__ + else: + self._out_class = cudf.DataFrame def perform_merge(self) -> Frame: lhs, rhs = self._match_key_dtypes(self.lhs, self.rhs) @@ -185,7 +147,9 @@ def perform_merge(self) -> Frame: right_rows, nullify=True, keep_index=gather_index ) - result = self._merge_results(left_result, right_result) + result = self._out_class._from_data( + *self._merge_results(left_result, right_result) + ) if self.sort: result = self._sort_result(result) @@ -260,7 +224,7 @@ def _compute_join_keys(self): self._keys = _JoinKeys(left=left_keys, right=right_keys) - def _merge_results(self, left_result: Frame, right_result: Frame) -> Frame: + def _merge_results(self, left_result: Frame, right_result: Frame): # Merge the Frames `left_result` and `right_result` into a single # `Frame`, suffixing column names if necessary. @@ -322,22 +286,19 @@ def _merge_results(self, left_result: Frame, right_result: Frame) -> Frame: right_names[rcol], right_result._data[rcol], validate=False ) - # Index of the result: - if self.left_index and self.right_index: + # The index of the left result takes precedence if we join on both left + # and right keys, which is also true if we only join on right_index. + if self.right_index: + # right_index and left_on index = left_result._index elif self.left_index: # left_index and right_on index = right_result._index - elif self.right_index: - # right_index and left_on - index = left_result._index else: index = None # Construct result from data and index: - result = self._out_class._from_data(data=data, index=index) - - return result + return data, index def _sort_result(self, result: Frame) -> Frame: # Pandas sorts on the key columns in the @@ -407,7 +368,7 @@ def _validate_merge_params( if (isinstance(lhs, cudf.Series) and not lhs.name) or ( isinstance(rhs, cudf.Series) and not rhs.name ): - raise ValueError("Can not merge on unnamed Series") + raise ValueError("Cannot merge on unnamed Series") # If nothing specified, must have common cols to use implicitly same_named_columns = set(lhs._data) & set(rhs._data) @@ -484,9 +445,11 @@ def __init__(self, *args, **kwargs): libcudf.join.semi_join, how=kwargs["how"] ) - def _merge_results(self, lhs: Frame, rhs: Frame) -> Frame: + def _merge_results(self, lhs: Frame, rhs: Frame): # semi-join result includes only lhs columns - if issubclass(self._out_class, cudf.Index): - return self._out_class._from_data(lhs._data) - else: - return self._out_class._from_data(lhs._data, index=lhs._index) + return ( + lhs._data, + lhs._index + if not issubclass(self._out_class, cudf.Index) + else None, + ) From 1027a1ec320b86036badedc88addd15c3ef9d332 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Oct 2021 00:15:40 -0700 Subject: [PATCH 05/37] Store left and right keys separately to get rid of the _JoinKeys namedtuple. --- python/cudf/cudf/core/join/join.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 69865eb96c9..1f76e3d3956 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -2,7 +2,6 @@ from __future__ import annotations import functools -from collections import namedtuple from typing import TYPE_CHECKING, Callable, Tuple import cudf @@ -18,13 +17,7 @@ from cudf.core.frame import Frame -_JoinKeys = namedtuple("JoinKeys", ["left", "right"]) - - class Merge: - # A namedtuple of indexers representing the left and right keys - _keys: _JoinKeys - # The joiner function must have the following signature: # # def joiner( @@ -126,8 +119,8 @@ def __init__( def perform_merge(self) -> Frame: lhs, rhs = self._match_key_dtypes(self.lhs, self.rhs) - left_table = _frame_select_by_indexers(lhs, self._keys.left) - right_table = _frame_select_by_indexers(rhs, self._keys.right) + left_table = _frame_select_by_indexers(lhs, self._left_keys) + right_table = _frame_select_by_indexers(rhs, self._right_keys) left_rows, right_rows = self._joiner( left_table, right_table, how=self.how, @@ -156,7 +149,6 @@ def perform_merge(self) -> Frame: return result def _compute_join_keys(self): - # Computes self._keys left_keys = [] right_keys = [] if ( @@ -222,7 +214,7 @@ def _compute_join_keys(self): "Merge operands must have same number of join key columns" ) - self._keys = _JoinKeys(left=left_keys, right=right_keys) + self._left_keys, self._right_keys = left_keys, right_keys def _merge_results(self, left_result: Frame, right_result: Frame): # Merge the Frames `left_result` and `right_result` into a single @@ -234,7 +226,7 @@ def _merge_results(self, left_result: Frame, right_result: Frame): # by filling nulls in the left key column with corresponding values # from the right key column: if self.how == "outer": - for lkey, rkey in zip(*self._keys): + for lkey, rkey in zip(self._left_keys, self._right_keys): if lkey.name == rkey.name: # fill nulls in lhs from values in the rhs lkey.set( @@ -261,7 +253,7 @@ def _merge_results(self, left_result: Frame, right_result: Frame): else: key_columns_with_same_name = [ lkey.name - for lkey, rkey in zip(*self._keys) + for lkey, rkey in zip(self._left_keys, self._right_keys) if ( (lkey.index, rkey.index) == (False, False) and lkey.name == rkey.name @@ -399,7 +391,7 @@ def _match_key_dtypes(self, lhs: Frame, rhs: Frame) -> Tuple[Frame, Frame]: # Match the dtypes of the key columns from lhs and rhs out_lhs = lhs.copy(deep=False) out_rhs = rhs.copy(deep=False) - for left_key, right_key in zip(*self._keys): + for left_key, right_key in zip(self._left_keys, self._right_keys): lcol, rcol = left_key.get(lhs), right_key.get(rhs) lcol_casted, rcol_casted = _match_join_keys( lcol, rcol, how=self.how @@ -419,7 +411,7 @@ def _restore_categorical_keys( out_lhs = lhs.copy(deep=False) out_rhs = rhs.copy(deep=False) if self.how == "inner": - for left_key, right_key in zip(*self._keys): + for left_key, right_key in zip(self._left_keys, self._right_keys): if isinstance( left_key.get(self.lhs).dtype, cudf.CategoricalDtype ) and isinstance( From e2d5b085a55aa42d273a40b021314edd5f87c855 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Oct 2021 08:17:36 -0700 Subject: [PATCH 06/37] Inline some Merge methods. --- python/cudf/cudf/core/join/join.py | 81 +++++++++++++----------------- 1 file changed, 36 insertions(+), 45 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 1f76e3d3956..7e1b6bd531f 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -2,7 +2,7 @@ from __future__ import annotations import functools -from typing import TYPE_CHECKING, Callable, Tuple +from typing import TYPE_CHECKING, Callable import cudf from cudf import _lib as libcudf @@ -117,7 +117,18 @@ def __init__( self._out_class = cudf.DataFrame def perform_merge(self) -> Frame: - lhs, rhs = self._match_key_dtypes(self.lhs, self.rhs) + # Match the dtypes of the key columns from lhs and rhs + lhs = self.lhs.copy(deep=False) + rhs = self.rhs.copy(deep=False) + for left_key, right_key in zip(self._left_keys, self._right_keys): + lcol, rcol = left_key.get(lhs), right_key.get(rhs) + lcol_casted, rcol_casted = _match_join_keys( + lcol, rcol, how=self.how + ) + if lcol is not lcol_casted: + left_key.set(lhs, lcol_casted, validate=False) + if rcol is not rcol_casted: + right_key.set(rhs, rcol_casted, validate=False) left_table = _frame_select_by_indexers(lhs, self._left_keys) right_table = _frame_select_by_indexers(rhs, self._right_keys) @@ -125,7 +136,29 @@ def perform_merge(self) -> Frame: left_rows, right_rows = self._joiner( left_table, right_table, how=self.how, ) - lhs, rhs = self._restore_categorical_keys(lhs, rhs) + + # For inner joins, any categorical keys in `self.lhs` and `self.rhs` + # were casted to their category type to produce `lhs` and `rhs`. + # Here, we cast them back. + if self.how == "inner": + for left_key, right_key in zip(self._left_keys, self._right_keys): + # Note that we check self.lhs and self.rhs rather than lhs and + # rhs here because _match_key_dtypes has already modified them. + if isinstance( + left_key.get(self.lhs).dtype, cudf.CategoricalDtype + ) and isinstance( + right_key.get(self.rhs).dtype, cudf.CategoricalDtype + ): + left_key.set( + lhs, + left_key.get(lhs).astype("category"), + validate=False, + ) + right_key.set( + rhs, + right_key.get(rhs).astype("category"), + validate=False, + ) left_result = cudf.core.frame.Frame() right_result = cudf.core.frame.Frame() @@ -387,48 +420,6 @@ def _validate_merge_params( "lsuffix and rsuffix are not defined" ) - def _match_key_dtypes(self, lhs: Frame, rhs: Frame) -> Tuple[Frame, Frame]: - # Match the dtypes of the key columns from lhs and rhs - out_lhs = lhs.copy(deep=False) - out_rhs = rhs.copy(deep=False) - for left_key, right_key in zip(self._left_keys, self._right_keys): - lcol, rcol = left_key.get(lhs), right_key.get(rhs) - lcol_casted, rcol_casted = _match_join_keys( - lcol, rcol, how=self.how - ) - if lcol is not lcol_casted: - left_key.set(out_lhs, lcol_casted, validate=False) - if rcol is not rcol_casted: - right_key.set(out_rhs, rcol_casted, validate=False) - return out_lhs, out_rhs - - def _restore_categorical_keys( - self, lhs: Frame, rhs: Frame - ) -> Tuple[Frame, Frame]: - # For inner joins, any categorical keys in `self.lhs` and `self.rhs` - # were casted to their category type to produce `lhs` and `rhs`. - # Here, we cast them back. - out_lhs = lhs.copy(deep=False) - out_rhs = rhs.copy(deep=False) - if self.how == "inner": - for left_key, right_key in zip(self._left_keys, self._right_keys): - if isinstance( - left_key.get(self.lhs).dtype, cudf.CategoricalDtype - ) and isinstance( - right_key.get(self.rhs).dtype, cudf.CategoricalDtype - ): - left_key.set( - out_lhs, - left_key.get(out_lhs).astype("category"), - validate=False, - ) - right_key.set( - out_rhs, - right_key.get(out_rhs).astype("category"), - validate=False, - ) - return out_lhs, out_rhs - class MergeSemi(Merge): def __init__(self, *args, **kwargs): From 59d95100eff57fda00a783a05be2c56114c64e9c Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Oct 2021 08:33:22 -0700 Subject: [PATCH 07/37] Inline single categorical matching logic. --- python/cudf/cudf/core/join/_join_helpers.py | 52 +++++++-------------- 1 file changed, 17 insertions(+), 35 deletions(-) diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index cc9c0fb66da..fec71b9b55a 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -3,12 +3,12 @@ import collections import warnings -from typing import TYPE_CHECKING, Any, Iterable, Tuple +from typing import TYPE_CHECKING, Any, Iterable, Tuple, cast import numpy as np -import pandas as pd import cudf +from cudf.api.types import is_dtype_equal from cudf.core.dtypes import CategoricalDtype if TYPE_CHECKING: @@ -91,12 +91,22 @@ def _match_join_keys( rtype = rcol.dtype # if either side is categorical, different logic - if isinstance(ltype, CategoricalDtype) or isinstance( - rtype, CategoricalDtype - ): - return _match_categorical_dtypes(lcol, rcol, how) + left_is_categorical = isinstance(ltype, CategoricalDtype) + right_is_categorical = isinstance(rtype, CategoricalDtype) + if left_is_categorical and right_is_categorical: + return _match_categorical_dtypes_both( + cast(CategoricalColumn, lcol), cast(CategoricalColumn, rcol), how + ) + elif left_is_categorical or right_is_categorical: + if left_is_categorical: + if how in {"left", "leftsemi", "leftanti"}: + return lcol, rcol.astype(ltype) + common_type = ltype.categories.dtype + else: + common_type = rtype.categories.dtype + return lcol.astype(common_type), rcol.astype(common_type) - if pd.api.types.is_dtype_equal(ltype, rtype): + if is_dtype_equal(ltype, rtype): return lcol, rcol if isinstance(ltype, cudf.Decimal64Dtype) or isinstance( @@ -131,34 +141,9 @@ def _match_join_keys( return lcol.astype(common_type), rcol.astype(common_type) -def _match_categorical_dtypes( - lcol: ColumnBase, rcol: ColumnBase, how: str -) -> Tuple[ColumnBase, ColumnBase]: - # cast the keys lcol and rcol to a common dtype - # when at least one of them is a categorical type - ltype, rtype = lcol.dtype, rcol.dtype - - if isinstance(lcol, cudf.core.column.CategoricalColumn) and isinstance( - rcol, cudf.core.column.CategoricalColumn - ): - # if both are categoricals, logic is complicated: - return _match_categorical_dtypes_both(lcol, rcol, how) - - if isinstance(ltype, CategoricalDtype): - if how in {"left", "leftsemi", "leftanti"}: - return lcol, rcol.astype(ltype) - common_type = ltype.categories.dtype - elif isinstance(rtype, CategoricalDtype): - common_type = rtype.categories.dtype - return lcol.astype(common_type), rcol.astype(common_type) - - def _match_categorical_dtypes_both( lcol: CategoricalColumn, rcol: CategoricalColumn, how: str ) -> Tuple[ColumnBase, ColumnBase]: - # The commontype depends on both `how` and the specifics of the - # categorical variables to be merged. - ltype, rtype = lcol.dtype, rcol.dtype # when both are ordered and both have the same categories, @@ -184,9 +169,6 @@ def _match_categorical_dtypes_both( "neither side is ordered" ) - # the following should now always hold - assert not ltype.ordered and not rtype.ordered - if how == "inner": # cast to category types -- we must cast them back later return _match_join_keys( From d1363aa0c5d746586b3484d744e0f6fb85fd15ad Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Oct 2021 09:19:04 -0700 Subject: [PATCH 08/37] Stop creating column accessors and frames unnecessarily when subsetting. --- python/cudf/cudf/core/join/_join_helpers.py | 22 ++++++++++----------- python/cudf/cudf/core/join/join.py | 8 ++++++-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index fec71b9b55a..b2da8551870 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -9,10 +9,11 @@ import cudf from cudf.api.types import is_dtype_equal +from cudf.core.column import CategoricalColumn from cudf.core.dtypes import CategoricalDtype if TYPE_CHECKING: - from cudf.core.column import CategoricalColumn, ColumnBase + from cudf.core.column import ColumnBase from cudf.core.frame import Frame @@ -56,25 +57,22 @@ def set(self, obj: Frame, value: ColumnBase, validate=False): raise KeyError() -def _frame_select_by_indexers( - frame: Frame, indexers: Iterable[_Indexer] -) -> Frame: +def _frame_select_by_indexers(frame: Frame, indexers: Iterable[_Indexer]): # Select columns from the given `Frame` using `indexers`, # and return a new `Frame`. - index_data = frame._data.__class__() - data = frame._data.__class__() + index_data = {} + data = {} for idx in indexers: if idx.index: - index_data.set_by_label(idx.name, idx.get(frame), validate=False) + index_data[idx.name] = idx.get(frame) else: - data.set_by_label(idx.name, idx.get(frame), validate=False) + data[idx.name] = idx.get(frame) - result_index = ( - cudf.core.index._index_from_data(index_data) if index_data else None + return ( + data, + cudf.core.index._index_from_data(index_data) if index_data else None, ) - result = cudf.core.frame.Frame(data=data, index=result_index) - return result def _match_join_keys( diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 7e1b6bd531f..c3893560c19 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -130,8 +130,12 @@ def perform_merge(self) -> Frame: if rcol is not rcol_casted: right_key.set(rhs, rcol_casted, validate=False) - left_table = _frame_select_by_indexers(lhs, self._left_keys) - right_table = _frame_select_by_indexers(rhs, self._right_keys) + left_table = cudf.core.frame.Frame( + *_frame_select_by_indexers(lhs, self._left_keys) + ) + right_table = cudf.core.frame.Frame( + *_frame_select_by_indexers(rhs, self._right_keys) + ) left_rows, right_rows = self._joiner( left_table, right_table, how=self.how, From 54aef839642bc33f19057c292efb4fce1da65813 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Oct 2021 12:22:04 -0700 Subject: [PATCH 09/37] Minor comments. --- python/cudf/cudf/core/join/_join_helpers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index b2da8551870..1915a497d86 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -42,7 +42,7 @@ def get(self, obj: Frame) -> ColumnBase: else: if obj._index is not None: return obj._index._data[self.name] - raise KeyError() + raise KeyError def set(self, obj: Frame, value: ColumnBase, validate=False): # set the colum in `obj` @@ -54,7 +54,8 @@ def set(self, obj: Frame, value: ColumnBase, validate=False): self.name, value, validate=validate ) else: - raise KeyError() + raise KeyError + # TODO: Why is it OK for control flow to get here? def _frame_select_by_indexers(frame: Frame, indexers: Iterable[_Indexer]): @@ -78,9 +79,8 @@ def _frame_select_by_indexers(frame: Frame, indexers: Iterable[_Indexer]): def _match_join_keys( lcol: ColumnBase, rcol: ColumnBase, how: str ) -> Tuple[ColumnBase, ColumnBase]: - # returns the common dtype that lcol and rcol should be casted to, - # before they can be used as left and right join keys. - # If no casting is necessary, returns None + # Casts lcol and rcol to a common dtype for use as join keys. If no casting + # is necessary, they are returned as is. common_type = None From 291467087d798ff324aa2060961920ae2bd23cad Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Oct 2021 18:26:09 -0700 Subject: [PATCH 10/37] Allow left_on and right_on to support both index levels and columns. --- python/cudf/cudf/core/join/join.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index c3893560c19..2fb0c19009c 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -206,6 +206,8 @@ def _compute_join_keys(self): left_keys.extend( [ _Indexer(name=on, column=True) + if on in self.lhs._data + else _Indexer(name=on, index=True) for on in _coerce_to_tuple(self.left_on) ] ) @@ -221,6 +223,8 @@ def _compute_join_keys(self): right_keys.extend( [ _Indexer(name=on, column=True) + if on in self.rhs._data + else _Indexer(name=on, index=True) for on in _coerce_to_tuple(self.right_on) ] ) From dbce92aa01d3c3391612590bc76067364a0fa0cc Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 09:22:18 -0700 Subject: [PATCH 11/37] Cache key_columns_with_same_name on construction. --- python/cudf/cudf/core/join/join.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 2fb0c19009c..b00364c41cc 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -116,6 +116,8 @@ def __init__( else: self._out_class = cudf.DataFrame + self._key_columns_with_same_name = self.on if self.on else None + def perform_merge(self) -> Frame: # Match the dtypes of the key columns from lhs and rhs lhs = self.lhs.copy(deep=False) @@ -289,10 +291,8 @@ def _merge_results(self, left_result: Frame, right_result: Frame): # in the final result common_names = set(left_names) & set(right_names) - if self.on: - key_columns_with_same_name = self.on - else: - key_columns_with_same_name = [ + if self._key_columns_with_same_name is None: + self._key_columns_with_same_name = [ lkey.name for lkey, rkey in zip(self._left_keys, self._right_keys) if ( @@ -300,8 +300,9 @@ def _merge_results(self, left_result: Frame, right_result: Frame): and lkey.name == rkey.name ) ] + for name in common_names: - if name not in key_columns_with_same_name: + if name not in self._key_columns_with_same_name: left_names[name] = f"{name}{self.lsuffix}" right_names[name] = f"{name}{self.rsuffix}" else: From 2bc258ef79a6660962fb6179bd760a21d1c566e6 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 11:39:56 -0700 Subject: [PATCH 12/37] Remove self.on branch for sorting. --- python/cudf/cudf/core/join/join.py | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index b00364c41cc..a24f0a4257a 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -339,33 +339,24 @@ def _sort_result(self, result: Frame) -> Frame: # same order as given in 'on'. If the indices are used as # keys, the index will be sorted. If one index is specified, # the key columns on the other side will be used to sort. - if self.on: - if isinstance(result, cudf.BaseIndex): - sort_order = result._get_sorted_inds() - else: - # need a list instead of a tuple here because - # _get_sorted_inds calls down to ColumnAccessor.get_by_label - # which handles lists and tuples differently - sort_order = result._get_sorted_inds( - list(_coerce_to_tuple(self.on)) - ) - return result._gather(sort_order, keep_index=False) + left_on = self.left_on if self.left_on else self.on + right_on = self.right_on if self.right_on else self.on by = [] if self.left_index and self.right_index: if result._index is not None: by.extend(result._index._data.columns) - if self.left_on: + if left_on: + by.extend([result._data[col] for col in _coerce_to_tuple(left_on)]) + if right_on: by.extend( - [result._data[col] for col in _coerce_to_tuple(self.left_on)] - ) - if self.right_on: - by.extend( - [result._data[col] for col in _coerce_to_tuple(self.right_on)] + [result._data[col] for col in _coerce_to_tuple(right_on)] ) if by: to_sort = cudf.DataFrame._from_columns(by) sort_order = to_sort.argsort() - result = result._gather(sort_order) + result = result._gather( + sort_order, keep_index=self.left_index or self.right_index + ) return result @staticmethod From ea425e5018e38ad8bc7772d565384d58ffda4c09 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 11:42:30 -0700 Subject: [PATCH 13/37] Remove self.on logic from join key computation. --- python/cudf/cudf/core/join/join.py | 31 +++++++----------------------- 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index a24f0a4257a..b572a30f15b 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -190,12 +190,9 @@ def perform_merge(self) -> Frame: def _compute_join_keys(self): left_keys = [] right_keys = [] - if ( - self.left_index - or self.right_index - or self.left_on - or self.right_on - ): + left_on = self.left_on if self.left_on else self.on + right_on = self.right_on if self.right_on else self.on + if self.left_index or self.right_index or left_on or right_on: if self.left_index: left_keys.extend( [ @@ -203,14 +200,14 @@ def _compute_join_keys(self): for on in self.lhs.index._data.names ] ) - if self.left_on: + if left_on: # TODO: require left_on or left_index to be specified left_keys.extend( [ _Indexer(name=on, column=True) if on in self.lhs._data else _Indexer(name=on, index=True) - for on in _coerce_to_tuple(self.left_on) + for on in _coerce_to_tuple(left_on) ] ) if self.right_index: @@ -220,30 +217,16 @@ def _compute_join_keys(self): for on in self.rhs.index._data.names ] ) - if self.right_on: + if right_on: # TODO: require right_on or right_index to be specified right_keys.extend( [ _Indexer(name=on, column=True) if on in self.rhs._data else _Indexer(name=on, index=True) - for on in _coerce_to_tuple(self.right_on) + for on in _coerce_to_tuple(right_on) ] ) - elif self.on: - on_names = _coerce_to_tuple(self.on) - for on in on_names: - # If `on` is provided, Merge on columns if present, - # otherwise default to indexes. - if on in self.lhs._data: - left_keys.append(_Indexer(name=on, column=True)) - else: - left_keys.append(_Indexer(name=on, index=True)) - if on in self.rhs._data: - right_keys.append(_Indexer(name=on, column=True)) - else: - right_keys.append(_Indexer(name=on, index=True)) - else: # if `on` is not provided and we're not merging # index with column or on both indexes, then use From 5128a89f2c3eacc373d6c032b7c03c575c08b3d5 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 12:00:34 -0700 Subject: [PATCH 14/37] Disallow specifying left_on with left_index or right_on with right_index. --- python/cudf/cudf/core/join/join.py | 14 ++++++++++++++ python/cudf/cudf/tests/test_joining.py | 12 ------------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index b572a30f15b..b1624cf9134 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -368,9 +368,23 @@ def _validate_merge_params( 'Can only pass argument "on" OR "left_on" ' 'and "right_on", not a combination of both.' ) + elif left_index or right_index: + # Passing 'on' with 'left_index' or 'right_index' is ambiguous + raise ValueError( + 'Can only pass argument "on" OR "left_index" ' + 'and "right_index", not a combination of both.' + ) else: # the validity of 'on' being checked by _Indexer return + elif left_on and left_index: + raise ValueError( + 'Can only pass argument "left_on" OR "left_index" not both.' + ) + elif right_on and right_index: + raise ValueError( + 'Can only pass argument "right_on" OR "right_index" not both.' + ) # Can't merge on unnamed Series if (isinstance(lhs, cudf.Series) and not lhs.name) or ( diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py index 775b866f5ce..1798a3be0e0 100644 --- a/python/cudf/cudf/tests/test_joining.py +++ b/python/cudf/cudf/tests/test_joining.py @@ -741,12 +741,6 @@ def test_merge_sort(ons, hows): [ {"left_on": ["a"], "left_index": False, "right_index": True}, {"right_on": ["b"], "left_index": True, "right_index": False}, - { - "left_on": ["a"], - "right_on": ["b"], - "left_index": True, - "right_index": True, - }, ], ) def test_merge_sort_on_indexes(kwargs): @@ -1784,12 +1778,6 @@ def test_typecast_on_join_indexes_matching_categorical(): {"left_index": True, "right_on": "b"}, {"left_on": "a", "right_index": True}, {"left_index": True, "right_index": True}, - { - "left_on": "a", - "right_on": "b", - "left_index": True, - "right_index": True, - }, ], ) def test_series_dataframe_mixed_merging(lhs, rhs, how, kwargs): From b601a82067d730adf78901b88cbdcb5978759f15 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 12:09:55 -0700 Subject: [PATCH 15/37] Remove self.on. --- python/cudf/cudf/core/join/join.py | 39 +++++++++++++++++------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index b1624cf9134..c82068203d6 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -96,9 +96,11 @@ def __init__( self.lhs = lhs self.rhs = rhs - self.on = on - self.left_on = left_on - self.right_on = right_on + # At this point validation guarantees that if on is not None we + # don't have any other args, so we can apply it directly to left_on and + # right_on. + self.left_on = left_on if left_on else on + self.right_on = right_on if right_on else on self.left_index = left_index self.right_index = right_index self.how = how @@ -116,7 +118,7 @@ def __init__( else: self._out_class = cudf.DataFrame - self._key_columns_with_same_name = self.on if self.on else None + self._key_columns_with_same_name = on if on else None def perform_merge(self) -> Frame: # Match the dtypes of the key columns from lhs and rhs @@ -190,9 +192,12 @@ def perform_merge(self) -> Frame: def _compute_join_keys(self): left_keys = [] right_keys = [] - left_on = self.left_on if self.left_on else self.on - right_on = self.right_on if self.right_on else self.on - if self.left_index or self.right_index or left_on or right_on: + if ( + self.left_index + or self.right_index + or self.left_on + or self.right_on + ): if self.left_index: left_keys.extend( [ @@ -200,14 +205,14 @@ def _compute_join_keys(self): for on in self.lhs.index._data.names ] ) - if left_on: + if self.left_on: # TODO: require left_on or left_index to be specified left_keys.extend( [ _Indexer(name=on, column=True) if on in self.lhs._data else _Indexer(name=on, index=True) - for on in _coerce_to_tuple(left_on) + for on in _coerce_to_tuple(self.left_on) ] ) if self.right_index: @@ -217,14 +222,14 @@ def _compute_join_keys(self): for on in self.rhs.index._data.names ] ) - if right_on: + if self.right_on: # TODO: require right_on or right_index to be specified right_keys.extend( [ _Indexer(name=on, column=True) if on in self.rhs._data else _Indexer(name=on, index=True) - for on in _coerce_to_tuple(right_on) + for on in _coerce_to_tuple(self.right_on) ] ) else: @@ -322,17 +327,17 @@ def _sort_result(self, result: Frame) -> Frame: # same order as given in 'on'. If the indices are used as # keys, the index will be sorted. If one index is specified, # the key columns on the other side will be used to sort. - left_on = self.left_on if self.left_on else self.on - right_on = self.right_on if self.right_on else self.on by = [] if self.left_index and self.right_index: if result._index is not None: by.extend(result._index._data.columns) - if left_on: - by.extend([result._data[col] for col in _coerce_to_tuple(left_on)]) - if right_on: + if self.left_on: + by.extend( + [result._data[col] for col in _coerce_to_tuple(self.left_on)] + ) + if self.right_on: by.extend( - [result._data[col] for col in _coerce_to_tuple(right_on)] + [result._data[col] for col in _coerce_to_tuple(self.right_on)] ) if by: to_sort = cudf.DataFrame._from_columns(by) From 46ec0cd9bbe1d7c38c89d6ff68e8f6fd25726674 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 13:16:13 -0700 Subject: [PATCH 16/37] Remove DataFrame._from_columns. --- python/cudf/cudf/core/dataframe.py | 21 ++++++--------------- python/cudf/cudf/core/join/_join_helpers.py | 1 - python/cudf/cudf/core/join/join.py | 2 +- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index dac097cd83d..d64a0c076c7 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -596,9 +596,12 @@ def __init__(self, data=None, index=None, columns=None, dtype=None): else: if is_list_like(data): if len(data) > 0 and is_scalar(data[0]): - new_df = self._from_columns( - [data], index=index, columns=columns - ) + if columns is not None: + data = dict(zip(columns, [data])) + else: + data = dict(enumerate([data])) + new_df = DataFrame(data=data, index=index) + self._data = new_df._data self.index = new_df._index self.columns = new_df.columns @@ -5274,18 +5277,6 @@ def _from_arrays(cls, data, index=None, columns=None, nan_as_null=False): df._index = as_index(index) return df - @classmethod - def _from_columns(cls, cols, index=None, columns=None): - """ - Construct a DataFrame from a list of Columns - """ - if columns is not None: - data = dict(zip(columns, cols)) - else: - data = dict(enumerate(cols)) - - return cls(data=data, index=index,) - def interpolate( self, method="linear", diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index 1915a497d86..af8c4075b80 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -55,7 +55,6 @@ def set(self, obj: Frame, value: ColumnBase, validate=False): ) else: raise KeyError - # TODO: Why is it OK for control flow to get here? def _frame_select_by_indexers(frame: Frame, indexers: Iterable[_Indexer]): diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index c82068203d6..f45c0303d46 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -340,7 +340,7 @@ def _sort_result(self, result: Frame) -> Frame: [result._data[col] for col in _coerce_to_tuple(self.right_on)] ) if by: - to_sort = cudf.DataFrame._from_columns(by) + to_sort = cudf.DataFrame._from_data(dict(enumerate(by))) sort_order = to_sort.argsort() result = result._gather( sort_order, keep_index=self.left_index or self.right_index From 229175b69a90b3ee12ac0e4f435e78aac9691e0b Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 14:42:12 -0700 Subject: [PATCH 17/37] Add error check for merge keys that are present in both a frame and its index. --- python/cudf/cudf/core/join/join.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index f45c0303d46..7fed60a4995 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -391,6 +391,32 @@ def _validate_merge_params( 'Can only pass argument "right_on" OR "right_index" not both.' ) + # Can't merge on a column name that is present in both a frame and its + # indexes. + if on: + for key in on: + if (key in lhs._data and key in lhs.index._data) or ( + key in rhs._data and key in rhs.index._data + ): + raise ValueError( + f"{key} is both an index level and a " + "column label, which is ambiguous." + ) + if left_on: + for key in left_on: + if key in lhs._data and key in lhs.index._data: + raise ValueError( + f"{key} is both an index level and a " + "column label, which is ambiguous." + ) + if right_on: + for key in right_on: + if key in rhs._data and key in rhs.index._data: + raise ValueError( + f"{key} is both an index level and a " + "column label, which is ambiguous." + ) + # Can't merge on unnamed Series if (isinstance(lhs, cudf.Series) and not lhs.name) or ( isinstance(rhs, cudf.Series) and not rhs.name From 8da1115581b11b5fbc485cf898317d9e3d3224f2 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 15:51:17 -0700 Subject: [PATCH 18/37] Use the same logic for index and on cols. --- python/cudf/cudf/core/join/join.py | 68 +++++++++++++----------------- 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 7fed60a4995..f514e0ef6c3 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -96,17 +96,27 @@ def __init__( self.lhs = lhs self.rhs = rhs - # At this point validation guarantees that if on is not None we - # don't have any other args, so we can apply it directly to left_on and - # right_on. - self.left_on = left_on if left_on else on - self.right_on = right_on if right_on else on - self.left_index = left_index - self.right_index = right_index self.how = how self.sort = sort if suffixes: self.lsuffix, self.rsuffix = suffixes + + # At this point validation guarantees that if on is not None we + # don't have any other args, so we can apply it directly to left_on and + # right_on. + self._using_left_index = bool(left_index) + self.left_on = ( + lhs.index._data.names if left_index else left_on if left_on else on + ) + self._using_right_index = bool(right_index) + self.right_on = ( + rhs.index._data.names + if right_index + else right_on + if right_on + else on + ) + self._compute_join_keys() if isinstance(lhs, cudf.MultiIndex) or isinstance( @@ -171,7 +181,7 @@ def perform_merge(self) -> Frame: left_result = cudf.core.frame.Frame() right_result = cudf.core.frame.Frame() - gather_index = self.left_index or self.right_index + gather_index = self._using_left_index or self._using_right_index if left_rows is not None: left_result = lhs._gather( left_rows, nullify=True, keep_index=gather_index @@ -192,42 +202,23 @@ def perform_merge(self) -> Frame: def _compute_join_keys(self): left_keys = [] right_keys = [] - if ( - self.left_index - or self.right_index - or self.left_on - or self.right_on - ): - if self.left_index: - left_keys.extend( - [ - _Indexer(name=on, index=True) - for on in self.lhs.index._data.names - ] - ) + if self.left_on or self.right_on: if self.left_on: # TODO: require left_on or left_index to be specified left_keys.extend( [ _Indexer(name=on, column=True) - if on in self.lhs._data + if not self._using_left_index and on in self.lhs._data else _Indexer(name=on, index=True) for on in _coerce_to_tuple(self.left_on) ] ) - if self.right_index: - right_keys.extend( - [ - _Indexer(name=on, index=True) - for on in self.rhs.index._data.names - ] - ) if self.right_on: # TODO: require right_on or right_index to be specified right_keys.extend( [ _Indexer(name=on, column=True) - if on in self.rhs._data + if not self._using_right_index and on in self.rhs._data else _Indexer(name=on, index=True) for on in _coerce_to_tuple(self.right_on) ] @@ -308,12 +299,12 @@ def _merge_results(self, left_result: Frame, right_result: Frame): right_names[rcol], right_result._data[rcol], validate=False ) - # The index of the left result takes precedence if we join on both left - # and right keys, which is also true if we only join on right_index. - if self.right_index: + # TODO: There is a bug here, we actually need to pull the index columns + # from both if both left_index and right_index were True. + if self._using_right_index: # right_index and left_on index = left_result._index - elif self.left_index: + elif self._using_left_index: # left_index and right_on index = right_result._index else: @@ -328,14 +319,14 @@ def _sort_result(self, result: Frame) -> Frame: # keys, the index will be sorted. If one index is specified, # the key columns on the other side will be used to sort. by = [] - if self.left_index and self.right_index: + if self._using_left_index and self._using_right_index: if result._index is not None: by.extend(result._index._data.columns) - if self.left_on: + if not self._using_left_index and self.left_on: by.extend( [result._data[col] for col in _coerce_to_tuple(self.left_on)] ) - if self.right_on: + if not self._using_right_index and self.right_on: by.extend( [result._data[col] for col in _coerce_to_tuple(self.right_on)] ) @@ -343,7 +334,8 @@ def _sort_result(self, result: Frame) -> Frame: to_sort = cudf.DataFrame._from_data(dict(enumerate(by))) sort_order = to_sort.argsort() result = result._gather( - sort_order, keep_index=self.left_index or self.right_index + sort_order, + keep_index=self._using_left_index or self._using_right_index, ) return result From aef3d914ae7a52fd99b1d7b8c940c8202c9b041f Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 16:07:59 -0700 Subject: [PATCH 19/37] Simplify key computation and inline in constructor. --- python/cudf/cudf/core/join/join.py | 72 +++++++++++++----------------- 1 file changed, 32 insertions(+), 40 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index f514e0ef6c3..cbda5f11a79 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -117,7 +117,38 @@ def __init__( else on ) - self._compute_join_keys() + if self.left_on or self.right_on: + self._left_keys = [ + _Indexer(name=on, column=True) + if not self._using_left_index and on in self.lhs._data + else _Indexer(name=on, index=True) + for on in ( + _coerce_to_tuple(self.left_on) if self.left_on else [] + ) + ] + self._right_keys = [ + _Indexer(name=on, column=True) + if not self._using_right_index and on in self.rhs._data + else _Indexer(name=on, index=True) + for on in ( + _coerce_to_tuple(self.right_on) if self.right_on else [] + ) + ] + if len(self._left_keys) != len(self._right_keys): + raise ValueError( + "Merge operands must have same number of join key columns" + ) + else: + # if `on` is not provided and we're not merging + # index with column or on both indexes, then use + # the intersection of columns in both frames + on_names = set(self.lhs._data) & set(self.rhs._data) + self._left_keys = [ + _Indexer(name=on, column=True) for on in on_names + ] + self._right_keys = [ + _Indexer(name=on, column=True) for on in on_names + ] if isinstance(lhs, cudf.MultiIndex) or isinstance( rhs, cudf.MultiIndex @@ -199,45 +230,6 @@ def perform_merge(self) -> Frame: result = self._sort_result(result) return result - def _compute_join_keys(self): - left_keys = [] - right_keys = [] - if self.left_on or self.right_on: - if self.left_on: - # TODO: require left_on or left_index to be specified - left_keys.extend( - [ - _Indexer(name=on, column=True) - if not self._using_left_index and on in self.lhs._data - else _Indexer(name=on, index=True) - for on in _coerce_to_tuple(self.left_on) - ] - ) - if self.right_on: - # TODO: require right_on or right_index to be specified - right_keys.extend( - [ - _Indexer(name=on, column=True) - if not self._using_right_index and on in self.rhs._data - else _Indexer(name=on, index=True) - for on in _coerce_to_tuple(self.right_on) - ] - ) - else: - # if `on` is not provided and we're not merging - # index with column or on both indexes, then use - # the intersection of columns in both frames - on_names = set(self.lhs._data) & set(self.rhs._data) - left_keys = [_Indexer(name=on, column=True) for on in on_names] - right_keys = [_Indexer(name=on, column=True) for on in on_names] - - if len(left_keys) != len(right_keys): - raise ValueError( - "Merge operands must have same number of join key columns" - ) - - self._left_keys, self._right_keys = left_keys, right_keys - def _merge_results(self, left_result: Frame, right_result: Frame): # Merge the Frames `left_result` and `right_result` into a single # `Frame`, suffixing column names if necessary. From 35e66bce325ee38be844e5a2cb9a1af1d6b12354 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 16:10:22 -0700 Subject: [PATCH 20/37] Define joiner as class rather than instance variable. --- python/cudf/cudf/core/join/join.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index cbda5f11a79..e759c0046c5 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -1,7 +1,6 @@ # Copyright (c) 2020-2021, NVIDIA CORPORATION. from __future__ import annotations -import functools from typing import TYPE_CHECKING, Callable import cudf @@ -30,7 +29,7 @@ class Merge: # join key. The `joiner` returns a tuple of two Columns # representing the rows to gather from the left- and right- side # tables respectively. - _joiner: Callable + _joiner: Callable = libcudf.join.join def __init__( self, @@ -92,7 +91,6 @@ def __init__( how=how, suffixes=suffixes, ) - self._joiner = functools.partial(libcudf.join.join, how=how) self.lhs = lhs self.rhs = rhs @@ -434,11 +432,7 @@ def _validate_merge_params( class MergeSemi(Merge): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._joiner = functools.partial( - libcudf.join.semi_join, how=kwargs["how"] - ) + _joiner: Callable = libcudf.join.semi_join def _merge_results(self, lhs: Frame, rhs: Frame): # semi-join result includes only lhs columns From 0ca291e8d1dedf64717719353de070f2e1f01a72 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 16:41:33 -0700 Subject: [PATCH 21/37] Remove left_on and right_on to rely on left_keys and right_keys entirely, and take advantage to always precompute key cols with identical names. --- python/cudf/cudf/core/join/join.py | 75 ++++++++++++++---------------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index e759c0046c5..a581c34f112 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -96,18 +96,17 @@ def __init__( self.rhs = rhs self.how = how self.sort = sort - if suffixes: - self.lsuffix, self.rsuffix = suffixes + self.lsuffix, self.rsuffix = suffixes # At this point validation guarantees that if on is not None we # don't have any other args, so we can apply it directly to left_on and # right_on. self._using_left_index = bool(left_index) - self.left_on = ( + left_on = ( lhs.index._data.names if left_index else left_on if left_on else on ) self._using_right_index = bool(right_index) - self.right_on = ( + right_on = ( rhs.index._data.names if right_index else right_on @@ -115,22 +114,18 @@ def __init__( else on ) - if self.left_on or self.right_on: + if left_on or right_on: self._left_keys = [ _Indexer(name=on, column=True) if not self._using_left_index and on in self.lhs._data else _Indexer(name=on, index=True) - for on in ( - _coerce_to_tuple(self.left_on) if self.left_on else [] - ) + for on in (_coerce_to_tuple(left_on) if left_on else []) ] self._right_keys = [ _Indexer(name=on, column=True) if not self._using_right_index and on in self.rhs._data else _Indexer(name=on, index=True) - for on in ( - _coerce_to_tuple(self.right_on) if self.right_on else [] - ) + for on in (_coerce_to_tuple(right_on) if right_on else []) ] if len(self._left_keys) != len(self._right_keys): raise ValueError( @@ -157,7 +152,17 @@ def __init__( else: self._out_class = cudf.DataFrame - self._key_columns_with_same_name = on if on else None + self._key_columns_with_same_name = ( + on + if on + else [] + if (self._using_left_index or self._using_right_index) + else [ + lkey.name + for lkey, rkey in zip(self._left_keys, self._right_keys) + if lkey.name == rkey.name + ] + ) def perform_merge(self) -> Frame: # Match the dtypes of the key columns from lhs and rhs @@ -258,17 +263,9 @@ def _merge_results(self, left_result: Frame, right_result: Frame): # - if they are key columns, keep only the left column # - if they are not key columns, use suffixes to differentiate them # in the final result - common_names = set(left_names) & set(right_names) - - if self._key_columns_with_same_name is None: - self._key_columns_with_same_name = [ - lkey.name - for lkey, rkey in zip(self._left_keys, self._right_keys) - if ( - (lkey.index, rkey.index) == (False, False) - and lkey.name == rkey.name - ) - ] + common_names = set(left_result._data.names) & set( + right_result._data.names + ) for name in common_names: if name not in self._key_columns_with_same_name: @@ -278,16 +275,16 @@ def _merge_results(self, left_result: Frame, right_result: Frame): del right_names[name] # Assemble the data columns of the result: - data = left_result._data.__class__() - - for lcol in left_names: - data.set_by_label( - left_names[lcol], left_result._data[lcol], validate=False - ) - for rcol in right_names: - data.set_by_label( - right_names[rcol], right_result._data[rcol], validate=False - ) + data = { + **{ + new_name: left_result._data[orig_name] + for orig_name, new_name in left_names.items() + }, + **{ + new_name: right_result._data[orig_name] + for orig_name, new_name in right_names.items() + }, + } # TODO: There is a bug here, we actually need to pull the index columns # from both if both left_index and right_index were True. @@ -312,14 +309,10 @@ def _sort_result(self, result: Frame) -> Frame: if self._using_left_index and self._using_right_index: if result._index is not None: by.extend(result._index._data.columns) - if not self._using_left_index and self.left_on: - by.extend( - [result._data[col] for col in _coerce_to_tuple(self.left_on)] - ) - if not self._using_right_index and self.right_on: - by.extend( - [result._data[col] for col in _coerce_to_tuple(self.right_on)] - ) + if not self._using_left_index: + by.extend([result._data[col.name] for col in self._left_keys]) + if not self._using_right_index: + by.extend([result._data[col.name] for col in self._right_keys]) if by: to_sort = cudf.DataFrame._from_data(dict(enumerate(by))) sort_order = to_sort.argsort() From 7f122306ce9b3fccc92790c019520b8d9169b5e8 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 17:31:56 -0700 Subject: [PATCH 22/37] Inline frame_select_by_indexers. --- python/cudf/cudf/core/join/_join_helpers.py | 20 +------------------- python/cudf/cudf/core/join/join.py | 5 ++--- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index af8c4075b80..56d26d7912e 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -3,7 +3,7 @@ import collections import warnings -from typing import TYPE_CHECKING, Any, Iterable, Tuple, cast +from typing import TYPE_CHECKING, Any, Tuple, cast import numpy as np @@ -57,24 +57,6 @@ def set(self, obj: Frame, value: ColumnBase, validate=False): raise KeyError -def _frame_select_by_indexers(frame: Frame, indexers: Iterable[_Indexer]): - # Select columns from the given `Frame` using `indexers`, - # and return a new `Frame`. - index_data = {} - data = {} - - for idx in indexers: - if idx.index: - index_data[idx.name] = idx.get(frame) - else: - data[idx.name] = idx.get(frame) - - return ( - data, - cudf.core.index._index_from_data(index_data) if index_data else None, - ) - - def _match_join_keys( lcol: ColumnBase, rcol: ColumnBase, how: str ) -> Tuple[ColumnBase, ColumnBase]: diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index a581c34f112..93831777805 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -7,7 +7,6 @@ from cudf import _lib as libcudf from cudf.core.join._join_helpers import ( _coerce_to_tuple, - _frame_select_by_indexers, _Indexer, _match_join_keys, ) @@ -179,10 +178,10 @@ def perform_merge(self) -> Frame: right_key.set(rhs, rcol_casted, validate=False) left_table = cudf.core.frame.Frame( - *_frame_select_by_indexers(lhs, self._left_keys) + {idx.name: idx.get(lhs) for idx in self._left_keys}, ) right_table = cudf.core.frame.Frame( - *_frame_select_by_indexers(rhs, self._right_keys) + {idx.name: idx.get(rhs) for idx in self._right_keys}, ) left_rows, right_rows = self._joiner( From 9992be67fd259ec457ffcd0b1924348f1d357201 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 18:08:56 -0700 Subject: [PATCH 23/37] Centralize more of the logic for generating output data and remove some duplicate loops. --- python/cudf/cudf/core/join/join.py | 84 ++++++++++++++---------------- 1 file changed, 38 insertions(+), 46 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 93831777805..bc5019770fd 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -142,6 +142,41 @@ def __init__( _Indexer(name=on, column=True) for on in on_names ] + self.output_lhs = self.lhs.copy(deep=False) + self.output_rhs = self.rhs.copy(deep=False) + + left_join_cols = {} + right_join_cols = {} + + for left_key, right_key in zip(self._left_keys, self._right_keys): + lcol_casted, rcol_casted = _match_join_keys( + left_key.get(self.output_lhs), + right_key.get(self.output_rhs), + how=self.how, + ) + left_join_cols[left_key.name] = lcol_casted + right_join_cols[left_key.name] = rcol_casted + + # Categorical dtypes must be cast back from the underlying codes + # type that was returned by _match_join_keys. + if ( + self.how == "inner" + and isinstance( + left_key.get(self.lhs).dtype, cudf.CategoricalDtype + ) + and isinstance( + right_key.get(self.rhs).dtype, cudf.CategoricalDtype + ) + ): + lcol_casted = lcol_casted.astype("category") + rcol_casted = rcol_casted.astype("category") + + left_key.set(self.output_lhs, lcol_casted, validate=False) + right_key.set(self.output_rhs, rcol_casted, validate=False) + + self._left_join_table = cudf.core.frame.Frame(left_join_cols) + self._right_join_table = cudf.core.frame.Frame(right_join_cols) + if isinstance(lhs, cudf.MultiIndex) or isinstance( rhs, cudf.MultiIndex ): @@ -164,63 +199,20 @@ def __init__( ) def perform_merge(self) -> Frame: - # Match the dtypes of the key columns from lhs and rhs - lhs = self.lhs.copy(deep=False) - rhs = self.rhs.copy(deep=False) - for left_key, right_key in zip(self._left_keys, self._right_keys): - lcol, rcol = left_key.get(lhs), right_key.get(rhs) - lcol_casted, rcol_casted = _match_join_keys( - lcol, rcol, how=self.how - ) - if lcol is not lcol_casted: - left_key.set(lhs, lcol_casted, validate=False) - if rcol is not rcol_casted: - right_key.set(rhs, rcol_casted, validate=False) - - left_table = cudf.core.frame.Frame( - {idx.name: idx.get(lhs) for idx in self._left_keys}, - ) - right_table = cudf.core.frame.Frame( - {idx.name: idx.get(rhs) for idx in self._right_keys}, - ) - left_rows, right_rows = self._joiner( - left_table, right_table, how=self.how, + self._left_join_table, self._right_join_table, how=self.how, ) - # For inner joins, any categorical keys in `self.lhs` and `self.rhs` - # were casted to their category type to produce `lhs` and `rhs`. - # Here, we cast them back. - if self.how == "inner": - for left_key, right_key in zip(self._left_keys, self._right_keys): - # Note that we check self.lhs and self.rhs rather than lhs and - # rhs here because _match_key_dtypes has already modified them. - if isinstance( - left_key.get(self.lhs).dtype, cudf.CategoricalDtype - ) and isinstance( - right_key.get(self.rhs).dtype, cudf.CategoricalDtype - ): - left_key.set( - lhs, - left_key.get(lhs).astype("category"), - validate=False, - ) - right_key.set( - rhs, - right_key.get(rhs).astype("category"), - validate=False, - ) - left_result = cudf.core.frame.Frame() right_result = cudf.core.frame.Frame() gather_index = self._using_left_index or self._using_right_index if left_rows is not None: - left_result = lhs._gather( + left_result = self.output_lhs._gather( left_rows, nullify=True, keep_index=gather_index ) if right_rows is not None: - right_result = rhs._gather( + right_result = self.output_rhs._gather( right_rows, nullify=True, keep_index=gather_index ) From ac621f20fecd03a947fe75923c0b6259b20bd3d0 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 21 Oct 2021 18:14:04 -0700 Subject: [PATCH 24/37] Remove one more use of _Indexer.get. --- python/cudf/cudf/core/join/join.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index bc5019770fd..ff9b74c54dc 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -149,11 +149,9 @@ def __init__( right_join_cols = {} for left_key, right_key in zip(self._left_keys, self._right_keys): - lcol_casted, rcol_casted = _match_join_keys( - left_key.get(self.output_lhs), - right_key.get(self.output_rhs), - how=self.how, - ) + lcol = left_key.get(self.output_lhs) + rcol = right_key.get(self.output_rhs) + lcol_casted, rcol_casted = _match_join_keys(lcol, rcol, self.how) left_join_cols[left_key.name] = lcol_casted right_join_cols[left_key.name] = rcol_casted @@ -161,12 +159,8 @@ def __init__( # type that was returned by _match_join_keys. if ( self.how == "inner" - and isinstance( - left_key.get(self.lhs).dtype, cudf.CategoricalDtype - ) - and isinstance( - right_key.get(self.rhs).dtype, cudf.CategoricalDtype - ) + and isinstance(lcol.dtype, cudf.CategoricalDtype) + and isinstance(rcol.dtype, cudf.CategoricalDtype) ): lcol_casted = lcol_casted.astype("category") rcol_casted = rcol_casted.astype("category") @@ -233,6 +227,8 @@ def _merge_results(self, left_result: Frame, right_result: Frame): # is simply dropped. For outer joins, the two key columns are combined # by filling nulls in the left key column with corresponding values # from the right key column: + # TODO: Move this to the creation of the output_lhs in the constructor + # as well. if self.how == "outer": for lkey, rkey in zip(self._left_keys, self._right_keys): if lkey.name == rkey.name: From e35309667242abcd865a5caa70473039bc986ae3 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 24 Oct 2021 21:16:03 -0700 Subject: [PATCH 25/37] Remove lhs/rhs members. --- python/cudf/cudf/core/join/join.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index ff9b74c54dc..71eb8b61280 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -91,8 +91,6 @@ def __init__( suffixes=suffixes, ) - self.lhs = lhs - self.rhs = rhs self.how = how self.sort = sort self.lsuffix, self.rsuffix = suffixes @@ -116,13 +114,13 @@ def __init__( if left_on or right_on: self._left_keys = [ _Indexer(name=on, column=True) - if not self._using_left_index and on in self.lhs._data + if not self._using_left_index and on in lhs._data else _Indexer(name=on, index=True) for on in (_coerce_to_tuple(left_on) if left_on else []) ] self._right_keys = [ _Indexer(name=on, column=True) - if not self._using_right_index and on in self.rhs._data + if not self._using_right_index and on in rhs._data else _Indexer(name=on, index=True) for on in (_coerce_to_tuple(right_on) if right_on else []) ] @@ -134,7 +132,7 @@ def __init__( # if `on` is not provided and we're not merging # index with column or on both indexes, then use # the intersection of columns in both frames - on_names = set(self.lhs._data) & set(self.rhs._data) + on_names = set(lhs._data) & set(rhs._data) self._left_keys = [ _Indexer(name=on, column=True) for on in on_names ] @@ -142,8 +140,8 @@ def __init__( _Indexer(name=on, column=True) for on in on_names ] - self.output_lhs = self.lhs.copy(deep=False) - self.output_rhs = self.rhs.copy(deep=False) + self.output_lhs = lhs.copy(deep=False) + self.output_rhs = rhs.copy(deep=False) left_join_cols = {} right_join_cols = {} From 7452d70ba094b76d0b744399ee38696bfbdf2f7d Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 24 Oct 2021 22:05:43 -0700 Subject: [PATCH 26/37] Remove unnecessary host copy in tests. --- python/cudf/cudf/tests/test_joining.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py index 1798a3be0e0..fdd3f6ba87a 100644 --- a/python/cudf/cudf/tests/test_joining.py +++ b/python/cudf/cudf/tests/test_joining.py @@ -227,10 +227,7 @@ def test_dataframe_join_combine_cats(): expect.index = expect.index.astype("category") got = lhs.join(rhs, how="outer") - # TODO: Remove copying to host - # after https://github.com/rapidsai/cudf/issues/5676 - # is implemented - assert_eq(expect.index.sort_values(), got.index.to_pandas().sort_values()) + assert_eq(expect.index.sort_values(), got.index.sort_values()) @pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) From 8cca32ce595187c4a90bcc3741c8859454435f40 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 10:03:06 -0700 Subject: [PATCH 27/37] Subclass _Indexer for columns and indexes rather than lumping all functionality into one class. --- python/cudf/cudf/core/join/_join_helpers.py | 35 ++++++++++----------- python/cudf/cudf/core/join/join.py | 19 +++++------ 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index 56d26d7912e..6dec0b10273 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -29,32 +29,29 @@ class _Indexer: # >>> _Indexer("a", column=True).get(df) # returns column "a" of df # >>> _Indexer("b", index=True).get(df) # returns index level "b" of df - def __init__(self, name: Any, column=False, index=False): - if column and index: - raise ValueError("Cannot specify both column and index") + def __init__(self, name: Any): self.name = name - self.column, self.index = column, index + +class _ColumnIndexer(_Indexer): def get(self, obj: Frame) -> ColumnBase: - # get the column from `obj` - if self.column: - return obj._data[self.name] - else: - if obj._index is not None: - return obj._index._data[self.name] + return obj._data[self.name] + + def set(self, obj: Frame, value: ColumnBase, validate=False): + obj._data.set_by_label(self.name, value, validate=validate) + + +class _IndexIndexer(_Indexer): + def get(self, obj: Frame) -> ColumnBase: + if obj._index is not None: + return obj._index._data[self.name] raise KeyError def set(self, obj: Frame, value: ColumnBase, validate=False): - # set the colum in `obj` - if self.column: - obj._data.set_by_label(self.name, value, validate=validate) + if obj._index is not None: + obj._index._data.set_by_label(self.name, value, validate=validate) else: - if obj._index is not None: - obj._index._data.set_by_label( - self.name, value, validate=validate - ) - else: - raise KeyError + raise KeyError def _match_join_keys( diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 71eb8b61280..bc14c21e09d 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -7,7 +7,8 @@ from cudf import _lib as libcudf from cudf.core.join._join_helpers import ( _coerce_to_tuple, - _Indexer, + _ColumnIndexer, + _IndexIndexer, _match_join_keys, ) @@ -113,15 +114,15 @@ def __init__( if left_on or right_on: self._left_keys = [ - _Indexer(name=on, column=True) + _ColumnIndexer(name=on) if not self._using_left_index and on in lhs._data - else _Indexer(name=on, index=True) + else _IndexIndexer(name=on) for on in (_coerce_to_tuple(left_on) if left_on else []) ] self._right_keys = [ - _Indexer(name=on, column=True) + _ColumnIndexer(name=on) if not self._using_right_index and on in rhs._data - else _Indexer(name=on, index=True) + else _IndexIndexer(name=on) for on in (_coerce_to_tuple(right_on) if right_on else []) ] if len(self._left_keys) != len(self._right_keys): @@ -133,12 +134,8 @@ def __init__( # index with column or on both indexes, then use # the intersection of columns in both frames on_names = set(lhs._data) & set(rhs._data) - self._left_keys = [ - _Indexer(name=on, column=True) for on in on_names - ] - self._right_keys = [ - _Indexer(name=on, column=True) for on in on_names - ] + self._left_keys = [_ColumnIndexer(name=on) for on in on_names] + self._right_keys = [_ColumnIndexer(name=on) for on in on_names] self.output_lhs = lhs.copy(deep=False) self.output_rhs = rhs.copy(deep=False) From 47311af91aae0999b1c3a4fc94de97e4bbd5d119 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 10:04:13 -0700 Subject: [PATCH 28/37] Rename output_lhs to lhs and output_rhs to rhs. --- python/cudf/cudf/core/join/join.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index bc14c21e09d..3f16b2f2aab 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -92,6 +92,8 @@ def __init__( suffixes=suffixes, ) + self.lhs = lhs.copy(deep=False) + self.rhs = rhs.copy(deep=False) self.how = how self.sort = sort self.lsuffix, self.rsuffix = suffixes @@ -137,15 +139,12 @@ def __init__( self._left_keys = [_ColumnIndexer(name=on) for on in on_names] self._right_keys = [_ColumnIndexer(name=on) for on in on_names] - self.output_lhs = lhs.copy(deep=False) - self.output_rhs = rhs.copy(deep=False) - left_join_cols = {} right_join_cols = {} for left_key, right_key in zip(self._left_keys, self._right_keys): - lcol = left_key.get(self.output_lhs) - rcol = right_key.get(self.output_rhs) + lcol = left_key.get(self.lhs) + rcol = right_key.get(self.rhs) lcol_casted, rcol_casted = _match_join_keys(lcol, rcol, self.how) left_join_cols[left_key.name] = lcol_casted right_join_cols[left_key.name] = rcol_casted @@ -160,8 +159,8 @@ def __init__( lcol_casted = lcol_casted.astype("category") rcol_casted = rcol_casted.astype("category") - left_key.set(self.output_lhs, lcol_casted, validate=False) - right_key.set(self.output_rhs, rcol_casted, validate=False) + left_key.set(self.lhs, lcol_casted, validate=False) + right_key.set(self.rhs, rcol_casted, validate=False) self._left_join_table = cudf.core.frame.Frame(left_join_cols) self._right_join_table = cudf.core.frame.Frame(right_join_cols) @@ -197,11 +196,11 @@ def perform_merge(self) -> Frame: gather_index = self._using_left_index or self._using_right_index if left_rows is not None: - left_result = self.output_lhs._gather( + left_result = self.lhs._gather( left_rows, nullify=True, keep_index=gather_index ) if right_rows is not None: - right_result = self.output_rhs._gather( + right_result = self.rhs._gather( right_rows, nullify=True, keep_index=gather_index ) @@ -222,8 +221,6 @@ def _merge_results(self, left_result: Frame, right_result: Frame): # is simply dropped. For outer joins, the two key columns are combined # by filling nulls in the left key column with corresponding values # from the right key column: - # TODO: Move this to the creation of the output_lhs in the constructor - # as well. if self.how == "outer": for lkey, rkey in zip(self._left_keys, self._right_keys): if lkey.name == rkey.name: From 086d90db750d6c0ecb15a18f7243c2b0ed443b53 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 10:20:13 -0700 Subject: [PATCH 29/37] Various internal simplifications. --- python/cudf/cudf/core/join/join.py | 87 +++++++++++++++--------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 3f16b2f2aab..9bbbc608845 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -139,6 +139,28 @@ def __init__( self._left_keys = [_ColumnIndexer(name=on) for on in on_names] self._right_keys = [_ColumnIndexer(name=on) for on in on_names] + if isinstance(lhs, cudf.MultiIndex) or isinstance( + rhs, cudf.MultiIndex + ): + self._out_class = cudf.MultiIndex + elif isinstance(lhs, cudf.BaseIndex): + self._out_class = lhs.__class__ + else: + self._out_class = cudf.DataFrame + + self._key_columns_with_same_name = ( + on + if on + else [] + if (self._using_left_index or self._using_right_index) + else [ + lkey.name + for lkey, rkey in zip(self._left_keys, self._right_keys) + if lkey.name == rkey.name + ] + ) + + def perform_merge(self) -> Frame: left_join_cols = {} right_join_cols = {} @@ -162,47 +184,24 @@ def __init__( left_key.set(self.lhs, lcol_casted, validate=False) right_key.set(self.rhs, rcol_casted, validate=False) - self._left_join_table = cudf.core.frame.Frame(left_join_cols) - self._right_join_table = cudf.core.frame.Frame(right_join_cols) - - if isinstance(lhs, cudf.MultiIndex) or isinstance( - rhs, cudf.MultiIndex - ): - self._out_class = cudf.MultiIndex - elif isinstance(lhs, cudf.BaseIndex): - self._out_class = lhs.__class__ - else: - self._out_class = cudf.DataFrame - - self._key_columns_with_same_name = ( - on - if on - else [] - if (self._using_left_index or self._using_right_index) - else [ - lkey.name - for lkey, rkey in zip(self._left_keys, self._right_keys) - if lkey.name == rkey.name - ] - ) - - def perform_merge(self) -> Frame: left_rows, right_rows = self._joiner( - self._left_join_table, self._right_join_table, how=self.how, + cudf.core.frame.Frame(left_join_cols), + cudf.core.frame.Frame(right_join_cols), + how=self.how, ) - left_result = cudf.core.frame.Frame() - right_result = cudf.core.frame.Frame() - gather_index = self._using_left_index or self._using_right_index - if left_rows is not None: - left_result = self.lhs._gather( - left_rows, nullify=True, keep_index=gather_index - ) - if right_rows is not None: - right_result = self.rhs._gather( - right_rows, nullify=True, keep_index=gather_index - ) + + left_result = ( + self.lhs._gather(left_rows, nullify=True, keep_index=gather_index) + if left_rows is not None + else cudf.core.frame.Frame() + ) + right_result = ( + self.rhs._gather(right_rows, nullify=True, keep_index=gather_index) + if right_rows is not None + else cudf.core.frame.Frame() + ) result = self._out_class._from_data( *self._merge_results(left_result, right_result) @@ -217,10 +216,10 @@ def _merge_results(self, left_result: Frame, right_result: Frame): # `Frame`, suffixing column names if necessary. # If two key columns have the same name, a single output column appears - # in the result. For all other join types, the key column from the rhs - # is simply dropped. For outer joins, the two key columns are combined - # by filling nulls in the left key column with corresponding values - # from the right key column: + # in the result. For all non-outer join types, the key column from the + # rhs is simply dropped. For outer joins, the two key columns are + # combined by filling nulls in the left key column with corresponding + # values from the right key column: if self.how == "outer": for lkey, rkey in zip(self._left_keys, self._right_keys): if lkey.name == rkey.name: @@ -247,11 +246,11 @@ def _merge_results(self, left_result: Frame, right_result: Frame): ) for name in common_names: - if name not in self._key_columns_with_same_name: + if name in self._key_columns_with_same_name: + del right_names[name] + else: left_names[name] = f"{name}{self.lsuffix}" right_names[name] = f"{name}{self.rsuffix}" - else: - del right_names[name] # Assemble the data columns of the result: data = { From c4bd790cfe1f643716b6c6bc0e7d6f8bd260462a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 11:01:51 -0700 Subject: [PATCH 30/37] Clean up output column renaming logic. --- python/cudf/cudf/core/join/join.py | 59 ++++++++++++------------------ 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 9bbbc608845..03982074103 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -149,15 +149,17 @@ def __init__( self._out_class = cudf.DataFrame self._key_columns_with_same_name = ( - on + set(_coerce_to_tuple(on)) if on - else [] + else set() if (self._using_left_index or self._using_right_index) - else [ - lkey.name - for lkey, rkey in zip(self._left_keys, self._right_keys) - if lkey.name == rkey.name - ] + else set( + [ + lkey.name + for lkey, rkey in zip(self._left_keys, self._right_keys) + if lkey.name == rkey.name + ] + ) ) def perform_merge(self) -> Frame: @@ -230,40 +232,27 @@ def _merge_results(self, left_result: Frame, right_result: Frame): validate=False, ) - # Compute the result column names: - # left_names and right_names will be a mappings of input column names - # to the corresponding names in the final result. - left_names = dict(zip(left_result._data, left_result._data)) - right_names = dict(zip(right_result._data, right_result._data)) - - # For any columns from left_result and right_result that have the same - # name: - # - if they are key columns, keep only the left column - # - if they are not key columns, use suffixes to differentiate them - # in the final result + # All columns from the left table make it into the output. Non-key + # columns that share a name with a column in the right table are + # suffixed with the provided suffix. common_names = set(left_result._data.names) & set( right_result._data.names ) - - for name in common_names: - if name in self._key_columns_with_same_name: - del right_names[name] - else: - left_names[name] = f"{name}{self.lsuffix}" - right_names[name] = f"{name}{self.rsuffix}" - - # Assemble the data columns of the result: + cols_to_suffix = common_names - self._key_columns_with_same_name data = { - **{ - new_name: left_result._data[orig_name] - for orig_name, new_name in left_names.items() - }, - **{ - new_name: right_result._data[orig_name] - for orig_name, new_name in right_names.items() - }, + (f"{name}{self.lsuffix}" if name in cols_to_suffix else name): col + for name, col in left_result._data.items() } + # The right table follows the same rule as the left table except that + # key columns from the right table are removed. + for name, col in right_result._data.items(): + if name in common_names: + if name not in self._key_columns_with_same_name: + data[f"{name}{self.rsuffix}"] = col + else: + data[name] = col + # TODO: There is a bug here, we actually need to pull the index columns # from both if both left_index and right_index were True. if self._using_right_index: From 3d7ff7f34a3358271e4464422b8d9877c6b07d59 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 11:05:25 -0700 Subject: [PATCH 31/37] Minor comment cleanup. --- python/cudf/cudf/core/join/join.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 03982074103..2a00e0913a2 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -301,10 +301,9 @@ def _validate_merge_params( how, suffixes, ): - """ - Error for various invalid combinations of merge input parameters - """ - # must actually support the requested merge type + # Error for various invalid combinations of merge input parameters + + # We must actually support the requested merge type if how not in {"left", "inner", "outer", "leftanti", "leftsemi"}: raise NotImplementedError(f"{how} merge not supported yet") From 8e1c9086ce7d47236787cd44b65ae2ea9a49cbd0 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 11:35:31 -0700 Subject: [PATCH 32/37] Add tests for newly added checks. --- python/cudf/cudf/tests/test_joining.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py index fdd3f6ba87a..ea40db8d92b 100644 --- a/python/cudf/cudf/tests/test_joining.py +++ b/python/cudf/cudf/tests/test_joining.py @@ -2099,3 +2099,20 @@ def test_join_on_index_with_duplicate_names(): got = lhs.join(rhs, how="inner") assert_join_results_equal(expect, got, how="inner") + + +def test_join_redundant_params(): + lhs = cudf.DataFrame( + {"a": [1, 2, 3], "c": [2, 3, 4]}, index=cudf.Index([0, 1, 2], name="c") + ) + rhs = cudf.DataFrame( + {"b": [1, 2, 3]}, index=cudf.Index([0, 1, 2], name="a") + ) + with pytest.raises(ValueError): + lhs.merge(rhs, on="a", left_index=True) + with pytest.raises(ValueError): + lhs.merge(rhs, left_on="a", left_index=True, right_index=True) + with pytest.raises(ValueError): + lhs.merge(rhs, right_on="a", left_index=True, right_index=True) + with pytest.raises(ValueError): + lhs.merge(rhs, left_on="c", right_on="b") From cf9d242ed0c29345754cc20b750c4071f7b45566 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 11:40:50 -0700 Subject: [PATCH 33/37] Remove TODO. --- python/cudf/cudf/core/join/join.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 2a00e0913a2..4d6a5d7f2a7 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -253,8 +253,6 @@ def _merge_results(self, left_result: Frame, right_result: Frame): else: data[name] = col - # TODO: There is a bug here, we actually need to pull the index columns - # from both if both left_index and right_index were True. if self._using_right_index: # right_index and left_on index = left_result._index From 38792617306457d946b7b2b0dbfad5358103ff61 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 14:37:38 -0700 Subject: [PATCH 34/37] Maintain index if 'on' is provided and names index columns. --- python/cudf/cudf/core/join/join.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 4d6a5d7f2a7..73eb97c14be 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -101,11 +101,9 @@ def __init__( # At this point validation guarantees that if on is not None we # don't have any other args, so we can apply it directly to left_on and # right_on. - self._using_left_index = bool(left_index) left_on = ( lhs.index._data.names if left_index else left_on if left_on else on ) - self._using_right_index = bool(right_index) right_on = ( rhs.index._data.names if right_index @@ -117,13 +115,13 @@ def __init__( if left_on or right_on: self._left_keys = [ _ColumnIndexer(name=on) - if not self._using_left_index and on in lhs._data + if on in lhs._data else _IndexIndexer(name=on) for on in (_coerce_to_tuple(left_on) if left_on else []) ] self._right_keys = [ _ColumnIndexer(name=on) - if not self._using_right_index and on in rhs._data + if on in rhs._data else _IndexIndexer(name=on) for on in (_coerce_to_tuple(right_on) if right_on else []) ] @@ -131,6 +129,12 @@ def __init__( raise ValueError( "Merge operands must have same number of join key columns" ) + self._using_left_index = any( + isinstance(idx, _IndexIndexer) for idx in self._left_keys + ) + self._using_right_index = any( + isinstance(idx, _IndexIndexer) for idx in self._right_keys + ) else: # if `on` is not provided and we're not merging # index with column or on both indexes, then use @@ -138,6 +142,8 @@ def __init__( on_names = set(lhs._data) & set(rhs._data) self._left_keys = [_ColumnIndexer(name=on) for on in on_names] self._right_keys = [_ColumnIndexer(name=on) for on in on_names] + self._using_left_index = False + self._using_right_index = False if isinstance(lhs, cudf.MultiIndex) or isinstance( rhs, cudf.MultiIndex From ac9ce949765aef0941dd97033e5b9fbac79eeda6 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Oct 2021 14:38:26 -0700 Subject: [PATCH 35/37] Change dask_cudf test since we now support this behavior (consistent with pandas). --- python/dask_cudf/dask_cudf/tests/test_join.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_join.py b/python/dask_cudf/dask_cudf/tests/test_join.py index 58811ee98fc..90b7c3692ce 100644 --- a/python/dask_cudf/dask_cudf/tests/test_join.py +++ b/python/dask_cudf/dask_cudf/tests/test_join.py @@ -245,8 +245,6 @@ def test_merge_should_fail(): left.merge(right, how="left", on=["b"]) with pytest.raises(KeyError): left.merge(right, how="left", on=["c"]) - with pytest.raises(KeyError): - left.merge(right, how="left", on=["a"]) # Same column names df2["b"] = np.random.randint(0, 12, 12) From 0f0c2d986f66d08c9da23475960b5066cc7dbba6 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 26 Oct 2021 15:53:37 -0700 Subject: [PATCH 36/37] Remove one more unnecessary assertion. --- python/dask_cudf/dask_cudf/tests/test_join.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_join.py b/python/dask_cudf/dask_cudf/tests/test_join.py index 90b7c3692ce..8b2d85c59d7 100644 --- a/python/dask_cudf/dask_cudf/tests/test_join.py +++ b/python/dask_cudf/dask_cudf/tests/test_join.py @@ -252,8 +252,6 @@ def test_merge_should_fail(): with pytest.raises(KeyError): left.merge(right, how="left", on="NonCol") - with pytest.raises(KeyError): - left.merge(right, how="left", on="a") @pytest.mark.parametrize("how", ["inner", "left"]) From cd71262c8ad59e9cdce4826a026bd8d876286a3e Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 29 Oct 2021 15:45:26 -0700 Subject: [PATCH 37/37] Fix custreamz test. --- python/cudf/cudf/core/join/join.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 73eb97c14be..d3302c567d0 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -101,9 +101,11 @@ def __init__( # At this point validation guarantees that if on is not None we # don't have any other args, so we can apply it directly to left_on and # right_on. + self._using_left_index = bool(left_index) left_on = ( lhs.index._data.names if left_index else left_on if left_on else on ) + self._using_right_index = bool(right_index) right_on = ( rhs.index._data.names if right_index @@ -115,13 +117,13 @@ def __init__( if left_on or right_on: self._left_keys = [ _ColumnIndexer(name=on) - if on in lhs._data + if not self._using_left_index and on in lhs._data else _IndexIndexer(name=on) for on in (_coerce_to_tuple(left_on) if left_on else []) ] self._right_keys = [ _ColumnIndexer(name=on) - if on in rhs._data + if not self._using_right_index and on in rhs._data else _IndexIndexer(name=on) for on in (_coerce_to_tuple(right_on) if right_on else []) ]