Serverless parallelization of reference generation #123
Comments
I'm reading about how reduce jobs work in lithops but I don't fully get it. Is it the case that each serverless worker needs to write out the references it read to some globally accessible storage, so that they can all be accessed by a single node/worker for the concatenation step?
Yes, though you can also tree-reduce. IIUC serverless tasks don't communicate with each other directly and must communicate through storage. Though perhaps lithops has something like https://modal.com/docs/guide/dicts-and-queues to enable cross-task communication.
I haven't used Lithops.
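To make the "communicate through storage" and tree-reduce ideas above concrete, here is a minimal local sketch. It does not use lithops: a plain dict stands in for globally accessible storage, and `worker_write`, `combine`, and `tree_reduce` are hypothetical names invented for illustration. Each simulated task returns only a storage key, never the data itself.

```python
from typing import Dict, List

# Hypothetical stand-in for globally accessible storage (e.g. an object store).
storage: Dict[str, List[str]] = {}


def worker_write(key: str, refs: List[str]) -> str:
    """Simulate a serverless task persisting its result and returning only the key."""
    storage[key] = refs
    return key


def combine(left_key: str, right_key: str, out_key: str) -> str:
    """Simulate a reduce task: read two results from storage, concatenate, write back."""
    storage[out_key] = storage[left_key] + storage[right_key]
    return out_key


def tree_reduce(keys: List[str]) -> str:
    """Combine results pairwise, round by round, until one key remains.

    Assumes at least one key is supplied.
    """
    round_no = 0
    while len(keys) > 1:
        next_keys = []
        for i in range(0, len(keys) - 1, 2):
            out_key = f"round{round_no}-{i // 2}"
            next_keys.append(combine(keys[i], keys[i + 1], out_key))
        if len(keys) % 2 == 1:
            next_keys.append(keys[-1])  # the odd one out carries over unchanged
        keys = next_keys
        round_no += 1
    return keys[0]


# Five simulated workers each write the references for one file.
leaf_keys = [worker_write(f"leaf-{i}", [f"file{i}.nc"]) for i in range(5)]
final_key = tree_reduce(leaf_keys)
print(storage[final_key])
```

In a real serverless setting the pairwise `combine` calls within one round could run as independent tasks, since each only reads and writes storage.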
It may actually be possible to use cubed to do the serverless concatenation. Cubed implements serverless concatenation operations for numpy arrays, which work like a tree reduction: subsets are concatenated and each round is saved to an intermediate Zarr store. The difficulty here is that ManifestArrays are a weird type of array, backed underneath by three numpy arrays. Cubed would know how to concatenate any one of these arrays, but not what to do with all three at once. In particular, if I understand correctly, the problem is that it wouldn't know how to serialize the intermediate ManifestArrays to Zarr. One solution might be to repurpose internal machinery in cubed that uses Zarr structured arrays to effectively operate on multiple numpy arrays at once. Cubed has this because reductions such as mean require keeping track not just of the totals but also of the counts. @tomwhite suggested that I could possibly plug a ManifestArray into Cubed in the same way that Tensorstore can be plugged in as an alternative to zarr-python, following this TensorStoreGroup object https://github.com/cubed-dev/cubed/blob/main/cubed%2Fstorage%2Fbackends%2Ftensorstore.py#L48 But I have to admit I don't fully understand this suggestion yet; I need to play around with it.
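As a rough illustration of the structured-array idea in the comment above, the sketch below packs three per-chunk arrays into a single numpy structured array so that one `np.concatenate` call moves all three together. This is not cubed's internal machinery. The field names `path`, `offset`, and `length` and the helper `make_manifest` are assumptions made for the example, and whether such a dtype can be written to an intermediate Zarr store is not addressed here.

```python
import numpy as np

# Assumed layout: one record per chunk, with hypothetical field names.
manifest_dtype = np.dtype(
    [("path", "U64"), ("offset", np.uint64), ("length", np.uint64)]
)


def make_manifest(paths, offsets, lengths):
    """Pack three parallel sequences into one structured array."""
    out = np.empty(len(paths), dtype=manifest_dtype)
    out["path"] = paths
    out["offset"] = offsets
    out["length"] = lengths
    return out


a = make_manifest(["a.nc", "a.nc"], [0, 100], [100, 100])
b = make_manifest(["b.nc"], [0], [250])

# A single concatenation carries all three fields along together.
combined = np.concatenate([a, b])
print(combined["path"].tolist())
print(combined["length"].tolist())
```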
Finding byte ranges in every file in an archival dataset is an embarrassingly-parallel problem, which might be a good fit for serverless.
This step is analogous to the `parallel=True` option to `xr.open_mfdataset`, which wraps `xr.open_dataset` in `@dask.delayed` for each file to parallelize the opening step (totally separate from any dask.array tree reduction after the arrays have been created). https://github.com/pydata/xarray/blob/12123be8608f3a90c6a6d8a1cdc4845126492364/xarray/backends/api.py#L1046
With that motivation, it's been suggested that we add a delayed-like function to Cubed (cubed-dev/cubed#311), which could in theory plug in to `xarray.open_mfdataset`.
A simpler way to test this idea would be just to skip the cubed layer and use a lithops map-gather (or whatever the correct primitive is) to try out serverless generation of references. I think for this to work the resulting virtual datasets need to be small enough to be gathered onto one node (thus avoiding a tree-reduce), but the discussion in #104 indicates that this should be okay (at least after #107 is complete).
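The map-gather shape described above can be sketched locally as follows. This is a stand-in, not lithops: a `ThreadPoolExecutor` from the standard library plays the role of the serverless executor, and `generate_references` is a hypothetical placeholder that returns made-up byte ranges instead of scanning a real file. With lithops the analogous step would presumably be a map call followed by fetching the results, but that is untested here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def generate_references(path: str) -> Dict[str, object]:
    """Hypothetical per-file task: pretend to scan a file and return byte ranges."""
    # Real code would open the file and record actual chunk offsets and lengths.
    return {"path": path, "chunks": [(0, 100), (100, 100)]}


def map_gather(paths: List[str]) -> List[Dict[str, object]]:
    """Run one task per file, then gather every result onto the calling process."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(generate_references, paths))


paths = [f"file{i}.nc" for i in range(3)]
results = map_gather(paths)

# The final concatenation happens on a single node, which is only viable
# if the gathered references are small enough to fit there.
combined = [
    (r["path"], offset, length)
    for r in results
    for offset, length in r["chunks"]
]
print(len(combined))
```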
xref #95
cc @tomwhite @rabernat