
Add GroupBy.shuffle_to_chunks() #9320

Merged: 61 commits into pydata:main on Nov 21, 2024
Conversation

@dcherian (Contributor) commented Aug 7, 2024

This adds some new API to shuffle an Xarray object. Shuffling means we sort so that members of a group occur in the same chunk, with the possibility of multiple groups in a single chunk.

gb = ds.groupby(..)
shuffled: Dataset = gb.distributed_shuffle()
shuffled.groupby(...).quantile()

I've also added shuffle_by to DataArray and Dataset. This generalizes sortby, and lets you persist a shuffled Xarray object to disk.

ds.shuffle_by(grouper)
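A rough sketch of how that might look (the shuffle_by name and grouper-kwarg signature come from this iteration of the PR and are illustrative only; the method is renamed later in this thread, and the zarr path is made up):

from xarray.groupers import UniqueGrouper

# shuffle so that all members of each "label" group are contiguous and co-located per chunk
shuffled = ds.shuffle_by(label=UniqueGrouper())

# persist the shuffled object to disk, e.g. as zarr, so later groupby work avoids the shuffle
shuffled.to_zarr("shuffled.zarr")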

cc @phofl

dcherian added commits to xarray-contrib/flox referencing this pull request on Aug 14, 2024
@dcherian (Contributor, Author) commented Nov 13, 2024

We could also just have GroupBy.shuffle() -> Dataset | DataArray.

Then

# Save the groupby kwargs since we need it twice
groupers = {"label": UniqueGrouper()}

# get the shuffled dataset
shuffled: Dataset = ds.groupby(groupers).shuffle()

# Now group the shuffled dataset the same way and 
# map a UDF that expects all members of a group in memory at the same time.
shuffled.groupby(groupers).map(UDF)

Alternative names could be GroupBy.as_sorted() -> Dataset | DataArray, GroupBy.sorted_object() -> Dataset | DataArray.


After thinking about it a while: sortby is similar, but different w.r.t. chunking. sortby is vectorized indexing with an index you get from calling argsort, and dask/cubed are free to rechunk as they please. For example, sortby(dim=UniqueGrouper()) when all(chunksizes["dim"] == 1) is basically free in terms of rechunking; but with a shuffle you'd get the same labels in the same chunk, so the chunking would have to change (assuming duplicated dim labels). We could have sortby with a grouper object basically provide np.concatenate(_group_indices) as the indexer.
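To make the indexer idea concrete, here is a tiny plain-numpy sketch (the _group_indices construction below is illustrative, not xarray's internal code):

import numpy as np

labels = np.array(["b", "a", "b", "a", "c"])

# one array of member positions per group label (a stand-in for the internal _group_indices)
_group_indices = [np.flatnonzero(labels == lab) for lab in np.unique(labels)]

indexer = np.concatenate(_group_indices)  # array([1, 3, 0, 2, 4])
labels[indexer]  # ['a', 'a', 'b', 'b', 'c'] -- members of each group are now contiguous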

This is all fairly advanced stuff, so we can start with the smallest possible API surface, which IMO would be GroupBy.shuffle (or any of the other names above).

cc @shoyer

@max-sixty (Collaborator) commented:

Very much agree with all of your message: sortby being a bit different, and starting with GroupBy.shuffle.

> We could also just have GroupBy.shuffle() -> Dataset | DataArray.

(If it were GroupBy.shuffle() -> GroupBy, then I guess .shuffle_by would be equivalent to .groupby(groupers).shuffle().map(lambda x: x)...)

@shoyer (Member) commented Nov 13, 2024

Would it be too verbose to call this distributed_shuffle? Shuffling just has a very specific meaning in distributed computing that is quite different from the generic meaning of "shuffle" for rearranging data.

@max-sixty (Collaborator) commented:

> Would it be too verbose to call this distributed_shuffle? Shuffling just has a very specific meaning in distributed computing that is quite different from the generic meaning of "shuffle" for rearranging data.

Given that the method is on the GroupBy object, is shuffle less ambiguous? FWIW, I had correctly presumed its intended meaning, given that it was associated with groupby.

Otherwise we could workshop distributed_shuffle to something terser :)

@dcherian changed the title from "Add GroupBy.shuffle(), Dataset.shuffle_by(), DataArray.shuffle_by()" to "Add GroupBy.distributed_shuffle()" on Nov 19, 2024
@dcherian (Contributor, Author) commented Nov 19, 2024

OK, I went with distributed_shuffle returning a shuffled Xarray object. Any further groupby operations will need to repeat the groupby statement, e.g.

gb = array.groupby_bins("dim_0", bins=bins, **cut_kwargs)

# shuffle so that all members of each bin end up in the same chunk
shuffled: DataArray = gb.distributed_shuffle()

# repeat the groupby on the shuffled object before applying further operations
gb = shuffled.groupby_bins("dim_0", bins=bins, **cut_kwargs)
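Any further operation (echoing the quantile example from the PR description, purely as an illustration) then runs on the repeated groupby:

gb.quantile(q=0.5)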

I couldn't think of a shorter name, but happy to change it.

@dcherian (Contributor, Author) commented:

How about shuffle_to_chunks?

@max-sixty (Collaborator) commented:

Looks good! (No strong view on the exact name...)

@dcherian changed the title from "Add GroupBy.distributed_shuffle()" to "Add GroupBy.shuffle_to_chunks()" on Nov 20, 2024
@dcherian (Contributor, Author) commented Nov 20, 2024

OK I'm going with shuffle_to_chunks. Will merge in ~24 hours. It's been around a while now :)
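For reference, a minimal end-to-end sketch of the merged API as described in this thread (the toy dataset and lambda UDF are made up, and a dask version new enough to support array shuffles is assumed):

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"foo": ("x", np.arange(12))},
    coords={"label": ("x", np.tile([1, 2, 3], 4))},
).chunk(x=3)

# rearrange the data so that all members of each "label" group share a chunk
shuffled = ds.groupby("label").shuffle_to_chunks()

# repeat the groupby on the shuffled object and map a UDF that needs whole groups in memory
shuffled.groupby("label").map(lambda g: g - g.mean())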

@dcherian added the "plan to merge (Final call for comments)" label on Nov 20, 2024
@dcherian mentioned this pull request on Nov 21, 2024
@shoyer (Member) commented Nov 21, 2024 via email

@max-sixty (Collaborator) commented:

Really like the new name!

@dcherian (Contributor, Author) commented:

Thanks for the input!

@dcherian merged commit ad5c7ed into pydata:main on Nov 21, 2024 (29 checks passed)
@dcherian deleted the groupby-shuffle branch on December 11, 2024
Labels: API design, plan to merge (Final call for comments), topic-chunked-arrays (Managing different chunked backends, e.g. dask), topic-dask, topic-groupby