
chunks get combined in 4d array reshape #5544

Closed
rabernat opened this issue Oct 30, 2019 · 23 comments · Fixed by #11273

@rabernat
Contributor

I want to reshape a 4D dask array in such a way that I expect should preserve the original chunk structure. I am finding that reshape is instead rechunking my array in a non-optimal way.

Things work as expected in 3D:

import dask.array as dsa
data = dsa.ones((20, 20, 5), chunks=(10, 10, 5))
display(data)
dsa.reshape(data, (400, 5))

[image: HTML repr of the 3D array and its reshaped result]

The chunks simply get stacked into a single column. Examining the graph, however, reveals there is an intermediate merge step:

[image: task graph showing an intermediate rechunk-merge step]

If I add another dimension at the beginning, things don't look as nice:

data = dsa.ones((2, 20, 20, 5), chunks=(1, 10, 10, 5))
display(data)
dsa.reshape(data, (800, 5))

[image: HTML repr of the reshaped 4D array with fused chunks]

Rather than seeing a neat stack of 8 chunks as I expected, the chunks have been fused.

[image: task graph showing chunks being fused]

This causes big problems for me when I am trying to do some processing of very large arrays. I can't afford to have the chunks fused, or else I will run out of memory.

Dask version 2.6.0+10.g8179f7f3
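The fused chunks can also be inspected programmatically. A minimal sketch of the report (exactly how reshape chunks axis 0 here varies across dask versions):

```python
import dask.array as da

# 4D array whose leading axis has chunksize 1, as in the report above.
data = da.ones((2, 20, 20, 5), chunks=(1, 10, 10, 5))
reshaped = data.reshape((800, 5))

print(data.chunks)      # ((1, 1), (10, 10), (10, 10), (5,))
print(reshaped.chunks)  # the axis-0 chunking depends on the dask version
```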

@TomAugspurger
Member

I think your intuition that the intermediate rechunk-merge and split / reshape can be avoided is correct. The trick is probably to figure out when the output chunks align with the input chunks.

If no one beats me to it, I can take a look next week.

@mrocklin
Member

I'm curious, if the order of the dimensions is flipped then does this problem persist? I.e. (5, 20, 20) -> (5, 400)?

@mrocklin
Member

My guess is that the implicit C-ordering of data might make the intermediate reshaping actually necessary in some cases, even if we don't care in this case.

@rabernat
Contributor Author

I'm curious, if the order of the dimensions is flipped then does this problem persist? I.e. (5, 20, 20) -> (5, 400)?

It's the same. Same number of tasks and graph structure.

@nbren12
Contributor

nbren12 commented Mar 3, 2020

I have a similar problem, but perhaps even simpler. I just want to reshape an array with chunk sizes of 1 along one dimension. I would like the reshaped array to have chunksize (1,1).

In [31]: a = da.ones((10), chunks=1)                                                                                                                                        

In [32]: a                                                                                                                                                                  
Out[32]: dask.array<ones, shape=(10,), dtype=float64, chunksize=(1,), chunktype=numpy.ndarray>

In [33]: a.reshape((2, 5))                                                                                                                                                  
Out[33]: dask.array<reshape, shape=(2, 5), dtype=float64, chunksize=(1, 5), chunktype=numpy.ndarray>

This is important to me because I want xarray unstack to preserve chunking in the trivial case where the chunk size is 1.

@mrocklin
Member

mrocklin commented Mar 3, 2020

cc @TomAugspurger in case he's interested in this from the Anaconda/Pangeo angle

@TomAugspurger
Member

@nbren12 to confirm, your ideal chunking in the output of a.reshape((2, 5)) is ((1, 1), (1, 1, 1, 1, 1)). Essentially a.reshape((2, 5)).rechunk((1, 1)), without the intermediate merged `((1, 1), (5,))` chunks? One concern is that this will lead to larger task graphs, since you'll have more chunks and tasks. Since not-merging isn't obviously better, we'd need a parameter or config setting to support this.

@nbren12
Contributor

nbren12 commented Mar 3, 2020

@TomAugspurger That is correct. I have been running out of memory due to having too-big chunks, and was trying very carefully to ensure that chunks_in == chunks_out.

@nbren12
Contributor

nbren12 commented Mar 3, 2020

Just to provide some context, my workflow involves opening a list of URLs as dask delayed objects wrapped with xarray. The URL string is parsed into dimension information, which I can represent as a stacked xarray dimension. I want to ensure that concatenated_url_datasets.unstack('urls') does not alter the chunking strategy.

@chrisroat
Contributor

I just ran into this as well, using chunksize=1 dimensions and seeing my graphs get complex. In the case of chunksize=1 dimensions, there is the possibility of a shortcut implementation. It seems pretty thorny to get this optimal for all cases.

A less trivial case could be demonstrated with:

arr = da.from_array(np.arange(16).reshape((4,4)), chunks=((1,3),(2,2)))
00 01|02 03
-----+-----
04 05|06 07
08 09|10 11
12 13|14 15

In this case, the individual chunks are not contiguous and cannot be simply reshaped and stuck end-to-end.

That said, the above case gets rechunked in arr.reshape(16) as:

00 01 02 03|04 05 06 07|08 09 10 11 12 13 14 15

which I find odd.

Here's an example of a fastpath implementation for chunksize=1:
https://github.com/deisseroth-lab/dask/commit/39d56b91a016edbcbf01b281da2ba433915ee23a
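Whatever chunking reshape picks, the element order is plain C order; a runnable version of the example above:

```python
import numpy as np
import dask.array as da

# Row chunks (1, 3) and column chunks (2, 2): the individual chunks are
# not contiguous runs of the flattened array, so they can't simply be
# reshaped and concatenated end-to-end.
arr = da.from_array(np.arange(16).reshape((4, 4)), chunks=((1, 3), (2, 2)))
flat = arr.reshape(16)

print(flat.chunks)  # the rechunking dask chooses here is what reads as odd above
assert (flat.compute() == np.arange(16)).all()
```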

@TomAugspurger
Member

TomAugspurger commented Sep 2, 2020

@nbren12 I think your use-case would be solved by something like #6272. I'll see if I can turn that into a PR today.

In [2]: a = da.ones(10, chunks=1)

In [3]: o = a.reshape(2, 5, inchunks=a.chunks, outchunks=((1, 1), (1, 1, 1, 1, 1)))

[image: task graph of the proposed no-merge reshape]


@chrisroat what's your desired output chunking for arr.reshape(16)? Maybe something like

00 01 | 02 03 | 04 05 | 06 07 | 08 09 | 10 11 | 12 13 | 14 15

That would be a "zero-communication" reshape / rechunk IIUC.

@TomAugspurger
Member

@chrisroat I think this is what you would need for your problem

inchunks = ((1, 1, 1, 1), (2, 2))
outchunks = (2,) * 8,
out = arr.reshape(16, inchunks=inchunks, outchunks=outchunks)
out.visualize()

[image: task graph of the no-merge reshape]

Figuring out the right inchunks / outchunks isn't the easiest, but I'm not sure how much we can simplify the API while giving total control.
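The inchunks=/outchunks= keywords sketched here were part of the #6272 proposal and are not in released dask; the knob that eventually landed is reshape's merge_chunks flag (see the merge_chunks commits later in the thread). A sketch of the same zero-communication reshape with that flag, assuming a dask version that includes it:

```python
import numpy as np
import dask.array as da

arr = da.from_array(np.arange(16).reshape((4, 4)), chunks=((1, 3), (2, 2)))

# merge_chunks=False rechunks the collapsed axes (except the last) to
# chunksize 1, so no chunk merging or extra communication is needed.
out = arr.reshape(16, merge_chunks=False)

print(out.chunks)  # eight chunks of 2, matching the inchunks/outchunks above
assert (out.compute() == np.arange(16)).all()
```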

@chrisroat
Contributor

I agree that that looks pretty good. It seems the method you use here is to find the smallest chunk along a dimension and rechunk that dimension to that chunksize.

@TomAugspurger
Member

It seems the method you use here is to find the smallest chunk along a dimension and rechunk that dimension to that chunksize.

I think that's right, if you want to minimize communication between chunks. It does result in more chunks / tasks, but that might be a good tradeoff in certain cases.

@dcherian
Contributor

dcherian commented Sep 3, 2020

the method you use here is to find the smallest chunk along a dimension and rechunk that dimension to that chunksize.

Could this algorithm be implemented and controlled using a kwarg like arr.reshape(16, strategy="preserve-chunks")? It would be friendlier and a lot easier than asking the user to specify inchunks and outchunks. The existing behaviour could be strategy="minimize-tasks".

@TomAugspurger
Member

TomAugspurger commented Sep 3, 2020

I was considering that. My concerns were

  1. how do we discover the right chunks? This is probably solvable, but could be a bit tricky to write.
  2. Would this be flexible enough? Perhaps there are just two strategies, one that preserves chunk sizes and one that doesn't ever merge chunks.

So if we think people won't really need complete flexibility in the chunk structure, then the strategy keyword would be best.

@dcherian
Contributor

dcherian commented Sep 3, 2020

If 1 can be solved then I think the following code isn't too bad. It allows easy opt-in to one of two strategies (default vs "preserve-chunks") and is flexible enough to allow motivated users to have full control (this might even lead to discovery of new strategies?)

def reshape(..., inchunks=None, outchunks=None, strategy=None):
    if strategy and any([inchunks, outchunks]):
        raise ValueError("pass either strategy or explicit inchunks/outchunks, not both")

    if strategy is not None:
        inchunks, outchunks = get_chunks_from_strategy(strategy)

    ...

@peterroelants

I'm running into the same issue. Being able to preserve the original chunk size as proposed by @dcherian would solve my problem.

After reading the discussion above, I was wondering: is there a difference in performance between rechunking small blocks by combining them and rechunking large blocks by splitting them? In the first case, the strategy of preserving the chunks might be more flexible than you think, because you would still be able to rechunk after the reshape without much overhead.

@TomAugspurger
Member

I was looking into this again last week, to implement the strategy keyword. I'm hoping to have something soon.

I don't think that "just maintain the original chunksize" will work. There are cases like #5544 (comment) where we need to rechunk the input to smaller chunks.

After reading the discussion above I was wondering about the following: Is there a difference in performance when rechunking small blocks by combining them, and rechunking large blocks by splitting them?

I'm not sure offhand, but all else equal the more tasks you have, the slower it'll be. In this case we're trading (some) additional tasks for less communication in the hope that it's faster. But we'll want to avoid doing unnecessary rechunking.

At a minimum though, I think we'll have a requirement that the fastest-changing dimension (the last with order="C") have the largest input chunks, and the slowest-changing dimensions (the first) have the smallest.

@TomAugspurger
Member

I've identified one special case: when reshaping from a larger to a smaller number of dimensions (e.g. 3d -> 2d), we're able to avoid rechunking entirely when the "early" axes all have size-1 chunks. For example:

Case 1: (2, 3, 4) -> (6, 4)

* inchunks=((1, 1), (1, 2), (2, 2))   # self.inchunks
* outchunks=((1, 2, 1, 2), (2, 2))

00 01 | 02 03
----- | -----
04 05 | 06 07
08 09 | 10 11

=============

12 13 | 14 15
----- | -----
16 17 | 18 19
20 21 | 22 23

-> (3, 4)

00 01 | 02 03
----- | -----
04 05 | 06 07
08 09 | 10 11
----- | -----
12 13 | 14 15
----- | -----
16 17 | 18 19
20 21 | 22 23

Because of the "all early axes have chunksize 1" property, we avoid needing to rechunk the input and we're merely moving blocks around. So I think that for this special case, not rechunking is the strictly superior strategy.


For cases like @chrisroat's in #5544 (comment), we need to rechunk the inputs (since the "early" axes aren't all chunksize 1) and so the strategy avoiding rechunk-merge isn't necessarily better. I think it'll depend on the overhead of scheduling additional tasks, the cost of moving data around, maximum memory usage, ...
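The (2, 3, 4) -> (6, 4) special case above can be checked directly; a sketch, assuming a dask version that includes this optimization:

```python
import dask.array as da

# All collapsed axes except the last have chunksize 1, so reshape can
# move blocks around without inserting any rechunk.
a = da.ones((2, 3, 4), chunks=((1, 1), (1, 2), (2, 2)))
out = a.reshape((6, 4))

print(out.chunks)  # ((1, 2, 1, 2), (2, 2)) when no rechunk is needed
```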

TomAugspurger added a commit to TomAugspurger/dask that referenced this issue Oct 19, 2020
When the slow-moving (early) axes in `.reshape` are all size 1, then we
can avoid an intermediate rechunk which could cause memory issues.

```
00 01 | 02 03   # a[0, :, :]
----- | -----
04 05 | 06 07
08 09 | 10 11

=============

12 13 | 14 15   # a[1, :, :]
----- | -----
16 17 | 18 19
20 21 | 22 23

-> (3, 4)

00 01 | 02 03
----- | -----
04 05 | 06 07
08 09 | 10 11
----- | -----
12 13 | 14 15
----- | -----
16 17 | 18 19
20 21 | 22 23
```

xref dask#5544, specifically the examples
given in dask#5544 (comment).
@peterroelants

I've identified one special case: When reshaping from a larger to smaller number of dimensions (e.g. 3d -> 2d), we're able to avoid rechunking entirely *when the "early" axes all have size-1 chunks.

That's great. This seems to align with my use case.

@TomAugspurger
Member

TomAugspurger commented Oct 19, 2020

Good to hear.

Unless I'm mistaken, #5544 (comment) indicates the kind of API / implementation we'd need to solve @chrisroat's problem in #5544 (comment). To do a "zero-communication" / "no-merge" reshape, we need all the early axes to have a chunksize of 1, so that's necessary and sufficient for this optimization to kick in. That also covers @rabernat's original use-case.

So if I'm right that having a chunksize of 1 is necessary, then we just need a keyword in reshape saying "I want to rechunk to a chunksize of 1 for all my dimensions being collapsed". That would make the following two equivalent (with #6748, which disables merging for this special case).

data = da.ones((20, 20, 5), chunks=(10, 10, 5))
data.reshape((400, 5), strategy="minimize_memory")  # equivalent to `data.rechunk({0: 1}).reshape((400, 5))`

Where strategy is something like strategy="minimize_tasks" (the default) vs. "minimize_memory" (rechunk to chunksize=1) or "minimize_communication".

Or we could have a boolean flag like merge_chunks=True / False. I think I like merge_chunks, but we might not be able to completely disable merging chunks (I haven't looked at the reshape from lower to higher dim case yet).
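merge_chunks is the keyword that was ultimately merged. Applied to the original 4D example, a sketch (assuming a dask release that includes the keyword):

```python
import dask.array as da

data = da.ones((2, 20, 20, 5), chunks=(1, 10, 10, 5))

merged = data.reshape((800, 5))                        # default: merge_chunks=True
no_merge = data.reshape((800, 5), merge_chunks=False)  # rechunk the collapsed axes
                                                       # (except the last) to size 1

print(merged.chunks[0])    # merged, larger chunks along axis 0
print(no_merge.chunks[0])  # eighty chunks of 10 with this input chunking
```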

jsignell pushed a commit that referenced this issue Jan 14, 2021
* Avoid rechunking in reshape with chunksize=1

When the slow-moving (early) axes in `.reshape` are all size 1, then we
can avoid an intermediate rechunk which could cause memory issues.

```
00 01 | 02 03   # a[0, :, :]
----- | -----
04 05 | 06 07
08 09 | 10 11

=============

12 13 | 14 15   # a[1, :, :]
----- | -----
16 17 | 18 19
20 21 | 22 23

-> (3, 4)

00 01 | 02 03
----- | -----
04 05 | 06 07
08 09 | 10 11
----- | -----
12 13 | 14 15
----- | -----
16 17 | 18 19
20 21 | 22 23
```

xref #5544, specifically the examples
given in #5544 (comment).

* fix condition

* remove breakpoint comment

* API: Added merge_chunks to reshape

Adds a keyword to reshape to control merge / rechunking. See the
documentation for an explanation.

* update images
@rabernat
Contributor Author

Love it when a 5-year-old issue gets closed! 💪
