Function to rechunk single variables or batch variables from existing MDIO (#368)
* Improve MDIO API with accessor modes and rechunk functionality

The MDIO API has been enhanced with support for an additional file operation mode ('w', used for rechunking) and a new rechunking feature. The `copy_mdio` function now accepts strongly typed arguments, and new `rechunk` functions efficiently resize chunks for large datasets, with improved progress tracking and error handling.
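As a minimal sketch of the new workflow (the store path `seismic.mdio` is hypothetical, and the constructor arguments mirror the accessor signature shown in the diff below):

```python
from mdio.api.accessor import MDIOAccessor
from mdio.api.convenience import rechunk

# Open an existing MDIO file in the new "w" mode: like "r+", but file-wide
# operations such as rechunking are allowed.
accessor = MDIOAccessor(
    "seismic.mdio",  # hypothetical local store; could also be a cloud URL
    mode="w",
    access_pattern="012",
    storage_options=None,
    return_metadata=False,
    new_chunks=None,
    backend="zarr",
    memory_cache_size=0,
    disk_cache=False,
)

# Add one rechunked copy of the data, optimized for inline access.
rechunk(accessor, chunks=(1, 1024, 1024), suffix="fast_il")
```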

* Add usage examples to rechunk functions

Expanded the docstrings in `convenience.py` to include examples illustrating how to use `rechunk` and `rechunk_batch` functions for clarity and ease of use for developers.

* Add convenience functions section to docs

The new section "Convenience Functions" has been added to the reference documentation. It specifically includes documentation for the `mdio.api.convenience` module, excluding `create_rechunk_plan` and `write_rechunked_values`.

* Add optional compressor parameter to rechunk functions

The rechunk operations in the MDIO API now accept an optional compressor parameter, allowing users to specify a custom data compression codec. The default compressor, Blosc('zstd'), is set if none is provided, ensuring backward compatibility.
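For instance (a sketch; `accessor` is assumed to be an MDIO accessor opened in 'w' mode as above, and LZ4 is just an arbitrary alternative codec):

```python
from zarr import Blosc

from mdio.api.convenience import rechunk

# compressor=None falls back to the default Blosc("zstd"), preserving the
# previous behavior.
rechunk(accessor, chunks=(1, 1024, 1024), suffix="fast_il")

# A custom codec trades compression ratio for speed; per the diff, it applies
# to the trace data, while headers always keep Blosc("zstd").
rechunk(
    accessor,
    chunks=(1, 1024, 1024),
    suffix="fast_il_lz4",
    compressor=Blosc("lz4"),
)
```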

* Add rechunk function TODO comment

Inserted a TODO comment in the `rechunk` function about writing tests, referencing the relevant issue.

* Refactor buffer size and improve documentation

Removed an extraneous newline and introduced a constant MAX_BUFFER to handle buffer size for chunking. Updated the create_rechunk_plan function's docstring to include the buffer size details, making it clearer how the buffer size can be adjusted by altering the MAX_BUFFER variable. This change enhances code readability and maintainability.
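A minimal sketch of that buffering strategy (the shapes and chunk sizes here are made up; the `ChunkIterator` usage mirrors `create_rechunk_plan` in the diff below):

```python
import zarr

from mdio.core.indexing import ChunkIterator

MAX_BUFFER = 512  # module-level constant; edit it to change the copy buffer

# Hypothetical source with small chunks; copying it chunk-by-chunk would
# issue many tiny reads.
data = zarr.zeros((2048, 2048, 1024), chunks=(64, 64, 64))

# The plan instead iterates a dummy array chunked at (MAX_BUFFER,) * ndim,
# so each iteration reads one large contiguous block from the source.
dummy = zarr.empty_like(data, chunks=(MAX_BUFFER,) * data.ndim)
for block_slices in ChunkIterator(dummy):
    buffer = data[block_slices]  # one buffered block, independent of source chunks
```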

* Add rechunking optimization demo

Added a new demo `rechunking.ipynb` to demonstrate how to optimize access patterns using rechunking and lossy compression. The notebook includes detailed steps and code snippets to create optimized, compressed copies for different access patterns, enhancing read performance.
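A hedged sketch of the demo's idea follows; the chunk shapes are illustrative rather than the notebook's exact settings, and ZFPY (a lossy codec from numcodecs) requires the optional `zfpy` dependency to be installed:

```python
import zfpy  # optional dependency backing numcodecs' ZFPY codec
from numcodecs import ZFPY

from mdio.api.convenience import rechunk_batch

# Fixed-accuracy (lossy) compression: absolute error stays below `tolerance`.
lossy = ZFPY(mode=zfpy.mode_fixed_accuracy, tolerance=0.01)

# One read pass over `accessor` (an MDIOAccessor opened in "w" mode) writes
# three compressed copies, each chunked for a different access pattern.
rechunk_batch(
    accessor,
    chunks_list=[(4, 512, 512), (512, 4, 512), (512, 512, 4)],
    suffix_list=["fast_il", "fast_xl", "fast_z"],
    compressor=lossy,
)
```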

* Refactor notebook: reset execution counts and tidy metadata

Streamlined the Jupyter notebook by resetting execution counts and cleaning up metadata fields. This provides a fresh state for the execution environment and a more structured document for other developers to follow.

* Update rechunking notebook with minor tweaks

Corrected the reference from 'notebook' to 'page', added a parenthetical clarification to a section heading, and updated the performance benchmark outputs. These changes improve document clarity and provide the latest performance metrics.
tasansal authored Mar 12, 2024
1 parent 64b9ded commit d157465
Showing 5 changed files with 742 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/index.md
@@ -18,6 +18,7 @@ maxdepth: 1
installation
notebooks/quickstart
notebooks/compression
notebooks/rechunking
usage
reference
contributing
514 changes: 514 additions & 0 deletions docs/notebooks/rechunking.ipynb

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions docs/reference.md
@@ -54,3 +54,11 @@ and
.. automodule:: mdio.core.serialization
:members:
```

## Convenience Functions

```{eval-rst}
.. automodule:: mdio.api.convenience
:members:
:exclude-members: create_rechunk_plan, write_rechunked_values
```
32 changes: 18 additions & 14 deletions src/mdio/api/accessor.py
@@ -1,6 +1,5 @@
"""MDIO accessor APIs."""


from __future__ import annotations

import dask.array as da
@@ -61,7 +60,9 @@ class MDIOAccessor:
mdio_path_or_buffer: Store URL for MDIO file. This can be either on
a local disk, or a cloud object store.
mode: Read or read/write mode. The file must exist. Options are
in {'r', 'r+'}.
in {'r', 'r+', 'w'}. 'r' is read only, 'r+' is append mode where
only existing arrays can be modified, 'w' is similar to 'r+'
but rechunking or other file-wide operations are allowed.
access_pattern: Chunk access pattern, optional. Default is "012".
Examples: '012', '01', '01234'.
storage_options: Options for the storage backend. By default,
@@ -133,9 +134,9 @@ def __init__(
mdio_path_or_buffer: str,
mode: str,
access_pattern: str,
storage_options: dict,
storage_options: dict | None,
return_metadata: bool,
new_chunks: tuple[int, ...],
new_chunks: tuple[int, ...] | None,
backend: str,
memory_cache_size: int,
disk_cache: bool,
@@ -191,10 +192,13 @@ def _validate_store(self, storage_options):
def _connect(self):
"""Open the zarr root."""
try:
self.root = zarr.open_consolidated(
store=self.store,
mode=self.mode,
)
if self.mode in {"r", "r+"}:
self.root = zarr.open_consolidated(store=self.store, mode=self.mode)
elif self.mode == "w":
self.root = zarr.open(store=self.store, mode="r+")
else:
msg = f"Invalid mode: {self.mode}"
raise ValueError(msg)
except KeyError as e:
msg = (
f"MDIO file not found or corrupt at {self.store.path}. "
@@ -377,7 +381,7 @@ def _data_group(self) -> zarr.Group:
def __getitem__(self, item: int | tuple) -> npt.ArrayLike | da.Array | tuple:
"""Data getter."""
if self._return_metadata is True:
if isinstance(item, int) or isinstance(item, slice):
if isinstance(item, (int, slice)):
meta_index = item
elif len(item) == len(self.shape):
meta_index = tuple(dim for dim in item[:-1])
@@ -400,7 +404,7 @@ def coord_to_index(
self,
*args,
dimensions: str | list[str] | None = None,
) -> tuple[NDArray[np.int], ...]:
) -> tuple[NDArray[int], ...]:
"""Convert dimension coordinate to zero-based index.
The coordinate labels of the array dimensions are converted to
@@ -576,8 +580,8 @@ def __init__(
return_metadata: bool = False,
new_chunks: tuple[int, ...] = None,
backend: str = "zarr",
memory_cache_size=0,
disk_cache=False,
memory_cache_size: int = 0,
disk_cache: bool = False,
): # TODO: Disabled all caching by default, sometimes causes performance issues
"""Initialize super class with `r` permission."""
super().__init__(
@@ -632,8 +636,8 @@ def __init__(
return_metadata: bool = False,
new_chunks: tuple[int, ...] = None,
backend: str = "zarr",
memory_cache_size=0,
disk_cache=False,
memory_cache_size: int = 0,
disk_cache: bool = False,
): # TODO: Disabled all caching by default, sometimes causes performance issues
"""Initialize super class with `r+` permission."""
super().__init__(
207 changes: 201 additions & 6 deletions src/mdio/api/convenience.py
@@ -1,18 +1,31 @@
"""Convenience APIs for working with MDIO files."""


from __future__ import annotations

from typing import TYPE_CHECKING

import zarr
from tqdm.auto import tqdm
from zarr import Blosc

from mdio.api.io_utils import process_url
from mdio.core.indexing import ChunkIterator


if TYPE_CHECKING:
from numcodecs.abc import Codec
from numpy.typing import NDArray
from zarr import Array

from mdio import MDIOAccessor
from mdio import MDIOReader

def copy_mdio(
source,

def copy_mdio( # noqa: PLR0913
source: MDIOReader,
dest_path_or_buffer: str,
excludes="",
includes="",
excludes: str = "",
includes: str = "",
storage_options: dict | None = None,
overwrite: bool = False,
) -> None:
@@ -61,7 +74,7 @@ def copy_mdio(
)

if len(excludes) > 0:
data_path = "/".join(["data", excludes])
data_path = f"data/{excludes}"
source_array = source.root[data_path]
dimension_separator = source_array._dimension_separator

@@ -72,3 +85,185 @@ def copy_mdio(
overwrite=overwrite,
dimension_separator=dimension_separator,
)


CREATE_KW = {
"dimension_separator": "/",
"write_empty_chunks": False,
}
MAX_BUFFER = 512


def create_rechunk_plan(
source: MDIOAccessor,
chunks_list: list[tuple[int, ...]],
suffix_list: list[str],
compressor: Codec | None = None,
overwrite: bool = False,
) -> tuple[list[Array], list[Array], NDArray, ChunkIterator]:
"""Create rechunk plan based on source and user input.
It will buffer 512 x n-dimensions in memory. Approximately
128MB. However, if you need to adjust the buffer size, change
the `MAX_BUFFER` variable in this module.
Args:
source: MDIO accessor instance. Data will be copied from here.
chunks_list: List of tuples containing new chunk sizes.
suffix_list: List of suffixes to append to new chunk sizes.
compressor: Data compressor to use, optional. Default is Blosc('zstd').
overwrite: Overwrite destination or not.
Returns:
Tuple containing the rechunk plan variables and iterator.
Raises:
NameError: if trying to write to original data.
"""
data_group = source._data_group
metadata_group = source._metadata_group

data_array = source._traces
metadata_array = source._headers
live_mask = source.live_mask[:]

metadata_arrs = []
data_arrs = []

header_compressor = Blosc("zstd")
trace_compressor = Blosc("zstd") if compressor is None else compressor

for chunks, suffix in zip(chunks_list, suffix_list): # noqa: B905
norm_chunks = [
min(chunk, size) for chunk, size in zip(chunks, source.shape) # noqa: B905
]

if suffix == source.access_pattern:
msg = f"Can't write over source data with suffix {suffix}"
raise NameError(msg)

metadata_arrs.append(
metadata_group.zeros_like(
name=f"chunked_{suffix}_trace_headers",
data=metadata_array,
chunks=norm_chunks[:-1],
compressor=header_compressor,
overwrite=overwrite,
**CREATE_KW,
)
)

data_arrs.append(
data_group.zeros_like(
name=f"chunked_{suffix}",
data=data_array,
chunks=norm_chunks,
compressor=trace_compressor,
overwrite=overwrite,
**CREATE_KW,
)
)

n_dimension = len(data_array.shape)
dummy_array = zarr.empty_like(data_array, chunks=(MAX_BUFFER,) * n_dimension)
iterator = ChunkIterator(dummy_array)

return metadata_arrs, data_arrs, live_mask, iterator


def write_rechunked_values( # noqa: PLR0913
source: MDIOAccessor,
suffix_list: list[str],
metadata_arrs_out: list[Array],
data_arrs_out: list[Array],
live_mask: NDArray,
iterator: ChunkIterator,
) -> None:
"""Create rechunk plan based on source and user input.
Args:
source: MDIO accessor instance. Data will be copied from here.
suffix_list: List of suffixes to append to new chunk sizes.
metadata_arrs_out: List of new metadata Zarr arrays.
data_arrs_out: List of new data Zarr arrays.
live_mask: Live mask to apply during copies.
iterator: The chunk iterator to use.
"""
suffix_names = ",".join(suffix_list)
for slice_ in tqdm(iterator, desc=f"Rechunking to {suffix_names}", unit="chunk"):
meta_slice = slice_[:-1]

if live_mask[meta_slice].sum() == 0:
continue

for array in metadata_arrs_out:
array[meta_slice] = source._headers[meta_slice]

for array in data_arrs_out:
array[slice_] = source._traces[slice_]

zarr.consolidate_metadata(source.store)


def rechunk_batch(
source: MDIOAccessor,
chunks_list: list[tuple[int, ...]],
suffix_list: list[str],
compressor: Codec | None = None,
overwrite: bool = False,
) -> None:
"""Rechunk MDIO file to multiple variables, reading it once.
Args:
source: MDIO accessor instance. Data will be copied from here.
chunks_list: List of tuples containing new chunk sizes.
suffix_list: List of suffixes to append to new chunk sizes.
compressor: Data compressor to use, optional. Default is Blosc('zstd').
overwrite: Overwrite destination or not.
Examples:
To rechunk multiple variables we can do things like:
>>> accessor = MDIOAccessor(...)
>>> rechunk_batch(
>>> accessor,
>>> chunks_list=[(1, 1024, 1024), (1024, 1, 1024), (1024, 1024, 1)],
>>> suffix_list=["fast_il", "fast_xl", "fast_z"],
>>> )
"""
plan = create_rechunk_plan(
source,
chunks_list=chunks_list,
suffix_list=suffix_list,
compressor=compressor,
overwrite=overwrite,
)

write_rechunked_values(source, suffix_list, *plan)


def rechunk(
source: MDIOAccessor,
chunks: tuple[int, ...],
suffix: str,
compressor: Codec | None = None,
overwrite: bool = False,
) -> None:
"""Rechunk MDIO file adding a new variable.
Args:
source: MDIO accessor instance. Data will be copied from here.
chunks: Tuple containing chunk sizes for new rechunked array.
suffix: Suffix to append to new rechunked array.
compressor: Data compressor to use, optional. Default is Blosc('zstd').
overwrite: Overwrite destination or not.
Examples:
To rechunk a single variable we can do this
>>> accessor = MDIOAccessor(...)
>>> rechunk(accessor, (1, 1024, 1024), suffix="fast_il")
"""
# TODO(Anyone): Write tests for rechunking functions
# https://github.com/TGSAI/mdio-python/issues/369
rechunk_batch(source, [chunks], [suffix], compressor, overwrite)
