Skip to content

Commit

Permalink
Use KvikIO in Dask-CUDA
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Jun 27, 2023
1 parent 83c6476 commit 000b896
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
2 changes: 1 addition & 1 deletion ci/release/update-version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function sed_runner() {
# Bump cudf and dask-cudf testing dependencies
sed_runner "s/cudf=.*/cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml
sed_runner "s/dask-cudf=.*/dask-cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml
sed_runner "s/cucim=.*/cucim=${NEXT_SHORT_TAG}/g" dependencies.yaml
sed_runner "s/kvikio=.*/kvikio=${NEXT_SHORT_TAG}/g" dependencies.yaml
sed_runner "s/ucx-py=.*/ucx-py=${NEXT_UCXPY_VERSION}/g" dependencies.yaml

# CI files
Expand Down
23 changes: 13 additions & 10 deletions dask_cuda/disk_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,13 @@ def __init__(

if self.gds_enabled:
try:
import cucim.clara.filesystem as cucim_fs # noqa F401
import kvikio # noqa F401
except ImportError:
raise ImportError("GPUDirect Storage requires the cucim Python package")
raise ImportError(
"GPUDirect Storage requires the kvikio Python package"
)
else:
self.gds_enabled = bool(cucim_fs.is_gds_available())
self.gds_enabled = kvikio.DriverProperties().is_gds_available

def gen_file_path(self) -> str:
"""Generate an unique file path"""
Expand Down Expand Up @@ -164,12 +166,11 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False)
cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames)
frame_lengths = tuple(map(nbytes, frames))
if gds and any(cuda_frames):
import cucim.clara.filesystem as cucim_fs
import kvikio

with cucim_fs.open(path, "w") as f:
with kvikio.CuFile(path, "w") as f:
for frame, length in zip(frames, frame_lengths):
f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0)

f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0).get()
else:
with open(path, "wb") as f:
for frame in frames:
Expand Down Expand Up @@ -201,16 +202,18 @@ def disk_read(header: Mapping, gds=False) -> list:
"""
ret = []
if gds:
import cucim.clara.filesystem as cucim_fs # isort:skip
import kvikio # isort:skip

with cucim_fs.open(header["path"], "rb") as f:
with kvikio.CuFile(header["path"], "rb") as f:
file_offset = 0
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]):
if is_cuda:
buf = get_new_cuda_buffer()(length)
else:
buf = np.empty((length,), dtype="u1")
f.pread(buf=buf, count=length, file_offset=file_offset, buf_offset=0)
f.pread(
buf=buf, count=length, file_offset=file_offset, buf_offset=0
).get()
file_offset += length
ret.append(buf)
else:
Expand Down
2 changes: 1 addition & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ dependencies:
common:
- output_types: [conda]
packages:
- cucim=23.08
- cudf=23.08
- dask-cudf=23.08
- kvikio=23.08
- pytest
- pytest-cov
- ucx-proc=*=gpu
Expand Down

0 comments on commit 000b896

Please sign in to comment.