
TypeError: 'Blosc' object is not iterable when reading v2 array with Blosc codec #2243

Closed
TomAugspurger opened this issue Sep 25, 2024 · 1 comment · Fixed by #2244
Labels
bug Potential issues with the zarr-python library


TomAugspurger (Contributor) commented Sep 25, 2024

Zarr version

v3

Numcodecs version

na

Python Version

na

Operating System

na

Installation

na

Description

We get our wires crossed between ArrayV2Metadata.compressor and V2Compressor.compressor:

  • ArrayV2Metadata.compressor has a type of numcodecs.abc.Codec | None
  • V2Compressor.compressor has a type of dict[str, JSON] | None

We build the codec pipeline with

[V2Filters(metadata.filters), V2Compressor(metadata.compressor)]

but never cast between the serialized dict and the concrete class along the way. I'm not sure which representation is preferred at this point, but naively maybe all the serialization should be done at the boundary, and we update V2Compressor to use the concrete class?

Unfortunately, numcodecs.abc.Codec is typed as Any, which is why mypy missed this.

Steps to reproduce

import asyncio
import json

import zarr.core.buffer.cpu
import zarr.store


async def main():
    zarray = {
        "chunks": [1032],
        "compressor": {"blocksize": 0, "clevel": 5, "cname": "lz4", "id": "blosc", "shuffle": 1},
        "dtype": "<i8",
        "fill_value": 0,
        "filters": None,
        "order": "C",
        "shape": [1032],
        "zarr_format": 2,
    }
    zattrs = {}
    store_dict = {}
    store = zarr.store.MemoryStore(store_dict=store_dict, mode="w")
    await store.set(".zarray", zarr.core.buffer.cpu.Buffer.from_bytes(json.dumps(zarray).encode()))
    await store.set(".zattrs", zarr.core.buffer.cpu.Buffer.from_bytes(json.dumps(zattrs).encode()))
    await store.set(
        "0", zarr.core.buffer.cpu.Buffer.from_bytes(b"0" * 8)
    )  # not valid, but doesn't matter

    arr = zarr.open_array(store=store)
    arr[:]


if __name__ == "__main__":
    asyncio.run(main())

Running that raises:

File ~/gh/zarr-developers/zarr-python/src/zarr/core/array.py:925, in Array.__getitem__(self, selection)
    923     return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
    924 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
--> 925     return self.get_orthogonal_selection(pure_selection, fields=fields)
    926 else:
    927     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/gh/zarr-developers/zarr-python/src/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
     41 extra_args = len(args) - len(all_args)
     42 if extra_args <= 0:
---> 43     return f(*args, **kwargs)
     45 # extra_args > 0
     46 args_msg = [
     47     f"{name}={arg}"
     48     for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
     49 ]

File ~/gh/zarr-developers/zarr-python/src/zarr/core/array.py:1367, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
   1365     prototype = default_buffer_prototype()
   1366 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1367 return sync(
   1368     self._async_array._get_selection(
   1369         indexer=indexer, out=out, fields=fields, prototype=prototype
   1370     )
   1371 )

File ~/gh/zarr-developers/zarr-python/src/zarr/core/sync.py:91, in sync(coro, loop, timeout)
     88 return_result = next(iter(finished)).result()
     90 if isinstance(return_result, BaseException):
---> 91     raise return_result
     92 else:
     93     return return_result

File ~/gh/zarr-developers/zarr-python/src/zarr/core/sync.py:50, in _runner(coro)
     45 """
     46 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     47 exception, the exception will be returned.
     48 """
     49 try:
---> 50     return await coro
     51 except Exception as ex:
     52     return ex

File ~/gh/zarr-developers/zarr-python/src/zarr/core/array.py:482, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
    474     out_buffer = prototype.nd_buffer.create(
    475         shape=indexer.shape,
    476         dtype=out_dtype,
    477         order=self.order,
    478         fill_value=self.metadata.fill_value,
    479     )
    480 if product(indexer.shape) > 0:
    481     # reading chunks and decoding them
--> 482     await self.codec_pipeline.read(
    483         [
    484             (
    485                 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    486                 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
    487                 chunk_selection,
    488                 out_selection,
    489             )
    490             for chunk_coords, chunk_selection, out_selection in indexer
    491         ],
    492         out_buffer,
    493         drop_axes=indexer.drop_axes,
    494     )
    495 return out_buffer.as_ndarray_like()

File ~/gh/zarr-developers/zarr-python/src/zarr/codecs/pipeline.py:427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    421 async def read(
    422     self,
    423     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    424     out: NDBuffer,
    425     drop_axes: tuple[int, ...] = (),
    426 ) -> None:
