
Commit

test if all good
Signed-off-by: Dmitry Chigarev <[email protected]>
dchigarev committed Jan 18, 2024
1 parent 80015eb commit c42e997
Showing 4 changed files with 28 additions and 39 deletions.
7 changes: 0 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -18,7 +18,6 @@
 for pandas storage format.
 """
 import datetime
-from timeit import default_timer as timer
 from typing import TYPE_CHECKING, Callable, Dict, Hashable, List, Optional, Union
 
 import numpy as np
@@ -3791,7 +3790,6 @@ def apply_func(df):  # pragma: no cover
             key_columns=by,
             func=apply_func,
         )
-
         # no need to align columns if there's only one row partition
         if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1:
             # FIXME: the current reshuffling implementation guarantees us that there's only one column
@@ -3861,20 +3859,15 @@ def compute_aligned_columns(*dfs, initial_columns=None):
 
         def apply_aligned(df, args, partition_idx):
             combined_cols, mask = args
-            t1 = timer()
             if mask is not None and mask.get(partition_idx) is not None:
                 values = mask[partition_idx]
 
                 original_names = df.index.names
-                # values = pandas.DataFrame(np.NaN, index=values.index, columns=df.columns)
                 df = pandas.concat([df, values])
 
-                print("concating", timer() - t1)
-                t1 = timer()
                 if kwargs["sort"]:
                     # TODO: write search-sorted insertion or sort the result after insertion
                     df = df.sort_index(axis=0)
-                print("sorting", timer() - t1)
                 df.index.names = original_names
             if combined_cols is not None:
                 df = df.reindex(columns=combined_cols)
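For context on the hunk above: apply_aligned() appends the rows generated for a partition's missing categories and then restores index order. A minimal pandas-only sketch of that pattern (the toy frames, index, and column names are assumptions for illustration, not Modin's actual code):

import pandas as pd

# One row partition of a groupby result, plus rows generated for its missing categories.
df = pd.DataFrame({"x": [1.0, 2.0]}, index=pd.Index(["a", "c"], name="key"))
missing = pd.DataFrame({"x": [0.0]}, index=pd.Index(["b"], name="key"))

out = pd.concat([df, missing])  # append the missing-category rows
# Restore sorted order; the TODO in the diff suggests replacing this
# full sort with a search-sorted insertion.
out = out.sort_index(axis=0)
print(out)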
43 changes: 25 additions & 18 deletions modin/core/dataframe/pandas/dataframe/utils.py
@@ -15,7 +15,6 @@
 
 import abc
 from collections import namedtuple
-from timeit import default_timer as timer
 from typing import TYPE_CHECKING, Callable, Optional, Union
 
 import numpy as np
@@ -531,7 +530,24 @@ def add_missing_categories_to_groupby(
     kwargs,
     initial_dtypes=None,
 ):
-    t1 = timer()
+    """
+    Generate missing categories.
+
+    Parameters
+    ----------
+    dfs : list of pandas.DataFrames
+    by : list of hashable
+    operator : callable
+    initial_columns : pandas.Index
+    combined_cols : pandas.Index
+    is_udf_agg : bool
+    kwargs : dict
+    initial_dtypes : pandas.Series, optional
+
+    Returns
+    -------
+    tuple[dict, pandas.Index]
+    """
     kwargs["observed"] = False
     new_combined_cols = combined_cols
 
@@ -556,7 +572,6 @@
             }
             # if we're grouping on multiple groupers, then the missing categorical values are a
             # cartesian product of (actual_missing_categorical_values X all_values_of_other_groupers)
-            # breakpoint()
             complete_index = pandas.MultiIndex.from_product(
                 [
                     value.categories.astype(total_level.dtype)
@@ -575,12 +590,10 @@
         missing_index = total_index.categories.difference(total_index.values)
         missing_cats_dtype = {by[0]: pandas.CategoricalDtype(missing_index)}
     missing_index.names = by
-    print("generating missing", timer() - t1)
-    print(len(missing_index))
-    t1 = timer()
+
     if len(missing_index) == 0:
         return {}, new_combined_cols
-    # breakpoint()
+
     ### At this stage we want to get a fill_value for missing categorical values
     if is_udf_agg and isinstance(total_index, pandas.MultiIndex):
         # if grouping on multiple columns and aggregating with an UDF, then the
@@ -606,8 +619,7 @@
         )
         empty_df = empty_df.astype(missing_cats_dtype)
         missing_values = operator(empty_df.groupby(by, **kwargs))
-        print("getting fill value", timer() - t1)
-        t1 = timer()
+
     if is_udf_agg and not isinstance(total_index, pandas.MultiIndex):
         missing_values = missing_values.drop(columns=by, errors="ignore")
         new_combined_cols = pandas.concat(
@@ -625,8 +637,7 @@
         missing_values = pandas.DataFrame(
             fill_value, index=missing_index, columns=combined_cols
         )
-        print("generating missing values", timer() - t1)
-        t1 = timer()
+
     # restoring original categorical dtypes for the indices
     if isinstance(missing_values.index, pandas.MultiIndex):
         # MultiIndex.astype() only takes a single dtype, the only way to cast
@@ -640,8 +651,7 @@
         # )
     else:
         missing_values.index = missing_values.index.astype(total_index.dtype)
-    print("casting to original dtype", timer() - t1)
-    t1 = timer()
+
     ### Then we decide to which partitions the missing categorical values should go
     if not kwargs["sort"]:
         # If the result is allowed to be unsorted, simply insert all the missing
@@ -677,14 +687,13 @@
             # doesn't affect the result
             bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1])
     old_bins_to_new[len(bins)] = offset
-    # breakpoint()
+
     if len(bins) == 0:
         # insert values to the first non-empty partition
         return {old_bins_to_new.get(0, 0): missing_values}, new_combined_cols
 
     # we used the very first level of MultiIndex to build bins, meaning that we also have
     # to use values of the first index's level for 'digitize'
-    # breakpoint()
     lvl_zero = (
         missing_values.index.levels[0]
         if isinstance(missing_values.index, pandas.MultiIndex)
@@ -694,8 +703,7 @@
         part_idx = np.digitize(lvl_zero, bins, right=True)
     else:
         part_idx = np.searchsorted(bins, lvl_zero)
-    print("binning", timer() - t1)
-    t1 = timer()
+
     ### In the end we build a dictionary mapping partition index to a dataframe with missing categoricals
     ### to be inserted into this partition
     masks = {}
@@ -711,5 +719,4 @@
 
     # Restore the original indexing by adding the amount of skipped missing partitions
     masks = {key + old_bins_to_new[key]: value for key, value in masks.items()}
-    print("generating masks", timer() - t1)
     return masks, new_combined_cols
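To make the binning step above concrete: each row partition contributes its last index value as a bin edge, and every missing categorical value is routed to the partition whose range it falls into. A toy sketch of the same idea (the values and partition layout are made up; the real code also handles MultiIndex levels and empty partitions):

import numpy as np
import pandas as pd

bins = np.array(["d", "k", "r"])     # last index value of row partitions 0, 1, 2
missing = pd.Index(["b", "f", "z"])  # missing categories to insert

# This toy uses searchsorted (the unsorted path in the diff) since the
# values are strings; the sorted path uses np.digitize(..., right=True).
part_idx = np.searchsorted(bins, missing)

# Group missing values by target partition, like the 'masks' dict in the diff;
# index 3 (past the last bin edge) means "after the last partition".
masks = {int(i): missing[part_idx == i] for i in np.unique(part_idx)}
print(masks)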
15 changes: 2 additions & 13 deletions modin/pandas/test/test_groupby.py
@@ -106,10 +106,6 @@
 ]
 
 
-def df_equals_fillna(df1, df2, fill_value=0):
-    df_equals(df1.fillna(fill_value), df2.fillna(fill_value))
-
-
 def modin_groupby_equals_pandas(modin_groupby, pandas_groupby):
     eval_general(
         modin_groupby, pandas_groupby, lambda grp: grp.indices, comparator=dict_equals
@@ -456,7 +452,6 @@ def maybe_get_columns(df, by):
         lambda df: df.sem(),
         modin_df_almost_equals_pandas,
     )
-    # breakpoint()
     eval_mean(modin_groupby, pandas_groupby, numeric_only=True)
 
     eval_any(modin_groupby, pandas_groupby)
@@ -520,20 +515,17 @@ def maybe_get_columns(df, by):
     # because of this bug: https://github.com/pandas-dev/pandas/issues/36698
     # Modin correctly processes the result, that's why `check_exception_type=None` in some cases
     is_pandas_bug_case = not as_index and col1_category and isinstance(func, dict)
-    # breakpoint()
     eval_general(
         modin_groupby,
         pandas_groupby,
         lambda grp: grp.agg(func),
         check_exception_type=None if is_pandas_bug_case else True,
-        comparator=df_equals_fillna,
     )
     eval_general(
         modin_groupby,
         pandas_groupby,
         lambda grp: grp.aggregate(func),
         check_exception_type=None if is_pandas_bug_case else True,
-        comparator=df_equals_fillna,
     )
 
     eval_general(modin_groupby, pandas_groupby, lambda df: df.last())
@@ -626,7 +618,6 @@ def maybe_get_columns(df, by):
         if isinstance(by, list)
         else ["col3", "col4"]
     )
-    # breakpoint()
     eval___getitem__(modin_groupby, pandas_groupby, non_by_cols)
     # When GroupBy.__getitem__ meets an intersection of the selection and 'by' columns
     # it throws a warning with the suggested workaround. The following code tests
@@ -1250,8 +1241,8 @@ def eval_cummin(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only
     )
 
 
-def eval_apply(modin_groupby, pandas_groupby, func, comparator=df_equals):
-    comparator(modin_groupby.apply(func), pandas_groupby.apply(func))
+def eval_apply(modin_groupby, pandas_groupby, func):
+    df_equals(modin_groupby.apply(func), pandas_groupby.apply(func))
 
 
 def eval_dtypes(modin_groupby, pandas_groupby):
@@ -2984,7 +2975,6 @@ def test_groupby_apply_series_result(modify_config):
         np.random.randint(5, 10, size=5), index=[f"s{i+1}" for i in range(5)]
     )
     df["group"] = [1, 1, 2, 2, 3]
-    # breakpoint()
     # res = df.groupby('group').apply(lambda x: x.name+2)
     eval_general(
         df, df._to_pandas(), lambda df: df.groupby("group").apply(lambda x: x.name + 2)
@@ -3236,5 +3226,4 @@ def test_range_groupby_categories(
 
     md_res = func(md_df.groupby(by_cols, observed=observed, as_index=as_index))
     pd_res = func(pd_df.groupby(by_cols, observed=observed, as_index=as_index))
-    # breakpoint()
     df_equals(md_res, pd_res)
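For reference, the pandas semantics this test exercises: grouping on a categorical with observed=False yields a row for every category, including ones absent from the data, which is what the missing-categories machinery above reproduces across partitions. A small standalone example (toy data, plain pandas):

import pandas as pd

df = pd.DataFrame({
    "cat": pd.Categorical(["a", "a", "b"], categories=["a", "b", "c"]),
    "x": [1, 2, 3],
})

# 'c' never occurs, yet observed=False still emits a row for it
# (count -> 0, sum -> 0, mean -> NaN).
print(df.groupby("cat", observed=False)["x"].count())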
2 changes: 1 addition & 1 deletion setup.cfg
@@ -12,7 +12,7 @@ tag_prefix =
 parentdir_prefix = modin-
 
 [tool:pytest]
-addopts =
+addopts = --cov-config=setup.cfg --cov=modin --cov-append --cov-report= -m "not exclude_by_default"
 xfail_strict=true
 markers =
     exclude_in_sanity
