Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Legate HDF5 using kerchunk #222

Merged
merged 18 commits into from
May 30, 2023
Merged
188 changes: 188 additions & 0 deletions legate/benchmarks/hdf5_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import argparse
import contextlib
import pathlib
import tempfile
from time import perf_counter as clock
from typing import ContextManager

import h5py
import numpy as np

DATASET = "dataset-hdf5-read"


def try_open_hdf5_array(filepath, shape, chunks, dtype):
try:
with h5py.File(filepath, "r") as f:
a = f[DATASET]
if a.shape == shape and a.chunks == chunks and a.dtype == dtype:
return a
except BaseException:
pass
return None


def create_hdf5_array(filepath: pathlib.Path, shape, chunks, dtype=np.float64) -> None:
ret = try_open_hdf5_array(filepath, shape, chunks, dtype)
if ret is None:
filepath.unlink(missing_ok=True)
# Write array using h5py
with h5py.File(filepath, "w") as f:
f.create_dataset(DATASET, chunks=chunks, data=np.random.random(shape))
print(f"HDF5 '{filepath}': shape: {shape}, " f"chunks: {chunks}, dtype: {dtype}")


@contextlib.contextmanager
def dask_h5py(args):
import cupy
import h5py
from dask import array as da
from dask_cuda import LocalCUDACluster
from distributed import Client

def f():
t0 = clock()
with h5py.File(args.dir / "A", "r") as af:
with h5py.File(args.dir / "B", "r") as bf:
a = da.from_array(af[DATASET], chunks=af[DATASET].chunks)
b = da.from_array(bf[DATASET], chunks=bf[DATASET].chunks)
a = a.map_blocks(cupy.asarray)
b = b.map_blocks(cupy.asarray)
c = args.op(da, a, b)
int(c.sum().compute())
t1 = clock()
return t1 - t0

with LocalCUDACluster(n_workers=args.n_workers) as cluster:
with Client(cluster):
yield f


@contextlib.contextmanager
def run_legate(args):
import cunumeric as num

from legate.core import get_legate_runtime
from legate_kvikio.kerchunk import hdf5_read

def f():
get_legate_runtime().issue_execution_fence(block=True)
t0 = clock()
a = hdf5_read(args.dir / "A", dataset_name=DATASET)
b = hdf5_read(args.dir / "B", dataset_name=DATASET)
c = args.op(num, a, b)
int(c.sum())
t1 = clock()
return t1 - t0

yield f


API = {
"dask-h5py": dask_h5py,
"legate": run_legate,
}

OP = {"add": lambda xp, a, b: a + b, "matmul": lambda xp, a, b: xp.matmul(a, b)}
madsbk marked this conversation as resolved.
Show resolved Hide resolved


def main(args):
create_hdf5_array(args.dir / "A", chunks=(args.c, args.c), shape=(args.m, args.m))
create_hdf5_array(args.dir / "B", chunks=(args.c, args.c), shape=(args.m, args.m))

timings = []
with API[args.api](args) as f:
for _ in range(args.n_warmup_runs):
elapsed = f()
print("elapsed[warmup]: ", elapsed)
for i in range(args.nruns):
elapsed = f()
print(f"elapsed[run #{i}]: ", elapsed)
timings.append(elapsed)
print(f"elapsed mean: {np.mean(timings):.5}s (std: {np.std(timings):.5}s)")


if __name__ == "__main__":

def parse_directory(x):
if x is None:
return x
else:
p = pathlib.Path(x)
if not p.is_dir():
raise argparse.ArgumentTypeError("Must be a directory")
return p

