From f836cefa369efdab884c7e90dd6ae88f0024708a Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 25 Jun 2021 14:28:36 +0200 Subject: [PATCH] Don't sample dict result of a shuffle group when calculating its size The size calculation for shuffle group results is very sensitive to sampling since there may be empty splits skewing the result. See also https://github.com/dask/distributed/issues/4962 --- dask/dataframe/backends.py | 21 ++++++++++++++++++++- dask/sizeof.py | 21 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py index 1b2a6784ee3..9dca68635e5 100644 --- a/dask/dataframe/backends.py +++ b/dask/dataframe/backends.py @@ -12,6 +12,8 @@ union_categoricals, ) +from dask.sizeof import SimpleSizeof, sizeof + from ..utils import is_arraylike, typename from ._compat import PANDAS_GT_100 from .core import DataFrame, Index, Scalar, Series, _Frame @@ -341,6 +343,23 @@ def hash_object_pandas( ) +class ShuffleGroupResult(SimpleSizeof, dict): + def __sizeof__(self) -> int: + """ + The result of the shuffle split are typically small dictionaries + (#keys << 100; typically <= 32) The splits are often non-uniformly + distributed. Some of the splits may even be empty. Sampling the + dictionary for size estimation can cause severe errors. + + See also https://github.com/dask/distributed/issues/4962 + """ + total_size = super().__sizeof__() + for k, df in self.items(): + total_size += sizeof(k) + total_size += sizeof(df) + return total_size + + @group_split_dispatch.register((pd.DataFrame, pd.Series, pd.Index)) def group_split_pandas(df, c, k, ignore_index=False): indexer, locations = pd._libs.algos.groupsort_indexer( @@ -352,7 +371,7 @@ def group_split_pandas(df, c, k, ignore_index=False): df2.iloc[a:b].reset_index(drop=True) if ignore_index else df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:]) ] - return dict(zip(range(k), parts)) + return ShuffleGroupResult(zip(range(k), parts)) @concat_dispatch.register((pd.DataFrame, pd.Series, pd.Index)) diff --git a/dask/sizeof.py b/dask/sizeof.py index 570b62513b7..686c8aac1a9 100644 --- a/dask/sizeof.py +++ b/dask/sizeof.py @@ -59,6 +59,27 @@ def sizeof_python_collection(seq): return getsizeof(seq) + sum(map(sizeof, seq)) +class SimpleSizeof: + """Sentinel class to mark a class to be skipped by the dispatcher. This only + works if this sentinel mixin is first in the mro. + + Examples + -------- + + >>> class TheAnswer(SimpleSizeof): + ... def __sizeof__(self): + ... return 42 + + >>> assert sizeof(TheAnswer()) == 42 + + """ + + +@sizeof.register(SimpleSizeof) +def sizeof_blocked(d): + return getsizeof(d) + + @sizeof.register(dict) def sizeof_python_dict(d): return (