Skip to content

Commit

Permalink
Use KvikIO in Dask-CUDA
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Jun 2, 2022
1 parent e2ff3d7 commit 2ef69b3
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions dask_cuda/disk_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ def __init__(

if self.gds_enabled:
try:
import cucim.clara.filesystem as cucim_fs # noqa F401
import kvikio # noqa F401
except ImportError:
raise ImportError("GPUDirect Storage requires the cucim Python package")
else:
self.gds_enabled = bool(cucim_fs.is_gds_available())
self.gds_enabled = kvikio.DriverProperties().is_gds_availabe

def gen_file_path(self) -> str:
"""Generate an unique file path"""
Expand Down Expand Up @@ -164,11 +164,16 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False)
cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames)
frame_lengths = tuple(map(nbytes, frames))
if gds and any(cuda_frames):
import cucim.clara.filesystem as cucim_fs
import kvikio

with cucim_fs.open(path, "w") as f:
futs = []
with kvikio.CuFile(path, "w") as f:
for frame, length in zip(frames, frame_lengths):
f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0)
futs.append(
f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0)
)
for each_fut in futs:
futs.get()

else:
with open(path, "wb") as f:
Expand Down Expand Up @@ -201,18 +206,25 @@ def disk_read(header: Mapping, gds=False) -> list:
"""
ret = []
if gds:
import cucim.clara.filesystem as cucim_fs # isort:skip
import kvikio # isort:skip

with cucim_fs.open(header["path"], "rb") as f:
with kvikio.CuFile(header["path"], "rb") as f:
file_offset = 0
futs = []
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]):
if is_cuda:
buf = get_new_cuda_buffer()(length)
else:
buf = np.empty((length,), dtype="u1")
f.pread(buf=buf, count=length, file_offset=file_offset, buf_offset=0)
futs.append(
f.pread(
buf=buf, count=length, file_offset=file_offset, buf_offset=0
)
)
file_offset += length
ret.append(buf)
for each_fut in futs:
futs.get()
else:
with open(str(header["path"]), "rb") as f:
for length in header["frame-lengths"]:
Expand Down

0 comments on commit 2ef69b3

Please sign in to comment.