Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zarr+CuPy+GDS+nvCOMP made easy #267

Merged
merged 8 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions python/examples/zarr_cupy_nvcomp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import cupy
import numpy
import zarr

import kvikio
import kvikio.zarr


def main(path):
a = cupy.arange(20)

# Let's use KvikIO's convenience function `open_cupy_array()` to create
# a new Zarr file on disk. Its semantic is the same as `zarr.open_array()`
# but uses a GDS file store, nvCOMP compression, and CuPy arrays.
z = kvikio.zarr.open_cupy_array(store=path, mode="w", shape=(20,), chunks=(5,))

# `z` is a regular Zarr Array that we can write to as usual
z[0:10] = numpy.arange(0, 10)
# but it also support direct reads and writes of CuPy arrays
z[10:20] = cupy.arange(10, 20)

# Reading `z` returns a CuPy array
assert isinstance(z[:], cupy.ndarray)
assert (a == z[:]).all()

# By default, `open_cupy_array()` uses nvCOMP's `lz4` GPU compression, which is
# compatible with NumCodecs's `lz4` CPU compression (CPU). Normally, it is not
# possible to change which decompressor to use when reading a Zarr file. The
# decompressor specified in the Zarr file's metadata is always used. However,
# `open_cupy_array()` makes it possible to overwrite the metadata on-the-fly
# without having to modify the Zarr file on disk. In fact, the Zarr file written
# above appears, in the metadata, as if it was written by NumCodecs's `lz4` CPU
# compression. Thus, we can open the file using Zarr's regular API and the CPU.
z = zarr.open_array(path)
# `z` is now read as a regular NumPy array
assert isinstance(z[:], numpy.ndarray)
assert (a.get() == z[:]).all()
# and we can write to is as usual
z[:] = numpy.arange(20, 40)

# Let's read the Zarr file back into a CuPy array. Notice, even though the metadata
# on disk is specifying NumCodecs's `lz4` CPU decompressor, `open_cupy_array` will
# use nvCOMP to decompress the files.
z = kvikio.zarr.open_cupy_array(store=path, mode="r")
assert isinstance(z[:], cupy.ndarray)
assert (cupy.arange(20, 40) == z[:]).all()


if __name__ == "__main__":
main("/tmp/zarr-cupy-nvcomp")
6 changes: 3 additions & 3 deletions python/kvikio/nvcomp_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from numcodecs.abc import Codec
from numcodecs.compat import ensure_contiguous_ndarray_like

import kvikio._lib.libnvcomp_ll as _ll
from kvikio._lib.libnvcomp_ll import SUPPORTED_ALGORITHMS


class NvCompBatchCodec(Codec):
Expand All @@ -34,11 +34,11 @@ def __init__(
stream: Optional[cp.cuda.Stream] = None,
) -> None:
algo_id = algorithm.lower()
algo_t = _ll.SUPPORTED_ALGORITHMS.get(algo_id, None)
algo_t = SUPPORTED_ALGORITHMS.get(algo_id, None)
if algo_t is None:
raise ValueError(
f"{algorithm} is not supported. "
f"Must be one of: {list(_ll.SUPPORTED_ALGORITHMS.keys())}"
f"Must be one of: {list(SUPPORTED_ALGORITHMS.keys())}"
)

self.algorithm = algo_id
Expand Down
159 changes: 153 additions & 6 deletions python/kvikio/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import os
import os.path
from abc import abstractmethod
from typing import Any, Mapping, Sequence
from typing import Any, Literal, Mapping, Optional, Sequence, Union

import cupy
import numcodecs
import numpy
import numpy as np
import zarr
Expand All @@ -20,6 +21,9 @@

import kvikio
import kvikio.nvcomp
import kvikio.nvcomp_codec
import kvikio.zarr
from kvikio.nvcomp_codec import NvCompBatchCodec

MINIMUM_ZARR_VERSION = "2.15"

Expand All @@ -37,22 +41,60 @@ class GDSStore(zarr.storage.DirectoryStore):
It uses KvikIO for reads and writes, which in turn will use GDS
when applicable.

Parameters
----------
path : string
Location of directory to use as the root of the storage hierarchy.
normalize_keys : bool, optional
If True, all store keys will be normalized to use lower case characters
(e.g. 'foo' and 'FOO' will be treated as equivalent). This can be
useful to avoid potential discrepancies between case-sensitive and
case-insensitive file system. Default value is False.
dimension_separator : {'.', '/'}, optional
Separator placed between the dimensions of a chunk.
compressor_config_overwrite
If not None, use this `Mapping` to specify what is written to the Zarr metadata
file on disk (`.zarray`). Normally, Zarr writes the configuration[1] given by
the `compressor` argument to the `.zarray` file. Use this argument to overwrite
the normal configuration and use the specified `Mapping` instead.
decompressor_config_overwrite
If not None, use this `Mapping` to specify what compressor configuration[1] is
used for decompressing no matter the configuration found in the Zarr metadata
on disk (the `.zarray` file).

[1] https://github.com/zarr-developers/numcodecs/blob/cb155432/numcodecs/abc.py#L79

Notes
-----
GDSStore doesn't implement `_fromfile()` thus non-array data such as
meta data is always read into host memory.
This is because only zarr.Array use getitems() to retrieve data.
Atomic writes are used, which means that data are first written to a
temporary file, then moved into place when the write is successfully
completed. Files are only held open while they are being read or written and are
closed immediately afterwards, so there is no need to manually close any files.

