
Commit

👽️ Align rows_*() verbs with dplyr 1.1.3 (pwwang/datar#188)
pwwang committed Sep 14, 2023
1 parent 177c6b3 commit e99d474
Showing 2 changed files with 301 additions and 78 deletions.
167 changes: 108 additions & 59 deletions datar_pandas/api/dplyr/rows.py
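
Before the diff, a usage sketch of the realigned API. This is illustrative, not part of the commit; it assumes the datar_pandas backend is installed and uses the direct-call form with the same `__ast_fallback`/`__backend` meta arguments the module itself passes around as `_meta_args`. The commit drops the port's old `copy=` argument in favor of dplyr 1.1's `conflict=` (rows_insert) and `unmatched=` (rows_update/rows_patch/rows_delete) options, each accepting "error" or "ignore", and newly ports rows_append.

import pandas as pd
from datar.apis.dplyr import rows_insert, rows_update

x = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
y = pd.DataFrame({"a": [3, 4], "b": ["zz", "w"]})

# conflict="ignore": silently drop y rows whose key already exists in x
inserted = rows_insert(
    x, y, by="a", conflict="ignore",
    __ast_fallback="normal", __backend="pandas",
)

# unmatched="ignore": silently skip y rows whose key is absent from x
updated = rows_update(
    x, y, by="a", unmatched="ignore",
    __ast_fallback="normal", __backend="pandas",
)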
@@ -2,13 +2,14 @@
https://github.com/tidyverse/dplyr/blob/master/R/rows.R
"""
from typing import Tuple
import numpy as np
-from datar.core.utils import logger
+from datar.core.utils import logger, arg_match
from datar.apis.dplyr import (
    bind_rows,
    left_join,
    coalesce,
    rows_insert,
+    rows_append,
    rows_update,
    rows_patch,
    rows_upsert,
@@ -19,119 +20,154 @@
from ...pandas import DataFrame
from ...common import is_scalar, setdiff
from ...contexts import Context
-from ..tibble.verbs import rownames_to_column

_meta_args = {"__ast_fallback": "normal", "__backend": "pandas"}


@rows_insert.register(DataFrame, context=Context.EVAL, backend="pandas")
-def _rows_insert(x, y, by=None, copy=True):
+def _rows_insert(
+    x,
+    y,
+    by=None,
+    conflict="error",
+    **kwargs,
+):
+    if kwargs:  # pragma: no cover
+        raise ValueError("Unsupported arguments: %s" % kwargs.keys())
+
+    conflict = arg_match(conflict, "conflict", ["error", "ignore"])
+
    key = _rows_check_key(by, x, y)
    _rows_check_key_df(x, key, df_name="x")
    _rows_check_key_df(y, key, df_name="y")

-    idx = _rows_match(y[key], x[key])
-    bad = ~pd.isnull(idx)
-    if any(bad):
+    idx_x, idx_y = _rows_match(x[key], y[key])
+    if idx_x.size > 0 and conflict == "error":
        raise ValueError("Attempting to insert duplicate rows.")

-    return bind_rows(x, y, _copy=copy, **_meta_args)
+    idx_y = np.isin(y.index, idx_y, invert=True)
+    return bind_rows(x, y.loc[idx_y, :], **_meta_args)
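
A behavior sketch of the rewritten insert (illustrative values, plain pandas standing in for the verbs):

import pandas as pd

x = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
y = pd.DataFrame({"a": [3, 4], "b": ["zz", "w"]})

# Keys of y that already exist in x are conflicts.
conflicts = y["a"].isin(x["a"])  # [True, False]
# conflict="error" raises because a conflict exists;
# conflict="ignore" appends only the non-conflicting rows:
appended = pd.concat([x, y[~conflicts]], ignore_index=True)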


@rows_append.register(DataFrame, context=Context.EVAL, backend="pandas")
def _rows_append(x, y, **kwargs):
if kwargs: # pragma: no cover
raise ValueError("Unsupported arguments: %s" % kwargs.keys())

_rows_check_key_df(x, y.columns, df_name="x")
return bind_rows(x, y, **_meta_args)


@rows_update.register(DataFrame, context=Context.EVAL, backend="pandas")
-def _rows_update(x, y, by=None, copy=True):
+def _rows_update(x, y, by=None, unmatched="error", **kwargs):
+    if kwargs:  # pragma: no cover
+        raise ValueError("Unsupported arguments: %s" % kwargs.keys())
+
+    unmatched = arg_match(unmatched, "unmatched", ["error", "ignore"])
+
    key = _rows_check_key(by, x, y)
    _rows_check_key_df(x, key, df_name="x")
    _rows_check_key_df(y, key, df_name="y")

-    idx = _rows_match(y[key], x[key])
-    bad = pd.isnull(idx)
-    if any(bad):
-        raise ValueError("Attempting to update missing rows.")
-
-    idx = idx.astype(int)
-
-    if copy:
-        x = x.copy()
-
-    # Join at the beginning? NaNs will be produced and dtypes will be changed
-    # in y
-    # Try it in pandas2
-    y_joined = left_join(x.loc[idx, key], y, by=key, **_meta_args).set_index(
-        idx
-    )
-
-    x.loc[idx, y.columns] = y_joined
+    idx_x, idx_y = _rows_match(x[key], y[key])
+
+    if y.index.difference(idx_y).size > 0 and unmatched == "error":
+        raise ValueError("Attempting to update missing rows.")
+
+    if np.unique(idx_x).size < idx_x.size:
+        raise ValueError("`y` key values must be unique.")
+
+    x = x.copy()
+    x.loc[idx_x, y.columns] = y.loc[idx_y, :].values
    return x
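
Note the `.values` on the right-hand side of the assignment above: a raw ndarray sidesteps pandas label alignment, so the matched y rows land at the matched x positions even though the two frames' indexes differ. A minimal sketch (illustrative values and indexes):

import pandas as pd

x = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
y = pd.DataFrame({"a": [3], "b": ["zz"]}, index=[99])

idx_x, idx_y = [2], [99]  # as _rows_match would pair them
x = x.copy()
x.loc[idx_x, ["a", "b"]] = y.loc[idx_y, :].values
# x.loc[2] is now (3, "zz"); assigning y.loc[idx_y, :] without .values
# would instead align on index label 99 and fill the row with NaN.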


@rows_patch.register(DataFrame, context=Context.EVAL, backend="pandas")
-def _rows_patch(x, y, by=None, copy=True):
+def _rows_patch(x, y, by=None, unmatched="error", **kwargs):
+    if kwargs:  # pragma: no cover
+        raise ValueError("Unsupported arguments: %s" % kwargs.keys())
+
+    unmatched = arg_match(unmatched, "unmatched", ["error", "ignore"])
+
    key = _rows_check_key(by, x, y)
    _rows_check_key_df(x, key, df_name="x")
    _rows_check_key_df(y, key, df_name="y")

-    idx = _rows_match(y[key], x[key])
-    bad = pd.isnull(idx)
-    if any(bad):
+    idx_x, idx_y = _rows_match(x[key], y[key])
+
+    if idx_x.size == 0:
        raise ValueError("Attempting to patch missing rows.")

-    new_data = []
-    for col in y.columns:
-        new_data.append(coalesce(x.loc[idx, col].values, y[col]))
-
-    if copy:
-        x = x.copy()
-    x.loc[idx, y.columns] = np.array(new_data).T
-    return x
+    if y.index.difference(idx_y).size > 0 and unmatched == "error":
+        raise ValueError("`y` must contain keys that already exist in `x`.")
+
+    if np.unique(idx_x).size < idx_x.size:
+        raise ValueError("`y` key values must be unique.")
+
+    z = x.copy()
+
+    other_cols = y.columns.difference(key)
+    z.loc[idx_x, other_cols] = coalesce(
+        x.loc[idx_x, other_cols],
+        y.loc[idx_y, other_cols].set_index(idx_x),
+    )
+
+    for col in other_cols:
+        z[col] = z[col].astype(x[col].dtype)
+
+    return z
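
Where rows_update overwrites, rows_patch only fills holes: for matched rows, x's value wins unless it is missing. A minimal pandas sketch of that coalesce step (illustrative values):

import numpy as np
import pandas as pd

x = pd.DataFrame({"a": [1, 2], "b": [np.nan, "keep"]})
y = pd.DataFrame({"a": [1, 2], "b": ["fill", "lose"]})

# coalesce(x.b, y.b): take x's value where present, else y's
patched_b = x["b"].where(x["b"].notna(), y["b"])  # ["fill", "keep"]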


@rows_upsert.register(DataFrame, context=Context.EVAL, backend="pandas")
-def _rows_upsert(x, y, by=None, copy=True):
+def _rows_upsert(x, y, by=None):
    key = _rows_check_key(by, x, y)
    _rows_check_key_df(x, key, df_name="x")
    _rows_check_key_df(y, key, df_name="y")

-    idx = _rows_match(y[key], x[key])
-    new = pd.isnull(idx)
-    # idx of x
-    idx_existing = idx[~new]
-
-    x.loc[idx_existing, y.columns] = y.loc[~new].values
-    return bind_rows(x, y.loc[new], _copy=copy, **_meta_args)
+    idx_x, idx_y = _rows_match(x[key], y[key])
+
+    if np.unique(idx_x).size < idx_x.size:
+        raise ValueError("`y` key values must be unique.")
+
+    x = x.copy()
+    x.loc[idx_x, y.columns] = y.loc[idx_y, :].values
+    return bind_rows(x, y.loc[~y.index.isin(idx_y)], **_meta_args)


@rows_delete.register(DataFrame, context=Context.EVAL, backend="pandas")
def _rows_delete(
    x,
    y,
    by=None,
-    copy=True,
+    unmatched="error",
+    **kwargs,
):
-    key = _rows_check_key(by, x, y)
+    if kwargs:  # pragma: no cover
+        raise ValueError("Unsupported arguments: %s" % kwargs.keys())
+
+    unmatched = arg_match(unmatched, "unmatched", ["error", "ignore"])
+
+    key = _rows_check_key(by, x, y, allow_y_extra=True)
    _rows_check_key_df(x, key, df_name="x")
    _rows_check_key_df(y, key, df_name="y")

-    extra_cols = setdiff(y.columns, key)
+    extra_cols = y.columns.difference(key)
    if len(extra_cols) > 0:
        logger.info("Ignoring extra columns: %s", extra_cols)

-    idx = _rows_match(y[key], x[key])
-    bad = pd.isnull(idx)
-
-    if any(bad):
+    idx_x, idx_y = _rows_match(x[key], y[key])
+
+    if y.index.difference(idx_y).size > 0 and unmatched == "error":
        raise ValueError("Attempting to delete missing rows.")

-    if copy:
-        x = x.copy()
-
-    return x.loc[~x.index.isin(idx), :]
+    x = x.copy()
+    return x.loc[~x.index.isin(idx_x), :]
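
Deletion only consults the key columns, which is why `_rows_check_key` is called with `allow_y_extra=True` and extra columns in y are merely logged. A behavior sketch (illustrative values):

import pandas as pd

x = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
y = pd.DataFrame({"a": [1, 3], "note": ["drop", "drop"]})  # "note" is ignored

keep = ~x["a"].isin(y["a"])
deleted = x.loc[keep]  # only the a == 2 row survives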


# helpers -----------------------------------------------------------------


-def _rows_check_key(by, x, y):
+def _rows_check_key(by, x, y, allow_y_extra=False):
    """Check the key and return the valid key"""
    if by is None:
        by = y.columns[0]
Expand All @@ -145,7 +181,7 @@ def _rows_check_key(by, x, y):
raise ValueError("`by` must be a string or a list of strings.")

bad = setdiff(y.columns, x.columns)
if len(bad) > 0:
if len(bad) > 0 and not allow_y_extra:
raise ValueError("All columns in `y` must exist in `x`.")

return by
Expand All @@ -161,9 +197,22 @@ def _rows_check_key_df(df, by, df_name) -> None:
    # raise ValueError(f"`{df_name}` key values are not unique.")


-def _rows_match(x: pd.DataFrame, y: pd.DataFrame, for_: str = "x"):
+def _rows_match(
+    x: pd.DataFrame,
+    y: pd.DataFrame,
+) -> Tuple[np.ndarray, np.ndarray]:
    """Mimic vctrs::vec_match"""
-    id_col = "__id__"
-    y_with_id = rownames_to_column(y, var=id_col, **_meta_args)
-
-    return left_join(x, y_with_id, **_meta_args)[id_col].values
+    x_id_col = "__x_id__"
+    y_id_col = "__y_id__"
+    xin = x.index.name
+    yin = y.index.name
+    x.index.name = x_id_col
+    y.index.name = y_id_col
+    xi = x.reset_index()
+    yi = y.reset_index()
+    x.index.name = xin
+    y.index.name = yin
+    merge_col = "__merge__"
+    df = xi.merge(yi, how="left", indicator=merge_col)
+    df = df[df[merge_col] == "both"]
+    return df[x_id_col].values.astype(int), df[y_id_col].values.astype(int)
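
A self-contained sketch of the merge-with-indicator trick behind the new `_rows_match` (hypothetical key frames; `rename_axis` stands in for the index-name juggling above):

import pandas as pd

x_keys = pd.DataFrame({"a": [1, 2, 3, 4]})
y_keys = pd.DataFrame({"a": [2, 4]})

xi = x_keys.rename_axis("__x_id__").reset_index()
yi = y_keys.rename_axis("__y_id__").reset_index()
df = xi.merge(yi, how="left", indicator="__merge__")
df = df[df["__merge__"] == "both"]

idx_x = df["__x_id__"].values.astype(int)  # array([1, 3]): x rows with a match
idx_y = df["__y_id__"].values.astype(int)  # array([0, 1]): the matching y rows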