--> 427     await concurrent_map(
    428         [
    429             (single_batch_info, out, drop_axes)
    430             for single_batch_info in batched(batch_info, self.batch_size)
    431         ],
    432         self.read_batch,
    433         config.get("async.concurrency"),
    434     )

File ~/gh/zarr-developers/zarr-python/src/zarr/core/common.py:52, in concurrent_map(items, func, limit)
     48 async def concurrent_map(
     49     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     50 ) -> list[V]:
     51     if limit is None:
---> 52         return await asyncio.gather(*[func(*item) for item in items])
     54     else:
     55         sem = asyncio.Semaphore(limit)

File ~/gh/zarr-developers/zarr-python/src/zarr/codecs/pipeline.py:260, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    251 else:
    252     chunk_bytes_batch = await concurrent_map(
    253         [
    254             (byte_getter, array_spec.prototype)
   (...)
    258         config.get("async.concurrency"),
    259     )
--> 260     chunk_array_batch = await self.decode_batch(
    261         [
    262             (chunk_bytes, chunk_spec)
    263             for chunk_bytes, (_, chunk_spec, _, _) in zip(
    264                 chunk_bytes_batch, batch_info, strict=False
    265             )
    266         ],
    267     )
    268     for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
    269         chunk_array_batch, batch_info, strict=False
    270     ):
    271         if chunk_array is not None:

File ~/gh/zarr-developers/zarr-python/src/zarr/codecs/pipeline.py:177, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
    172     chunk_bytes_batch = await bb_codec.decode(
    173         zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    174     )
    176 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 177 chunk_array_batch = await ab_codec.decode(
    178     zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    179 )
    181 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
    182     chunk_array_batch = await aa_codec.decode(
    183         zip(chunk_array_batch, chunk_spec_batch, strict=False)
    184     )

File ~/gh/zarr-developers/zarr-python/src/zarr/abc/codec.py:125, in _Codec.decode(self, chunks_and_specs)
    109 async def decode(
    110     self,
    111     chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
    112 ) -> Iterable[CodecInput | None]:
    113     """Decodes a batch of chunks.
    114     Chunks can be None in which case they are ignored by the codec.
    115
   (...)
    123     Iterable[CodecInput | None]
    124     """
--> 125     return await _batching_helper(self._decode_single, chunks_and_specs)

File ~/gh/zarr-developers/zarr-python/src/zarr/abc/codec.py:409, in _batching_helper(func, batch_info)
    405 async def _batching_helper(
    406     func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
    407     batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
    408 ) -> list[CodecOutput | None]:
--> 409     return await concurrent_map(
    410         list(batch_info),
    411         _noop_for_none(func),
    412         config.get("async.concurrency"),
    413     )

File ~/gh/zarr-developers/zarr-python/src/zarr/core/common.py:52, in concurrent_map(items, func, limit)
     48 async def concurrent_map(
     49     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     50 ) -> list[V]:
     51     if limit is None:
---> 52         return await asyncio.gather(*[func(*item) for item in items])
     54     else:
     55         sem = asyncio.Semaphore(limit)

File ~/gh/zarr-developers/zarr-python/src/zarr/abc/codec.py:422, in _noop_for_none.<locals>.wrap(chunk, chunk_spec)
    420 if chunk is None:
    421     return None
--> 422 return await func(chunk, chunk_spec)

File ~/gh/zarr-developers/zarr-python/src/zarr/codecs/_v2.py:30, in V2Compressor._decode_single(self, chunk_bytes, chunk_spec)
     24 async def _decode_single(
     25     self,
     26     chunk_bytes: Buffer,
     27     chunk_spec: ArraySpec,
     28 ) -> NDBuffer:
     29     if self.compressor is not None:
---> 30         compressor = numcodecs.get_codec(self.compressor)
     31         chunk_numpy_array = ensure_ndarray(
     32             await to_thread(compressor.decode, chunk_bytes.as_array_like())
     33         )
     34     else:

File ~/gh/zarr-developers/zarr-python/.direnv/python-3.10/lib/python3.10/site-packages/numcodecs/registry.py:42, in get_codec(config)
     21 def get_codec(config):
     22     """Obtain a codec for the given configuration.
     23
     24     Parameters
   (...)
     40
     41     """
---> 42     config = dict(config)
     43     codec_id = config.pop('id', None)
     44     cls = codec_registry.get(codec_id)

TypeError: 'Blosc' object is not iterable

Additional output

No response

TomAugspurger added the bug label on Sep 25, 2024
d-v-b (Contributor) commented Sep 25, 2024

> but naively maybe all the serialization is done at the boundary and we update V2Compressor to use the concrete class?

I think this would be best, and it's consistent with what we do for the v3 codecs.

> numcodecs.abc.Codec is an Any type unfortunately, which is why mypy missed this.

:(
