Creating cubed arrays from lazy xarray data #197

Closed
TomNicholas opened this issue Jun 1, 2023 · 9 comments

@TomNicholas
Member

To subset my data whilst getting around #196, I tried slicing using xarray's lazy indexing machinery before converting to cubed arrays using .chunk (a trick which, when used with dask, avoids creating a bunch of open-dataset tasks for chunks that will never be used).
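
Roughly what I mean, as a sketch (the store path, dimension name, and slice are just placeholders, and this assumes cubed-xarray is installed so that chunked_array_type="cubed" is available):

import xarray as xr

# open lazily (no dask/cubed chunks yet), then slice using xarray's lazy indexing
ds = xr.open_dataset("gs://my-bucket/data.zarr", engine="zarr")
subset = ds.isel(time=slice(0, 744))

# only now convert the lazily-indexed variables to cubed arrays
subset = subset.chunk({}, chunked_array_type="cubed")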

However, this causes cubed to throw an error saying that the projected blockwise memory exceeds the allowed memory, which I don't understand - I literally haven't asked cubed to do any operations other than creating the array yet.

[Screenshot of the traceback, which ends with:]

ValueError: Projected blockwise memory (86732800008) exceeds allowed_mem (10000000000), including reserved_mem (0)

The size of the projected memory usage also scales with the size of the subset I select.

@TomNicholas
Member Author

Actually, maybe that was my fault for putting chunks={}, which didn't read the on-disk chunking as I hoped, and instead just returned the whole variable as one big chunk.

Interestingly, if I try chunks={'time': 1, 'values': -1} I get a Zarr error from inside the rechunk function:

...

File ~/Documents/Work/Code/cubed/cubed/primitive/rechunk.py:35, in rechunk(source, target_chunks, allowed_mem, reserved_mem, target_store, temp_store)
     31 # rechunker doesn't take account of uncompressed and compressed copies of the
     32 # input and output array chunk/selection, so adjust appropriately
     33 rechunker_max_mem = (allowed_mem - reserved_mem) / 4
---> 35 copy_specs, intermediate, target = _setup_rechunk(
     36     source=source,
     37     target_chunks=target_chunks,
     38     max_mem=rechunker_max_mem,
     39     target_store=target_store,
     40     temp_store=temp_store,
     41 )
     43 # source is a Zarr array, so only a single copy spec
     44 if len(copy_specs) != 1:  # pragma: no cover

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:250, in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    247 # elif isinstance(source, (zarr.core.Array, dask.array.Array)):
    248 elif isinstance(source, zarr.core.Array):
--> 250     copy_spec = _setup_array_rechunk(
    251         source,
    252         target_chunks,
    253         max_mem,
    254         target_store,
    255         target_options=target_options,
    256         temp_store_or_group=temp_store,
    257         temp_options=temp_options,
    258     )
    259     intermediate = copy_spec.intermediate.array
    260     target = copy_spec.write.array

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:296, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    293     target_chunks = source_chunks
    295 if isinstance(target_chunks, dict):
--> 296     array_dims = _get_dims_from_zarr_array(source_array)
    297     try:
    298         target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:21, in _get_dims_from_zarr_array(z_array)
     18 def _get_dims_from_zarr_array(z_array):
     19     # use Xarray convention
     20     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
---> 21     return z_array.attrs["_ARRAY_DIMENSIONS"]

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/attrs.py:74, in Attributes.__getitem__(self, item)
     73 def __getitem__(self, item):
---> 74     return self.asdict()[item]

KeyError: '_ARRAY_DIMENSIONS'
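
For context (this is just my reading of the traceback, not a verified fix): rechunker can only interpret a dict of named target chunks if the Zarr array carries xarray's _ARRAY_DIMENSIONS attribute, and the intermediate Zarr array that cubed creates doesn't have it. Translating the named chunks into a per-axis tuple before they reach rechunk would sidestep that code path. An illustrative helper (names hypothetical):

def named_chunks_to_tuple(dims, shape, chunks):
    # dims: dimension names in array order, e.g. ('time', 'values')
    # shape: the array's shape
    # chunks: dict mapping dimension name -> chunk size (-1 meaning "whole axis")
    return tuple(
        shape[i] if chunks.get(dim, -1) == -1 else chunks[dim]
        for i, dim in enumerate(dims)
    )

named_chunks_to_tuple(("time", "values"), (744, 100_000), {"time": 1, "values": -1})
# -> (1, 100000)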

@TomNicholas
Member Author

TomNicholas commented Jun 1, 2023

Finally, if I try specifying chunks explicitly, e.g. chunks={'time': 1}, then the indexing works.

However, when I actually call compute, a possibly-related ModuleNotFoundError: No module named 'xarray' is raised by lithops, and the stack trace is not any more informative. I tried adding xarray to the lithops runtime requirements.txt but no change.

@TomNicholas
Member Author

Here's the full notebook, showing what I was trying to do (benchmark cubed on a similar "quadratic means" problem but with real data living in GCS).

@tomwhite
Member

tomwhite commented Jun 2, 2023

I tried adding xarray to the lithops runtime requirements.txt but no change.

I did this then ran

lithops runtime build -f requirements.txt cubed-runtime -b gcp_functions
lithops runtime deploy -b gcp_functions --memory 2048 cubed-runtime

The second line is normally optional, since Lithops will automatically deploy the runtime built by the first command, but I think you may need to run it to force a new deployment. (In the past I have used lithops runtime delete -b gcp_functions cubed-runtime and even lithops clean -b gcp_functions to start again, but I didn't need that this time.)

With that I managed to get the notebook to run.

It has an interesting timeline though:

[Timeline plot]

I think this is due to the large number of rounds in the reduce. This could possibly be improved by trying a larger allowed_mem (since it can then do more aggregations in one go), but there are probably other optimizations that could be done too.
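
For completeness, this is roughly how a larger allowed_mem gets passed through (bucket paths are placeholders, and this assumes the cubed-xarray entrypoint is installed):

import cubed
import xarray as xr

# give cubed more headroom per task so each reduce round can combine more blocks
spec = cubed.Spec(work_dir="gs://my-bucket/cubed-tmp", allowed_mem="2GB")

ds = xr.open_dataset(
    "gs://my-bucket/data.zarr",
    engine="zarr",
    chunks={"time": 1},
    chunked_array_type="cubed",
    from_array_kwargs={"spec": spec},
)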

@TomNicholas
Member Author

I did this then ran

That's weird - I swear @cisaacstern and I tried exactly the same thing - we definitely did call deploy 😅 I'll try again, but this time double-check all the commands and explicitly clean first.

My requirements.txt looked like this:

cubed
lithops[gcf,gcp]
gcsfs
tqdm
xarray == 2023.5.0
cubed-xarray

It has an interesting timeline though:

I think this is due to the large number of rounds in the reduce. This could possibly be improved by trying a larger allowed_mem (since it can then do more aggregations in one go), but there are probably other optimizations that could be done too.

Huh. I'm very pleased that worked at all (real data in a huge store!), but I'm wondering whether it will test / demonstrate the kind of scaling that I wanted to try out. I was looking for a problem that would show linear-like weak scaling as the dataset gets larger and larger, whereas it seems like this one would deviate from linear scaling due to the number of aggregation steps.

@cisaacstern

Thanks @tomwhite! @TomNicholas, yes, it feels like a cached/old version of the runtime was being used, which can perhaps be solved by the delete/clean steps.

@tomwhite
Member

tomwhite commented Jun 5, 2023

I think this is due to the large number of rounds in the reduce. This could possibly be improved by trying a larger allowed_mem (since it can then do more aggregations in one go)

I tried running this again with allowed_mem="2GB", but the computation still took about the same time. The last few rounds were smaller, but slower (see below). I need to look into what is happening in more detail, but I think there is an opportunity for more optimisation here.

[Timeline plot]

@tomwhite
Member

tomwhite commented Jun 5, 2023

I changed the mean to a sum and the 21 min runtime went down to 7 min. I think this shows that the overhead of using structured arrays is significant, and it would be worth implementing #69.
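
To illustrate what I mean (a toy sketch, not cubed's actual reduction code): a tree-reduced mean has to carry a running total and a count through every round, which cubed stores in a structured array, whereas a sum only carries a single value per block.

import numpy as np

def partial_mean(block):
    # one structured record per block: (running total, element count)
    return np.array([(block.sum(), block.size)],
                    dtype=[("total", "f8"), ("n", "i8")])

def combine_mean(parts):
    # merge partial records from several blocks into one
    total = sum(p["total"][0] for p in parts)
    n = sum(p["n"][0] for p in parts)
    return np.array([(total, n)], dtype=[("total", "f8"), ("n", "i8")])

def partial_sum(block):
    # a plain scalar per block, no structured dtype needed
    return block.sum()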

@tomwhite
Member

Closing as this is fixed now.
