Legate HDF5 using kerchunk (#222)
Support reading of HDF5 files using kerchunk.

#### Limitations

- Only reading is supported.
- Can only read whole arrays -- no partial access.
- No compression -- only HDF5 files written without compression are supported.
- Groups are not supported -- only access to individual Zarr arrays is implemented.

#### Example

```python
import h5py
import numpy as np

import legate_kvikio.kerchunk


def hdf5_io(filename):
    a = np.arange(10000).reshape((100, 100))

    # Write array using h5py
    with h5py.File(filename, "w") as f:
        f.create_dataset("mydataset", chunks=(10, 10), data=a)

    # Read hdf5 file using legate+kerchunk
    b = legate_kvikio.kerchunk.hdf5_read(filename, dataset_name="mydataset")

    # They should be equal
    assert (a == b).all()


if __name__ == "__main__":
    hdf5_io("/tmp/legate-kvikio-io.hdf5")
```
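
Under the hood, the HDF5 file is first indexed with the kerchunk library so that each uncompressed chunk can be read directly at its byte offset. The snippet below is a minimal sketch of that indexing step only; the exact calls used inside `legate_kvikio.kerchunk` may differ:

```python
# Minimal sketch of the kerchunk indexing step (illustration only).
from kerchunk.hdf import SingleHdf5ToZarr

filename = "/tmp/legate-kvikio-io.hdf5"
with open(filename, "rb") as f:
    # With the default spec, translate() returns {"version": 1, "refs": {...}};
    # older kerchunk versions may return the refs mapping directly.
    refs = SingleHdf5ToZarr(f, filename).translate()["refs"]

# Uncompressed chunks map to [file, byte offset, nbytes], e.g.
#   refs["mydataset/0.0"] -> ["/tmp/legate-kvikio-io.hdf5", <offset>, <nbytes>]
for key, ref in refs.items():
    if isinstance(ref, list):
        print(key, "->", ref)
```

These byte offsets are what the new `OP_TILE_READ_BY_OFFSETS` task (see `legate/cpp/tile_io.cpp` below) uses to read each tile with KvikIO.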

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #222
madsbk authored May 30, 2023
1 parent 8d098ce commit 6503c23
Showing 12 changed files with 527 additions and 16 deletions.
188 changes: 188 additions & 0 deletions legate/benchmarks/hdf5_read.py
@@ -0,0 +1,188 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import argparse
import contextlib
import pathlib
import tempfile
from time import perf_counter as clock
from typing import ContextManager

import h5py
import numpy as np

DATASET = "dataset-hdf5-read"


def try_open_hdf5_array(filepath, shape, chunks, dtype):
    try:
        with h5py.File(filepath, "r") as f:
            a = f[DATASET]
            if a.shape == shape and a.chunks == chunks and a.dtype == dtype:
                return a
    except BaseException:
        pass
    return None


def create_hdf5_array(filepath: pathlib.Path, shape, chunks, dtype=np.float64) -> None:
    ret = try_open_hdf5_array(filepath, shape, chunks, dtype)
    if ret is None:
        filepath.unlink(missing_ok=True)
        # Write array using h5py
        with h5py.File(filepath, "w") as f:
            f.create_dataset(DATASET, chunks=chunks, data=np.random.random(shape))
    print(f"HDF5 '{filepath}': shape: {shape}, " f"chunks: {chunks}, dtype: {dtype}")


@contextlib.contextmanager
def dask_h5py(args):
    import cupy
    import h5py
    from dask import array as da
    from dask_cuda import LocalCUDACluster
    from distributed import Client

    def f():
        t0 = clock()
        with h5py.File(args.dir / "A", "r") as af:
            with h5py.File(args.dir / "B", "r") as bf:
                a = da.from_array(af[DATASET], chunks=af[DATASET].chunks)
                b = da.from_array(bf[DATASET], chunks=bf[DATASET].chunks)
                a = a.map_blocks(cupy.asarray)
                b = b.map_blocks(cupy.asarray)
                c = args.op(da, a, b)
                int(c.sum().compute())
        t1 = clock()
        return t1 - t0

    with LocalCUDACluster(n_workers=args.n_workers) as cluster:
        with Client(cluster):
            yield f


@contextlib.contextmanager
def run_legate(args):
    import cunumeric as num

    from legate.core import get_legate_runtime
    from legate_kvikio.kerchunk import hdf5_read

    def f():
        get_legate_runtime().issue_execution_fence(block=True)
        t0 = clock()
        a = hdf5_read(args.dir / "A", dataset_name=DATASET)
        b = hdf5_read(args.dir / "B", dataset_name=DATASET)
        c = args.op(num, a, b)
        int(c.sum())
        t1 = clock()
        return t1 - t0

    yield f


API = {
    "dask-h5py": dask_h5py,
    "legate": run_legate,
}

OP = {"add": lambda xp, a, b: a + b, "matmul": lambda xp, a, b: xp.matmul(a, b)}


def main(args):
    create_hdf5_array(args.dir / "A", chunks=(args.c, args.c), shape=(args.m, args.m))
    create_hdf5_array(args.dir / "B", chunks=(args.c, args.c), shape=(args.m, args.m))

    timings = []
    with API[args.api](args) as f:
        for _ in range(args.n_warmup_runs):
            elapsed = f()
            print("elapsed[warmup]: ", elapsed)
        for i in range(args.nruns):
            elapsed = f()
            print(f"elapsed[run #{i}]: ", elapsed)
            timings.append(elapsed)
    print(f"elapsed mean: {np.mean(timings):.5}s (std: {np.std(timings):.5}s)")


if __name__ == "__main__":

    def parse_directory(x):
        if x is None:
            return x
        else:
            p = pathlib.Path(x)
            if not p.is_dir():
                raise argparse.ArgumentTypeError("Must be a directory")
            return p

    parser = argparse.ArgumentParser(description="Matrix operation on two HDF5 files")
    parser.add_argument(
        "-m",
        default=100,
        type=int,
        help="Dimension of the two square input matrices (MxM) (default: %(default)s).",
    )
    parser.add_argument(
        "-c",
        default=None,
        type=int,
        help="Dimension of the square chunks (CxC) (default: M//10).",
    )
    parser.add_argument(
        "-d",
        "--dir",
        metavar="PATH",
        default=None,
        type=parse_directory,
        help="Path to the directory to r/w from (default: tempfile.TemporaryDirectory)",
    )
    parser.add_argument(
        "--nruns",
        metavar="RUNS",
        default=1,
        type=int,
        help="Number of runs (default: %(default)s).",
    )
    parser.add_argument(
        "--api",
        metavar="API",
        default=tuple(API.keys())[0],
        choices=tuple(API.keys()),
        help="API to use {%(choices)s}",
    )
    parser.add_argument(
        "--n-workers",
        default=1,
        type=int,
        help="Number of workers (default: %(default)s).",
    )
    parser.add_argument(
        "--op",
        metavar="OP",
        default=tuple(OP.keys())[0],
        choices=tuple(OP.keys()),
        help="Operation to run {%(choices)s}",
    )
    parser.add_argument(
        "--n-warmup-runs",
        default=1,
        type=int,
        help="Number of warmup runs (default: %(default)s).",
    )

    args = parser.parse_args()
    args.op = OP[args.op]  # Parse the operation argument
    if args.c is None:
        args.c = args.m // 10

    # Create a temporary directory if user didn't specify a directory
    temp_dir: tempfile.TemporaryDirectory | ContextManager
    if args.dir is None:
        temp_dir = tempfile.TemporaryDirectory()
        args.dir = pathlib.Path(temp_dir.name)
    else:
        temp_dir = contextlib.nullcontext()

    with temp_dir:
        main(args)
1 change: 1 addition & 0 deletions legate/cpp/task_opcodes.hpp
@@ -20,5 +20,6 @@ enum TaskOpCode {
OP_READ,
OP_TILE_WRITE,
OP_TILE_READ,
OP_TILE_READ_BY_OFFSETS,
OP_NUM_TASK_IDS, // must be last
};
97 changes: 93 additions & 4 deletions legate/cpp/tile_io.cpp
@@ -20,6 +20,7 @@
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <utility>

#include "legate_mapping.hpp"
@@ -70,7 +71,7 @@ std::filesystem::path get_file_path(const std::string& dirpath,
*
* @tparam IsReadOperation Whether the operation is a read or a write operation
* @param context The Legate task context
* @param store The Legate store ti read or write
* @param store The Legate store to read or write
*/
template <bool IsReadOperation>
struct tile_read_write_fn {
@@ -89,22 +90,79 @@ struct tile_read_write_fn {
auto shape_volume = shape.volume();
if (shape_volume == 0) { return; }
size_t nbytes = shape_volume * sizeof(DTYPE);
std::array<size_t, DIM> strides{};

// We know that the accessor is contiguous because we set `policy.exact = true`
// in `Mapper::store_mappings()`.
if constexpr (IsReadOperation) {
kvikio::FileHandle f(filepath, "r");
auto* data = store.write_accessor<DTYPE, DIM>().ptr(shape, strides.data());
auto* data = store.write_accessor<DTYPE, DIM>().ptr(shape);
f.pread(data, nbytes).get();
} else {
kvikio::FileHandle f(filepath, "w");
const auto* data = store.read_accessor<DTYPE, DIM>().ptr(shape, strides.data());
const auto* data = store.read_accessor<DTYPE, DIM>().ptr(shape);
f.pwrite(data, nbytes).get();
}
}
};

/**
* @brief Flatten the domain point to a 1D point
*
* @param lo_dp Lower point
* @param hi_dp High point
* @param point_dp The domain point to flatten
* @return The flattened domain point
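*
* Example: with lo = (0, 0) and hi = (1, 2) the extents are (2, 3), so the
* point (1, 2) linearizes row-major to 1 * 3 + 2 = 5.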
*/
template <int32_t DIM>
size_t linearize(const legate::DomainPoint& lo_dp,
const legate::DomainPoint& hi_dp,
const legate::DomainPoint& point_dp)
{
const legate::Point<DIM> lo = lo_dp;
const legate::Point<DIM> hi = hi_dp;
const legate::Point<DIM> point = point_dp - lo_dp;
const legate::Point<DIM> extents = hi - lo + legate::Point<DIM>::ONES();
size_t idx = 0;
for (int32_t dim = 0; dim < DIM; ++dim) {
idx = idx * extents[dim] + point[dim];
}
return idx;
}

/**
* @brief Functor for reading a tiled Legate store by offsets from disk using KvikIO
*
* @param context The Legate task context
* @param store The Legate output store
*/
struct tile_read_by_offsets_fn {
template <legate::Type::Code CODE, int32_t DIM>
void operator()(legate::TaskContext& context, legate::Store& store)
{
using DTYPE = legate::legate_type_of<CODE>;
const auto task_index = context.get_task_index();
const auto launch_domain = context.get_launch_domain();
const std::string path = context.scalars().at(0).value<std::string>();
legate::Span<const uint64_t> offsets = context.scalars().at(1).values<uint64_t>();
legate::Span<const uint64_t> tile_shape = context.scalars().at(2).values<uint64_t>();

// Flatten task index
uint32_t flatten_task_index =
linearize<DIM>(launch_domain.lo(), launch_domain.hi(), task_index);

auto shape = store.shape<DIM>();
auto shape_volume = shape.volume();
if (shape_volume == 0) { return; }
size_t nbytes = shape_volume * sizeof(DTYPE);

// We know that the accessor is contiguous because we set `policy.exact = true`
// in `Mapper::store_mappings()`.
kvikio::FileHandle f(path, "r");
auto* data = store.write_accessor<DTYPE, DIM>().ptr(shape);
f.pread(data, nbytes, offsets[flatten_task_index]).get();
}
};

} // namespace

namespace legate_kvikio {
@@ -167,6 +225,36 @@ class TileReadTask : public Task<TileReadTask, TaskOpCode::OP_TILE_READ> {
}
};

/**
* @brief Read a tiled Legate store by offsets from disk using KvikIO
* Task signature:
* - scalars:
* - path: std::string
* - offsets: tuple of int64_t
* - tile_shape: tuple of int64_t
* - outputs:
* - buffer: store (any dtype)
*
* NB: the store must be contiguous. To make Legate enforce this,
* set `policy.exact = true` in `Mapper::store_mappings()`.
*
*/
class TileReadByOffsetsTask
: public Task<TileReadByOffsetsTask, TaskOpCode::OP_TILE_READ_BY_OFFSETS> {
public:
static void cpu_variant(legate::TaskContext& context)
{
legate::Store& store = context.outputs().at(0);
legate::double_dispatch(store.dim(), store.code(), tile_read_by_offsets_fn{}, context, store);
}

static void gpu_variant(legate::TaskContext& context)
{
// Since KvikIO supports both GPU and CPU memory seamlessly, we reuse the CPU variant.
cpu_variant(context);
}
};

} // namespace legate_kvikio

namespace {
@@ -175,6 +263,7 @@ void __attribute__((constructor)) register_tasks()
{
legate_kvikio::TileWriteTask::register_variants();
legate_kvikio::TileReadTask::register_variants();
legate_kvikio::TileReadByOffsetsTask::register_variants();
}

} // namespace
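
Putting the pieces together: the Python driver is expected to pass one byte offset per output tile, ordered by the same row-major linearization used in `linearize()` above, and each task then issues a single `pread` of its tile at that offset -- which is why only uncompressed, whole-array reads are supported. Below is a hypothetical sketch of how such an offsets array could be assembled from kerchunk references; the helper name and signature are illustrative, not the actual `legate_kvikio` API:

```python
import math

import numpy as np


def tile_offsets(refs: dict, dataset: str, shape, tile_shape) -> np.ndarray:
    """Return one byte offset per tile, ordered row-major like the C++ task index."""
    grid = [math.ceil(s / t) for s, t in zip(shape, tile_shape)]
    offsets = np.empty(math.prod(grid), dtype=np.uint64)
    # np.ndindex iterates in C order, matching linearize() in tile_io.cpp.
    for flat, idx in enumerate(np.ndindex(*grid)):
        key = f"{dataset}/{'.'.join(map(str, idx))}"  # e.g. "mydataset/0.0"
        _, offset, _ = refs[key]  # kerchunk ref: [file url, byte offset, nbytes]
        offsets[flat] = offset
    return offsets
```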
26 changes: 26 additions & 0 deletions legate/examples/hdf5_io.py
@@ -0,0 +1,26 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.


import h5py
import numpy as np

import legate_kvikio.kerchunk


def hdf5_io(filename):
    a = np.arange(10000).reshape((100, 100))

    # Write array using h5py
    with h5py.File(filename, "w") as f:
        f.create_dataset("mydataset", chunks=(10, 10), data=a)

    # Read hdf5 file using legate+kerchunk
    b = legate_kvikio.kerchunk.hdf5_read(filename, dataset_name="mydataset")

    # They should be equal
    assert (a == b).all()


if __name__ == "__main__":
    hdf5_io("/tmp/legate-kvikio-io.hdf5")
27 changes: 27 additions & 0 deletions legate/examples/zarr_io.py
@@ -0,0 +1,27 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import cunumeric as num

import legate.core
import legate_kvikio.zarr


def zarr_io(dirname):
    a = num.arange(10000).reshape(100, 100)

    # Write array to a Zarr file by chunks of 10x10.
    legate_kvikio.zarr.write_array(a, dirname, chunks=(10, 10))

    # Block until done writing.
    legate.core.get_legate_runtime().issue_execution_fence(block=True)

    # Read array from a Zarr file.
    b = legate_kvikio.zarr.read_array(dirname)

    # They should be equal
    assert (a == b).all()


if __name__ == "__main__":
    zarr_io("/tmp/legate-kvikio-zarr-io")