parser = argparse.ArgumentParser(description="Matrix operation on two Zarr files")
parser.add_argument(
"-m",
default=100,
type=int,
help="Dimension of the two squired input matrix (MxM) (default: %(default)s).",
madsbk marked this conversation as resolved.
Show resolved Hide resolved
)
parser.add_argument(
"-c",
default=None,
type=int,
help="Dimension of the squired chunk (CxC) (default: M/10).",
madsbk marked this conversation as resolved.
Show resolved Hide resolved
)
parser.add_argument(
"-d",
"--dir",
metavar="PATH",
default=None,
type=parse_directory,
help="Path to the directory to r/w from (default: tempfile.TemporaryDirectory)",
)
parser.add_argument(
"--nruns",
metavar="RUNS",
default=1,
type=int,
help="Number of runs (default: %(default)s).",
)
parser.add_argument(
"--api",
metavar="API",
default=tuple(API.keys())[0],
choices=tuple(API.keys()),
help="API to use {%(choices)s}",
)
parser.add_argument(
"--n-workers",
default=1,
type=int,
help="Number of workers (default: %(default)s).",
)
parser.add_argument(
"--op",
metavar="OP",
default=tuple(OP.keys())[0],
choices=tuple(OP.keys()),
help="Operation to run {%(choices)s}",
)
parser.add_argument(
"--n-warmup-runs",
default=1,
type=int,
help="Number of warmup runs (default: %(default)s).",
)

args = parser.parse_args()
args.op = OP[args.op] # Parse the operation argument
if args.c is None:
args.c = args.m // 10

# Create a temporary directory if user didn't specify a directory
temp_dir: tempfile.TemporaryDirectory | ContextManager
if args.dir is None:
temp_dir = tempfile.TemporaryDirectory()
args.dir = pathlib.Path(temp_dir.name)
else:
temp_dir = contextlib.nullcontext()

with temp_dir:
main(args)
1 change: 1 addition & 0 deletions legate/cpp/task_opcodes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ enum TaskOpCode {
OP_READ,
OP_TILE_WRITE,
OP_TILE_READ,
OP_TILE_READ_BY_OFFSETS,
OP_NUM_TASK_IDS, // must be last
};
100 changes: 99 additions & 1 deletion legate/cpp/tile_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <utility>

