Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v3] SyncError: Calling sync() from within a running loop #2133

Closed
jhamman opened this issue Aug 28, 2024 · 4 comments · Fixed by #2137
Closed

[v3] SyncError: Calling sync() from within a running loop #2133

jhamman opened this issue Aug 28, 2024 · 4 comments · Fixed by #2137
Labels
bug Potential issues with the zarr-python library
Milestone

Comments

@jhamman
Copy link
Member

jhamman commented Aug 28, 2024

Zarr version

'3.0.0a2.dev5+g8446607e.d20240827'

Numcodecs version

0.13.0

Python Version

3.11

Operating System

Mac

Installation

pip

Description

Indexing a zarr array with another zarr array causes our sync method to be called inside another sync call.

Steps to reproduce

a = np.arange(10)
za = zarr.array(a, chunks=2)
ix = [False, True, False, True, False, True, False, True, False, True]

zix = zarr.array(ix, chunks=2)
za = zarr.array(a, chunks=2)
za.oindex[zix]

Produces this traceback 👇

---------------------------------------------------------------------------
SyncError                                 Traceback (most recent call last)
Cell In[102], line 7
      5 zix = zarr.array(ix, chunks=2)
      6 za = zarr.array(a, chunks=2)
----> 7 za.oindex[zix]  # will not load all zix into memory

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/indexing.py:845, in OIndex.__getitem__(self, selection)
    843 new_selection = ensure_tuple(new_selection)
    844 new_selection = replace_lists(new_selection)
--> 845 return self.array.get_orthogonal_selection(
    846     cast(OrthogonalSelection, new_selection), fields=fields
    847 )

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/array.py:1338, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
   1336     prototype = default_buffer_prototype()
   1337 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1338 return sync(
   1339     self._async_array._get_selection(
   1340         indexer=indexer, out=out, fields=fields, prototype=prototype
   1341     )
   1342 )

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/sync.py:91, in sync(coro, loop, timeout)
     88 return_result = next(iter(finished)).result()
     90 if isinstance(return_result, BaseException):
---> 91     raise return_result
     92 else:
     93     return return_result

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/sync.py:50, in _runner(coro)
     45 """
     46 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     47 exception, the exception will be returned.
     48 """
     49 try:
---> 50     return await coro
     51 except Exception as ex:
     52     return ex

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/array.py:467, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
    458     out_buffer = prototype.nd_buffer.create(
    459         shape=indexer.shape,
    460         dtype=out_dtype,
    461         order=self.order,
    462         fill_value=self.metadata.fill_value,
    463     )
    464 if product(indexer.shape) > 0:
    465     # reading chunks and decoding them
    466     await self.codec_pipeline.read(
--> 467         [
    468             (
    469                 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    470                 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
    471                 chunk_selection,
    472                 out_selection,
    473             )
    474             for chunk_coords, chunk_selection, out_selection in indexer
    475         ],
    476         out_buffer,
    477         drop_axes=indexer.drop_axes,
    478     )
    479 return out_buffer.as_ndarray_like()

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/array.py:467, in <listcomp>(.0)
    458     out_buffer = prototype.nd_buffer.create(
    459         shape=indexer.shape,
    460         dtype=out_dtype,
    461         order=self.order,
    462         fill_value=self.metadata.fill_value,
    463     )
    464 if product(indexer.shape) > 0:
    465     # reading chunks and decoding them
    466     await self.codec_pipeline.read(
--> 467         [
    468             (
    469                 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    470                 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
    471                 chunk_selection,
    472                 out_selection,
    473             )
    474             for chunk_coords, chunk_selection, out_selection in indexer
    475         ],
    476         out_buffer,
    477         drop_axes=indexer.drop_axes,
    478     )
    479 return out_buffer.as_ndarray_like()

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/indexing.py:813, in OrthogonalIndexer.__iter__(self)
    812 def __iter__(self) -> Iterator[ChunkProjection]:
--> 813     for dim_projections in itertools.product(*self.dim_indexers):
    814         chunk_coords = tuple(p.dim_chunk_ix for p in dim_projections)
    815         chunk_selection: tuple[Selector, ...] | npt.NDArray[Any] = tuple(
    816             p.dim_chunk_sel for p in dim_projections
    817         )

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/indexing.py:542, in BoolArrayDimIndexer.__iter__(self)
    539 for dim_chunk_ix in self.dim_chunk_ixs:
    540     # find region in chunk
    541     dim_offset = dim_chunk_ix * self.dim_chunk_len
--> 542     dim_chunk_sel = self.dim_sel[dim_offset : dim_offset + self.dim_chunk_len]
    544     # pad out if final chunk
    545     if dim_chunk_sel.shape[0] < self.dim_chunk_len:

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/array.py:899, in Array.__getitem__(self, selection)
    897     return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
    898 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
--> 899     return self.get_orthogonal_selection(pure_selection, fields=fields)
    900 else:
    901     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/array.py:1338, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
   1336     prototype = default_buffer_prototype()
   1337 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1338 return sync(
   1339     self._async_array._get_selection(
   1340         indexer=indexer, out=out, fields=fields, prototype=prototype
   1341     )
   1342 )

File ~/Library/CloudStorage/Dropbox/src/zarr-python/src/zarr/core/sync.py:78, in sync(coro, loop, timeout)
     76     loop0 = asyncio.events.get_running_loop()
     77     if loop0 is loop:
---> 78         raise SyncError("Calling sync() from within a running loop")
     79 except RuntimeError:
     80     pass

SyncError: Calling sync() from within a running loop

Additional output

No response

@jhamman jhamman added the bug Potential issues with the zarr-python library label Aug 28, 2024
@jhamman
Copy link
Member Author

jhamman commented Aug 28, 2024

@normanrz - curious if you have ideas for how to address this?

@jhamman jhamman added this to the 3.0.0 milestone Aug 28, 2024
@normanrz
Copy link
Member

Seems like a tricky problem. We could solve this for specific use cases. For example, in the array.oindex[sel] case, we could turn the selector into a numpy array first before handing it over to the sync-ed selection.
Not sure if there is a general solution.

cc @martindurant

@martindurant
Copy link
Member

asyncio is not re-entrant, but nest-asyncio does exist (if "archived"). It is not a solution for us, because it would patch system libraries that users might be depending on.

Conversely, the dask way to deal with this is for a (non-async) function to detect if it is being called within an active event loop or not, so that when it calls async code, it can either pass back the coroutine to the caller, or sync() it.

The fsspec way to deal with this, is to have a separate sync API: every async method has another function which calls sync() on it, so that the calling code must decide which variant to call. This is how you might end up with a sync-minded fs instance and an async one, which are otherwise identical (they used to have different init behaviour, but not any more).

@martindurant
Copy link
Member

A simplified version of what dask does is in streamz. I haven't actually checked that dask still works this way, now that I think about it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Potential issues with the zarr-python library
Projects
Status: Done
Development

Successfully merging a pull request may close this issue.

3 participants