
dask.array writing with distributed / multiprocessing schedulers #185

Open
rabernat opened this issue Oct 11, 2024 · 3 comments

Comments

@rabernat (Contributor)

Icechunk does support distributed writing of arrays. However, Icechunk does not currently allow writing arrays via dask.array.store with the distributed or multiprocessing schedulers.

This is because Icechunk must gather the results of each distributed write operation (metadata about the chunks that were written) back to the client in order to commit the transaction. Without this step, each worker's writes would be lost.

How it works today with regular Zarr

```mermaid
sequenceDiagram
    Client->>Worker0: Compute Chunk 0
    Client->>Worker1: Compute Chunk 1
    Client->>Worker2: Compute Chunk 2
    Worker0->>Object Store: Store Chunk 0
    Worker1->>Object Store: Store Chunk 1
    Worker2->>Object Store: Store Chunk 2
```

Once each worker has written its piece of data to storage, the job is done.
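This fire-and-forget pattern can be sketched with stdlib threads standing in for dask workers and a dict standing in for the object store (the chunk key layout is illustrative, not Zarr's exact scheme):

```python
from concurrent.futures import ThreadPoolExecutor

object_store = {}  # stand-in for S3/GCS

def store_chunk(i):
    # Plain Zarr: each worker writes to a deterministic, well-known key
    # and returns nothing -- no state has to flow back to the client.
    object_store[f"array/c/{i}"] = f"data-{i}".encode()

with ThreadPoolExecutor(max_workers=3) as pool:
    list(pool.map(store_chunk, range(3)))

# The write is complete as soon as every task finishes; nothing remains
# for the client to do.
assert set(object_store) == {"array/c/0", "array/c/1", "array/c/2"}
```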

With Icechunk, by contrast, the flow needs to look like this:

```mermaid
sequenceDiagram
    Client->>Worker0: Compute Chunk 0
    Client->>Worker1: Compute Chunk 1
    Client->>Worker2: Compute Chunk 2
    Worker0->>Object Store: Store Chunk QYNXD
    Worker0->>Client:  Chunk 0 stored at QYNXD
    Worker1->>Object Store: Store Chunk PQM3C
    Worker1->>Client:  Chunk 1 stored at PQM3C
    Worker2->>Object Store: Store Chunk BUZXP
    Worker2->>Client:  Chunk 2 stored at BUZXP
    Note right of Client: Commit Snapshot
    Client->>Object Store: Write manifest
```
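A minimal sketch of that round-trip, again using stdlib threads in place of dask workers; the content keys and manifest format here are made up for illustration and are not Icechunk's real layout:

```python
from concurrent.futures import ThreadPoolExecutor

object_store = {}  # stand-in for S3/GCS

def store_chunk(chunk_id):
    # Icechunk-style write: the chunk lands at a content-addressed key,
    # and the worker must report that key back to the client.
    key = f"chunk-{chunk_id:04d}"  # hypothetical content id
    object_store[key] = f"data-{chunk_id}".encode()
    return {"chunk": chunk_id, "key": key}

with ThreadPoolExecutor(max_workers=3) as pool:
    chunk_infos = list(pool.map(store_chunk, range(3)))

# Client side: only after gathering every worker's metadata can the
# snapshot be committed, by writing a manifest referencing the chunks.
manifest = {info["chunk"]: info["key"] for info in chunk_infos}
object_store["manifest"] = manifest
```

If the metadata from any worker is lost (as happens when each process mutates its own pickled copy of the store), the manifest is incomplete and those writes are silently dropped.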

The dask.array.store code does not provide a way to return metadata from each write task back to the client:

https://github.com/dask/dask/blob/20eeeda610260287a0dd50e4dd7b6a3cd8e007f3/dask/array/core.py#L1067

To overcome this, we need to either:

  • Update dask.array to accommodate our scenario
  • Write a custom dask function which generates the appropriate graph for an icechunk distributed write
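For the second option, here is a sketch of the graph shape such a custom function could generate: store tasks that return chunk metadata instead of None, plus a final commit task that depends on all of them. A tiny recursive evaluator stands in for the dask scheduler so the example is self-contained; the task-dict format mirrors dask's `{key: (func, *args)}` convention, but all names here are hypothetical:

```python
object_store = {}

def store_chunk(chunk_id, data):
    key = f"chunk-{chunk_id}"  # in Icechunk this would be a content id
    object_store[key] = data
    return {"chunk": chunk_id, "key": key}  # metadata flows back

def commit(*chunk_infos):
    # Runs on the client once every store task has completed.
    manifest = {info["chunk"]: info["key"] for info in chunk_infos}
    object_store["manifest"] = manifest
    return manifest

# Dask-style task graph: the commit task depends on every store task.
graph = {
    "store-0": (store_chunk, 0, b"aaa"),
    "store-1": (store_chunk, 1, b"bbb"),
    "commit": (commit, "store-0", "store-1"),
}

def get(dsk, key):
    # Minimal recursive evaluator (no caching) standing in for a
    # dask scheduler; with dask, the same dict could be executed
    # by its schedulers directly.
    func, *args = dsk[key]
    resolved = [get(dsk, a) if isinstance(a, str) and a in dsk else a
                for a in args]
    return func(*resolved)

manifest = get(graph, "commit")
```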

This does work with the dask threaded scheduler, because the same icechunk store object can be shared in memory between threads.
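The thread-scheduler caveat can be seen with a toy store: every thread mutates the same in-memory object, so the client still holds all the chunk references afterward. With processes, each worker would mutate its own pickled copy and the references would be lost. (`ToyStore` is an illustrative stand-in, not Icechunk's API.)

```python
from concurrent.futures import ThreadPoolExecutor

class ToyStore:
    def __init__(self):
        self.chunk_refs = {}  # the change log the commit step needs

    def write(self, chunk_id, data):
        self.chunk_refs[chunk_id] = f"obj-{chunk_id}"

store = ToyStore()
with ThreadPoolExecutor(max_workers=3) as pool:
    list(pool.map(lambda i: store.write(i, b"data"), range(3)))

# Because every thread mutated the *same* store object, the client
# sees all three chunk references and could now commit.
assert store.chunk_refs == {0: "obj-0", 1: "obj-1", 2: "obj-2"}
```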

There are some parallels here to what is required to allow Dask to write iceberg tables (apache/iceberg#5800).

@dcherian (Contributor)

I brought up the fact that to_zarr with a distributed Client is almost always a major footgun (#383) at today's Xarray meeting.

@shoyer suggested marking the store as read-only during unpickling to loudly fail in this scenario. That would allow thread-only parallelism and fail for anything else.

I like this idea, with one modification. We can have the user explicitly opt-in to receiving a writeable store after pickling.

```python
with store.enable_distributed_writes():
    do_smart_things
```

That way they know they need to be careful. In our own to_icechunk, we can opt-in for the user.

The following will still be a footgun

```python
with store.enable_distributed_writes():
    ds.to_zarr(store, ...)
```

but we can have the error message suggest using to_icechunk instead of to_zarr as a solution.

Thoughts?
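A sketch of how this proposal could fit together (the class and method names follow the comment above but are hypothetical; Icechunk's real API may differ):

```python
import pickle
from contextlib import contextmanager

class IcechunkStoreSketch:
    """Toy model of the proposal: unpickled copies come back read-only
    by default, unless the user explicitly opted in to distributed
    writes before the store was pickled."""

    def __init__(self):
        self.read_only = False
        self._allow_pickled_writes = False

    @contextmanager
    def enable_distributed_writes(self):
        # Explicit opt-in: copies pickled inside this block stay writeable.
        self._allow_pickled_writes = True
        try:
            yield self
        finally:
            self._allow_pickled_writes = False

    def __getstate__(self):
        return {"_allow_pickled_writes": self._allow_pickled_writes}

    def __setstate__(self, state):
        self._allow_pickled_writes = state["_allow_pickled_writes"]
        # Loudly fail: a copy shipped to another process is read-only.
        self.read_only = not self._allow_pickled_writes

    def set(self, key, value):
        if self.read_only:
            raise ValueError(
                "store is read-only after unpickling; use to_icechunk, "
                "or opt in via enable_distributed_writes()"
            )

store = IcechunkStoreSketch()
worker_copy = pickle.loads(pickle.dumps(store))  # what to_zarr would ship
assert worker_copy.read_only                     # writes fail loudly

with store.enable_distributed_writes():
    opted_in_copy = pickle.loads(pickle.dumps(store))
assert not opted_in_copy.read_only               # explicit opt-in
```

The key design point is that the pickle round-trip itself carries the opt-in flag, so no coordination with the workers is needed: the decision is made once, on the client, before the graph is submitted.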

@rabernat (Contributor, Author)

I think this is a very reasonable approach, Deepak. 👍 to what you have proposed above.

dcherian added a commit that referenced this issue Nov 21, 2024
dcherian added a commit that referenced this issue Nov 21, 2024
dcherian added a commit that referenced this issue Nov 22, 2024
* Set store to read only after unpickling

Closes #383
xref #185

* typing