Skip to content

Commit

Permalink
Collect KvikIO futures and wait on them
Browse files Browse the repository at this point in the history
Instead of waiting on each `FutureIO` from KvikIO before creating the
next one, submit all IO requests and then get each one at the end.
  • Loading branch information
jakirkham committed Jun 28, 2023
1 parent 8f59794 commit 8fffb99
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions dask_cuda/disk_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False)
import kvikio

with kvikio.CuFile(path, "w") as f:
futs = []
for b in frames:
f.pwrite(buf=b).get()
futs.append(f.pwrite(buf=b))
for each_fut in futs:
each_fut.get()
else:
with open(path, "wb") as f:
os.writev(f.fileno(), frames) # type: ignore
Expand Down Expand Up @@ -210,10 +213,13 @@ def disk_read(header: Mapping, gds=False) -> list:
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"])
)
with kvikio.CuFile(header["path"], "rb") as f:
futs = []
file_offset = 0
for b in ret:
f.pread(buf=b, file_offset=file_offset).get()
futs.append(f.pread(buf=b, file_offset=file_offset))
file_offset += b.nbytes
for each_fut in futs:
each_fut.get()
else:
ret.extend(
np.empty((length,), dtype="u1") for length in header["frame-lengths"]
Expand Down

0 comments on commit 8fffb99

Please sign in to comment.