Skip to content

Commit

Permalink
Don't sample dict result of a shuffle group when calculating its size
Browse files Browse the repository at this point in the history
The size calculation for shuffle group results is very sensitive to sampling since there may
be empty splits skewing the result.

See also dask/distributed#4962
  • Loading branch information
fjetter committed Jun 25, 2021
1 parent 0f2ba09 commit b2e5a3f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
21 changes: 20 additions & 1 deletion dask/dataframe/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
union_categoricals,
)

from dask.sizeof import SimpleSizeof, sizeof

from ..utils import is_arraylike, typename
from ._compat import PANDAS_GT_100
from .core import DataFrame, Index, Scalar, Series, _Frame
Expand Down Expand Up @@ -341,6 +343,23 @@ def hash_object_pandas(
)


class ShuffleGroupResult(SimpleSizeof, dict):
def __sizeof__(self) -> int:
"""
The result of the shuffle split are typically small dictionaries
(#keys << 100; typically <= 32) The splits are often non-uniformly
distributed. Some of the splits may even be empty. Sampling the
dictionary for size estimation can cause severe errors.
See also https://github.com/dask/distributed/issues/4962
"""
total_size = super().__sizeof__()
for k, df in self.items():
total_size += sizeof(k)
total_size += sizeof(df)
return total_size


@group_split_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
def group_split_pandas(df, c, k, ignore_index=False):
indexer, locations = pd._libs.algos.groupsort_indexer(
Expand All @@ -352,7 +371,7 @@ def group_split_pandas(df, c, k, ignore_index=False):
df2.iloc[a:b].reset_index(drop=True) if ignore_index else df2.iloc[a:b]
for a, b in zip(locations[:-1], locations[1:])
]
return dict(zip(range(k), parts))
return ShuffleGroupResult(zip(range(k), parts))


@concat_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
Expand Down
23 changes: 23 additions & 0 deletions dask/sizeof.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,29 @@ def sizeof_python_collection(seq):
return getsizeof(seq) + sum(map(sizeof, seq))


class SimpleSizeof:
"""Sentinel class to mark a class to be skipped by the dispatcher. This only
works if this sentinel mixin is first in the mro.
Examples
--------
>>> class TheAnswer(SimpleSizeof):
... def __sizeof__(self):
... # Sizeof always add overhead of an object for GC
... return 42 - sizeof(object())
>>> sizeof(TheAnswer())
42
"""


@sizeof.register(SimpleSizeof)
def sizeof_blocked(d):
return getsizeof(d)


@sizeof.register(dict)
def sizeof_python_dict(d):
return (
Expand Down

0 comments on commit b2e5a3f

Please sign in to comment.