#include "legate_mapping.hpp"
Expand Down Expand Up @@ -70,7 +71,7 @@ std::filesystem::path get_file_path(const std::string& dirpath,
*
* @tparam IsReadOperation Whether the operation is a read or a write operation
* @param context The Legate task context
* @param store The Legate store ti read or write
* @param store The Legate store to read or write
*/
template <bool IsReadOperation>
struct tile_read_write_fn {
Expand Down Expand Up @@ -105,6 +106,71 @@ struct tile_read_write_fn {
}
};

/**
* @brief Flatten the domain point to a 1D point
*
* @param lo_dp Lower point
* @param hi_dp High point
* @param point_dp The domain point to flatten
* @return The flatten domain point
*/
template <int32_t DIM>
size_t linearize(const legate::DomainPoint& lo_dp,
const legate::DomainPoint& hi_dp,
const legate::DomainPoint& point_dp)
{
legate::Point<DIM> lo = lo_dp;
legate::Point<DIM> hi = hi_dp;
legate::Point<DIM> point = point_dp;
legate::Point<DIM> extents = hi - lo + legate::Point<DIM>::ONES();
madsbk marked this conversation as resolved.
Show resolved Hide resolved
size_t idx = 0;
for (int32_t dim = 0; dim < DIM; ++dim) {
idx = idx * extents[dim] + point[dim] - lo[dim];
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe translate to zero-origin up front? (Does this do the right thing if point_dp is not in the box described by lo and hi?)

legate::Point<DIM> point = point_dp - lo_dp;
...
for (...) {
   idx = idx * extents[dim] + points[dim];

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
point_dp must be within lo and hi. Since they come from context.get_task_index() and context.get_launch_domain() directly , I don't we need a check here.

return idx;
}

/**
* @brief Functor for tiling read Legate store by offsets from disk using KvikIO
*
* @param context The Legate task context
* @param store The Legate output store
*/
struct tile_read_by_offsets_fn {
template <legate::Type::Code CODE, int32_t DIM>
void operator()(legate::TaskContext& context, legate::Store& store)
{
using DTYPE = legate::legate_type_of<CODE>;
const auto task_index = context.get_task_index();
const auto launch_domain = context.get_launch_domain();
const std::string path = context.scalars().at(0).value<std::string>();
legate::Span<const uint64_t> offsets = context.scalars().at(1).values<uint64_t>();
legate::Span<const uint64_t> sizes = context.scalars().at(2).values<uint64_t>();
legate::Span<const uint64_t> tile_shape = context.scalars().at(3).values<uint64_t>();

// Flatten task index
uint32_t flatten_task_index = 0;
if (!context.is_single_task()) {
flatten_task_index = linearize<DIM>(launch_domain.lo(), launch_domain.hi(), task_index);
}
wence- marked this conversation as resolved.
Show resolved Hide resolved

auto shape = store.shape<DIM>();
auto shape_volume = shape.volume();
if (shape_volume == 0) { return; }
size_t nbytes = shape_volume * sizeof(DTYPE);
std::array<size_t, DIM> strides{};
if (nbytes != sizes[flatten_task_index]) {
throw std::runtime_error("sizes doesn't match the size of the output store");
}

// We know that the accessor is contiguous because we set `policy.exact = true`
// in `Mapper::store_mappings()`.
kvikio::FileHandle f(path, "r");
auto* data = store.write_accessor<DTYPE, DIM>().ptr(shape, strides.data());
madsbk marked this conversation as resolved.
Show resolved Hide resolved
f.pread(data, nbytes, offsets[flatten_task_index]).get();
wence- marked this conversation as resolved.
Show resolved Hide resolved
}
};

} // namespace

namespace legate_kvikio {
Expand Down Expand Up @@ -167,6 +233,37 @@ class TileReadTask : public Task<TileReadTask, TaskOpCode::OP_TILE_READ> {
}
};

/**
* @brief Read a tiled Legate store by offset to disk using KvikIO
* Task signature:
* - scalars:
* - path: std::string
* - offsets: tuple of int64_t
* - sizes: tuple of int64_t
* - tile_shape: tuple of int64_t
* - outputs:
* - buffer: store (any dtype)
*
* NB: the store must be contigues. To make Legate in force this,
* set `policy.exact = true` in `Mapper::store_mappings()`.
*
*/
class TileReadByOffsetsTask
: public Task<TileReadByOffsetsTask, TaskOpCode::OP_TILE_READ_BY_OFFSETS> {
public:
static void cpu_variant(legate::TaskContext& context)
{
legate::Store& store = context.outputs().at(0);
legate::double_dispatch(store.dim(), store.code(), tile_read_by_offsets_fn{}, context, store);
}

static void gpu_variant(legate::TaskContext& context)
{
// Since KvikIO supports both GPU and CPU memory seamlessly, we reuse the CPU variant.
cpu_variant(context);
}
};

} // namespace legate_kvikio

namespace {
Expand All @@ -175,6 +272,7 @@ void __attribute__((constructor)) register_tasks()
{
legate_kvikio::TileWriteTask::register_variants();
legate_kvikio::TileReadTask::register_variants();
legate_kvikio::TileReadByOffsetsTask::register_variants();
}

} // namespace
26 changes: 26 additions & 0 deletions legate/examples/hdf5_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.


import h5py
import numpy as np

import legate_kvikio.kerchunk


def hdf5_io(filename):
a = np.arange(10000).reshape((100, 100))

# Write array using h5py
with h5py.File(filename, "w") as f:
f.create_dataset("mydataset", chunks=(10, 10), data=a)

# Read hdf5 file using legate+kerchunk
b = legate_kvikio.kerchunk.hdf5_read(filename, dataset_name="mydataset")

# They should be equal
assert (a == b).all()


if __name__ == "__main__":
hdf5_io("/tmp/legate-kvikio-io.hdf5")
27 changes: 27 additions & 0 deletions legate/examples/zarr_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import cunumeric as num

import legate.core
import legate_kvikio.zarr


def zarr_io(dirname):
a = num.arange(10000).reshape(100, 100)

# Write array to a Zarr file by chunks of 10x10.
legate_kvikio.zarr.write_array(a, dirname, chunks=(10, 10))

# Block until done writing.
legate.core.get_legate_runtime().issue_execution_fence(block=True)

# Read array from a Zarr file.
b = legate_kvikio.zarr.read_array(dirname)

# They should be equal
assert (a == b).all()


if __name__ == "__main__":
zarr_io("/tmp/legate-kvikio-zarr-io")
Loading