Safe to write in multiple threads or processes.
"""

# The default output array type used by getitems().
default_meta_array = numpy.empty(())

def __init__(self, *args, **kwargs) -> None:
def __init__(
self,
path,
normalize_keys=False,
dimension_separator=None,
*,
compressor_config_overwrite: Optional[Mapping] = None,
decompressor_config_overwrite: Optional[Mapping] = None,
) -> None:
if not kvikio.zarr.supported:
raise RuntimeError(
f"GDSStore requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}"
)
super().__init__(*args, **kwargs)
super().__init__(
path, normalize_keys=normalize_keys, dimension_separator=dimension_separator
)
self.compressor_config_overwrite = compressor_config_overwrite
self.decompressor_config_overwrite = decompressor_config_overwrite

def __eq__(self, other):
return isinstance(other, GDSStore) and self.path == other.path
Expand All @@ -62,6 +104,23 @@ def _tofile(self, a, fn):
written = f.write(a)
assert written == a.nbytes

def __getitem__(self, key):
ret = super().__getitem__(key)
if self.decompressor_config_overwrite and key == ".zarray":
meta = self._metadata_class.decode_array_metadata(ret)
if meta["compressor"]:
meta["compressor"] = self.decompressor_config_overwrite
ret = self._metadata_class.encode_array_metadata(meta)
return ret

def __setitem__(self, key, value):
if self.compressor_config_overwrite and key == ".zarray":
meta = self._metadata_class.decode_array_metadata(value)
if meta["compressor"]:
meta["compressor"] = self.compressor_config_overwrite
value = self._metadata_class.encode_array_metadata(meta)
super().__setitem__(key, value)

def getitems(
self,
keys: Sequence[str],
Expand Down Expand Up @@ -111,6 +170,94 @@ def getitems(
return ret


lz4_cpu_compressor = numcodecs.LZ4()
lz4_gpu_compressor = NvCompBatchCodec("lz4")


def open_cupy_array(
store: Union[os.PathLike, str],
mode: Literal["r", "r+", "a", "w", "w-"] = "a",
compressor: Codec = lz4_gpu_compressor,
meta_array=cupy.empty(()),
**kwargs,
) -> zarr.Array:
"""Open an Zarr array as a CuPy-like array using file-mode-like semantics.

This function is a CUDA friendly version of `zarr.open_array` that reads
and writes to CuPy arrays. Beside the arguments listed below, the arguments
have the same semantic as in `zarr.open_array`.

Parameters
----------
store
Path to directory in file system. As opposed to `zarr.open_array`,
Store and path to zip files isn't supported.
mode
Persistence mode: 'r' means read only (must exist); 'r+' means
read/write (must exist); 'a' means read/write (create if doesn't
exist); 'w' means create (overwrite if exists); 'w-' means create
(fail if exists).
compressor : Codec, optional
The compressor use when create a Zarr file or None if no compressor
is to be used. This is ignored in "r" and "r+" mode. By default the
LZ4 compressor by nvCOMP is used.
meta_array : array-like, optional
An CuPy-like array instance to use for determining arrays to create and
return to users. It must implement `__cuda_array_interface__`.
**kwargs
The rest of the arguments are forwarded to `zarr.open_array` as-is.

Returns
-------
Zarr array backed by a GDS file store, nvCOMP compression, and CuPy arrays.
"""

if not isinstance(store, (str, os.PathLike)):
raise ValueError("store must be a path")
store = str(os.fspath(store))
if not hasattr(meta_array, "__cuda_array_interface__"):
raise ValueError("meta_array must implement __cuda_array_interface__")

if mode in ("r", "r+"):
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(path=store),
mode=mode,
meta_array=meta_array,
**kwargs,
)
if ret.compressor == lz4_cpu_compressor:
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=ret.compressor.get_config(),
decompressor_config_overwrite=lz4_gpu_compressor.get_config(),
),
mode=mode,
meta_array=meta_array,
**kwargs,
)
return ret

if compressor == lz4_gpu_compressor:
compressor_config_overwrite = lz4_cpu_compressor.get_config()
decompressor_config_overwrite = compressor.get_config()
else:
compressor_config_overwrite = None
decompressor_config_overwrite = None

return zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=compressor_config_overwrite,
decompressor_config_overwrite=decompressor_config_overwrite,
),
mode=mode,
meta_array=meta_array,
compressor=compressor,
**kwargs,
)


class NVCompCompressor(Codec):
"""Abstract base class for nvCOMP compressors

Expand Down
12 changes: 11 additions & 1 deletion python/tests/test_examples.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import os
Expand All @@ -16,3 +16,13 @@ def test_hello_world(tmp_path, monkeypatch):

monkeypatch.syspath_prepend(str(examples_path))
import_module("hello_world").main(tmp_path / "test-file")


def test_zarr_cupy_nvcomp(tmp_path, monkeypatch):
"""Test examples/zarr_cupy_nvcomp.py"""

# `examples/zarr_cupy_nvcomp.py` requires the Zarr submodule
pytest.importorskip("kvikio.zarr")

monkeypatch.syspath_prepend(str(examples_path))
import_module("zarr_cupy_nvcomp").main(tmp_path / "test-file")
Loading