P2P Rechunking graph prohibits culling and partial release #8326

Closed
fjetter opened this issue Nov 6, 2023 · 6 comments · Fixed by #8330

@fjetter
Member

fjetter commented Nov 6, 2023

The P2P rechunk graph is, like the shuffle graph, one large globally connected graph. However, many rechunk applications only operate on slices of the data and often do not connect all input and output partitions, only planes in the dataset.

Having only slices/planes connected by the graph is beneficial since it allows us to cull the graph, process downstream tasks earlier, and release disk space sooner.

A real-life example showing this effect uses the public ERA5 dataset (see code below), but a simpler variant can be constructed using artificial data.

ERA5 example
import itertools

import coiled
import dask
import distributed
import flox.xarray
import fsspec
import numpy as np
import pandas as pd
import xarray as xr
fs = fsspec.filesystem("s3")
prefix = "s3://era5-pds/zarr"
variables = [
    store.split("/")[-1] for store in fs.glob("era5-pds/zarr/1979/01/data/*.zarr")
]
years = [path.split("/")[-1] for path in fs.glob("era5-pds/zarr/*")]
months = [f"{m:02d}" for m in range(1, 13)]
last_months = [path.split("/")[-1] for path in fs.glob(f"era5-pds/zarr/{years[-1]}/*")]
var = ("precipitation_amount_1hour_Accumulation.zarr",)
var = (variables[0],)
all_stores = [
    "/".join(t)
    for t in itertools.chain(
        itertools.product((prefix,), years[:-1], months, ("data",), var),
        itertools.product((prefix,), years[-2:], last_months, ("data",), var),
    )
]

def preprocess(ds):
    """Edit the dataset so it combines nicely."""
    (time_dim,) = [dim for dim in ds.dims if "time" in dim]
    ds = ds.rename({time_dim: "time"})
    bounds_var = [var for var in ds.data_vars if "bounds" in var]
    ds = ds.drop_vars(bounds_var)
    return ds

ds = xr.open_mfdataset(
    all_stores,
    engine="zarr",
    combine="nested",
    concat_dim="time",
    preprocess=preprocess,
    join="override",
    parallel=True,
)

import dask
dask.config.set({"array.rechunk.method": "p2p"})

def rechunk_to_frequency(ds, **kwargs):
    newchunks = {}
    for dim, freq in kwargs.items():
        newchunks[dim] = tuple(
            ds[dim]
            .copy(data=np.ones(ds[dim].shape, dtype=np.int64))
            .to_dataframe()
            .resample(freq)
            .sum()[dim]
            .values
        )
    return newchunks

newchunks = rechunk_to_frequency(ds, time="A")
rechunked = ds.chunk(**newchunks, lat=75, lon=75)

# Run any kind of reduction on this. This reduces per year
# so every reduction step should only load 1/45 of the data
result = flox.xarray.xarray_reduce(
    rechunked,
    rechunked.time.dt.year,
    func="quantile",
    skipna=False,
    q=0.9,
    method="blockwise",
)

# Even though we're selecting only a thin slice, this loads the entire dataset
# since culling is not possible. The entire dataset also has to be stored to disk
# temporarily, so the cluster has to be really large for this to work even though
# the actual slices should only be a couple of GBs
result.isel(year=slice(2)).compute()

A simpler, toy example showing the same principle is just concatenating a single dimension.

import dask
import dask.array as da
# This is a simple example where we're just compacting the first dimension
# The ERA5 example simultaneously also splits another dimension but this boils
# down to the same principle
arr = da.random.random((10, 45), chunks=(1, 1))
rechunked = arr.rechunk((-1, 1))
# The array is 2D, so select the first two columns of the rechunked result
dask.optimize(rechunked[:, :2])[0]
import dask
import dask.array as da

arr = da.random.random((10, 45), chunks=(1, 1))
for method in ["p2p", "tasks"]:
    dask.config.set({"array.rechunk.method": method})
    rechunked = arr.rechunk((-1, 1))
    
    len_with_slicing = len(dask.optimize(rechunked[:, :2])[0].__dask_graph__())
    len_without_slicing = len(dask.optimize(rechunked)[0].__dask_graph__())
    print(f"Tasks after slicing\t{method=}\t{len_with_slicing/len_without_slicing*100:2.0f}%")
Tasks after slicing	method='p2p'	96%
Tasks after slicing	method='tasks'	 5%

For P2P to do the same, instead of generating one large P2P layer we should generate many small P2P layers so that only the relevant slices are connected.
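
To illustrate why many small layers permit culling, here is a rough dependency-counting model of the toy example. It is not dask's actual graph: it only assumes a 10 x 45 grid of input chunks where each output chunk is one full column, and `required_inputs` is a hypothetical helper written for this sketch.

```python
def required_inputs(n_rows, n_cols, selected_cols, global_barrier):
    """Count input chunks needed to compute the selected output columns.

    Toy model only: the input is an n_rows x n_cols grid of chunks and
    each output chunk is one full column of that grid.
    """
    if global_barrier:
        # One layer with a single barrier: every output depends on every input.
        return n_rows * n_cols
    # One independent layer per column: only the selected columns are needed.
    valid_cols = set(selected_cols) & set(range(n_cols))
    return n_rows * len(valid_cols)


total = 10 * 45
for barrier in (True, False):
    needed = required_inputs(10, 45, selected_cols={0, 1}, global_barrier=barrier)
    print(f"global_barrier={barrier}: {needed}/{total} input chunks required")
```

In this model, slicing the first two columns needs all 450 input chunks behind a global barrier but only 20 when each column is its own layer. The real task counts above differ because the graphs contain additional tasks.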

@fjetter fjetter added the shuffle label Nov 6, 2023
@hendrikmakait
Member

In theory, detecting such split points should be pretty easy to do by walking through the input and output chunks of each individual axis and checking where the chunking matches up. If an axis contains unknown sizes, it must be unchanged, so that's even the best case for us.
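
A minimal sketch of that idea for a single axis, assuming all chunk sizes are known integers (unknown sizes are not handled here). The helper names `split_points` and `independent_ranges` are made up for illustration and are not part of dask or distributed.

```python
from itertools import accumulate


def split_points(old_chunks, new_chunks):
    """Return the offsets at which old and new chunk boundaries coincide."""
    old_bounds = set(accumulate(old_chunks, initial=0))
    new_bounds = set(accumulate(new_chunks, initial=0))
    return sorted(old_bounds & new_bounds)


def independent_ranges(old_chunks, new_chunks):
    """Return (start, stop) ranges along the axis that never exchange data."""
    points = split_points(old_chunks, new_chunks)
    return list(zip(points[:-1], points[1:]))


# Ten chunks of size 1 concatenated into two chunks of size 5
print(independent_ranges((1,) * 10, (5, 5)))
# Unchanged chunking: every chunk is its own independent range
print(independent_ranges((3, 3, 4), (3, 3, 4)))
```

Each returned range could in principle be rechunked on its own, since no output chunk inside it needs data from outside it.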

@fjetter
Member Author

fjetter commented Nov 6, 2023

to be clear, there are also slightly less trivial cases, e.g.

original = (1, 50, 1)
new = (5, 10, -1)

which is...

  • a simple concat on the first dimension
  • a split in the second dimension
  • a full shuffle on the third

but I still would expect this to break apart into many decoupled P2Ps
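
As a rough sketch of how this case could break apart, the snippet below counts independent ranges per axis by intersecting old and new chunk boundaries. The array shape `(10, 100, 20)` is a hypothetical choice (the thread does not give one), and `expand` and `groups_per_axis` are illustrative helpers, not dask APIs.

```python
from itertools import accumulate
from math import prod


def expand(size, chunk):
    """Expand a uniform chunk size into explicit chunks; -1 means the whole axis."""
    if chunk == -1:
        return (size,)
    full, rest = divmod(size, chunk)
    return (chunk,) * full + ((rest,) if rest else ())


def groups_per_axis(shape, original, new):
    """Count the independent ranges along each axis."""
    counts = []
    for size, old, target in zip(shape, original, new):
        old_bounds = set(accumulate(expand(size, old), initial=0))
        new_bounds = set(accumulate(expand(size, target), initial=0))
        # Shared boundaries delimit ranges that never exchange data.
        counts.append(len(old_bounds & new_bounds) - 1)
    return counts


shape = (10, 100, 20)  # hypothetical shape, not taken from the thread
counts = groups_per_axis(shape, original=(1, 50, 1), new=(5, 10, -1))
print(counts, prod(counts))
```

Under these assumptions the first two axes each split into two independent ranges and the third stays fully connected, so the rechunk would decompose into four decoupled pieces.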

@crusaderky
Collaborator

crusaderky commented Nov 6, 2023

I have not given a proper look at the ERA5 code.

Some considerations though:

  1. ..., chunks=(1, 1)).rechunk(-1, 1) gets zero benefit from p2p rechunking. This is a clear example where an "auto" rechunk method should choose tasks.
  2. Within p2p rechunk, this is also a clear use case where there should be no barrier, as each output chunk needs a very limited subset of the inputs. Do we have a performance measure of the pathological use case ..., chunks=(-1, 1)).rechunk(1, -1) where having no barrier would generate n^2 edges? Are edges a problem in the scheduler, or is n^2 only a problem for tasks?
  3. Isn't this use case exactly what dask-expr promises to deliver automagically? or anything else that pushes the slicing upwards in the graph:
import dask
import dask.array as da

shape = (200, 200)
before = (1, ) + (2, ) * 99 + (1, )
before = (before, before)
after = 2

arr = da.random.random(shape, chunks=before)
for method in ["p2p", "tasks"]:
    dask.config.set({"array.rechunk.method": method})
    rechunked = arr.rechunk(after)

    coll = arr
    print(method, "original:", len(dask.optimize(coll)[0].__dask_graph__()))

    coll = rechunked
    print(method, "rechunk:", len(dask.optimize(coll)[0].__dask_graph__()))

    coll = rechunked[:, :4]
    print(method, "rechunk->slice:", len(dask.optimize(coll)[0].__dask_graph__()))

    coll = arr[:, :4].rechunk(after)
    print(method, "slice->rechunk:", len(dask.optimize(coll)[0].__dask_graph__()))
p2p original: 10201
p2p rechunk: 30403
p2p rechunk->slice: 20803
p2p slice->rechunk: 807
tasks original: 10201
tasks rechunk: 60197
tasks rechunk->slice: 1501
tasks slice->rechunk: 1602

@fjetter
Member Author

fjetter commented Nov 7, 2023

gets zero benefit from p2p rechunking.

It depends. I expect P2P to be faster than the ordinary split tasks approach once we're through with optimizing. I agree that it is not as powerful as for more complex problems.

Besides, I'm displaying the simplest example here. I frequently encounter examples that exhibit an all-to-all pattern for some of the dimensions of the array while there is just a simple concat/split for other dimensions. This is very common for xarray workloads and I believe this kind of split could help.

Within p2p rechunk, this is also a clear use case where there should be no barrier,

If you don't have a barrier but n**2 edges you'd have the same problem

Are edges a problem in the scheduler, or is n^2 only a problem for tasks?

I'm not sure how bad it is yet, but having to maintain more edges in the scheduler means larger sets on all tasks, which requires more memory. It also makes the compute-task message heavier and, first and foremost, every worker would have to connect to every other worker again to fetch the dummy result.
My gut feeling is that this is not necessarily a problem, but it is not optimal and I don't see a reason to introduce this.
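
For a sense of scale, the edge counts for an all-to-all rechunk can be compared with a small back-of-the-envelope calculation. This is a simplified model that assumes every input feeds a single barrier and every output depends only on that barrier; the real graph contains additional tasks, and both function names are hypothetical.

```python
def edges_without_barrier(n_inputs, n_outputs):
    """All-to-all: every output chunk depends directly on every input chunk."""
    return n_inputs * n_outputs


def edges_with_barrier(n_inputs, n_outputs):
    """Every input feeds one barrier and every output depends on that barrier."""
    return n_inputs + n_outputs


for n in (10, 100, 1_000):
    print(n, edges_without_barrier(n, n), edges_with_barrier(n, n))
```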

Isn't this use case exactly what dask-expr promises to deliver automagically? or anything else that pushes the slicing upwards in the graph:

This is actually a slightly different problem. While it could manage to do the slicing, everything else wouldn't work (e.g. partial release of data stored to disk or starting reducing tasks earlier). Besides, we haven't even started with dask-expr for arrays yet, and I suspect it will still be a little while until we get there.

@crusaderky
Collaborator

crusaderky commented Nov 7, 2023

original = (1, 50, 1)
new = (5, 10, -1)

which is...

  • a simple concat on the first dimension
  • a split in the second dimension
  • a full shuffle on the third

but I still would expect this to break apart into many decoupled P2Ps

I think this is a really good use case as it forces us to reason in generalized terms.

@dcherian

dcherian commented Nov 7, 2023

I frequently encounter examples that exhibit an all-to-all pattern for some of the dimensions of the array while there is just a simple concat/split for other dimensions.

Just chiming in to support this. In Earth Science workloads, we basically have two chunking schemes (ignoring variables with a vertical dimension)

  1. small chunks along time + large chunks in (lat, lon) space.
  2. large chunks in time + small chunks in (lat, lon) space.

And it is very common for users to want to rechunk from one to the other, as in this workload.

The ERA5 example simultaneously also splits another dimension but this boils

I added this simply to get "reasonable chunk sizes", I would expect it to be a very common approach.
