Consolidate messages in UCX #3732

Open: wants to merge 39 commits into base: main
Changes shown from 1 commit

Commits (39, all by jakirkham):

070a19f (Apr 21, 2020): Define `as_cuda_array`
bee6f0b (Apr 21, 2020): Send/recv host and device frames in a message each
a9b3161 (Apr 23, 2020): Compute host and device total frames size
5ed7332 (Apr 23, 2020): Fast path cases with 0 or 1 frames
0473527 (Apr 23, 2020): Add concat helper functions
610e864 (Apr 23, 2020): Add split helper functions
87c85cf (Apr 28, 2020): Coerce other types to NumPy/CuPy arrays
0dc0bb0 (Apr 28, 2020): Only return `DeviceBuffer`s/`memoryview`s
3d325f8 (Apr 28, 2020): Make sure global split functions are overridden
107a2db (Apr 28, 2020): Drop leftover line
dbd57cf (Apr 28, 2020): Finish accumulation
6fba794 (Apr 28, 2020): Fix variable name
c04bb39 (Apr 28, 2020): Fix other variable names
820fbc4 (Apr 28, 2020): Ensure `uint8` is used in concat/split
18d4331 (Apr 28, 2020): Use `nbytes` with buffer objects
19dfbf6 (Apr 28, 2020): Move sync before send/recv of device buffers
1a4a324 (Apr 28, 2020): Use RMM allocator with CuPy
5bf32e0 (Apr 28, 2020): Fix arg to `device_concat` in fallback
fb6ba72 (Apr 28, 2020): Add `device_split` fallback
0dcbd5c (Apr 28, 2020): Assign `cupy.split` before copying each array
5c3ad3a (Apr 28, 2020): Skip last size when splitting
5663983 (Apr 28, 2020): Cast Numba arrays to `uint8`
791fb26 (Apr 28, 2020): Skip test that segfaults now
6eac4d4 (Apr 28, 2020): Use `as_cuda_array` to take view
99a73a0 (Apr 30, 2020): Check `len` to `concat` frames
c4c6801 (Apr 30, 2020): Allocate frames only when needed
62f7f12 (Apr 30, 2020): Restore test that was previously segfaulting
877dab4 (Apr 30, 2020): Run black
81f718b (Apr 30, 2020): Compute total frame sizes during allocation
ee528d5 (Apr 30, 2020): Update comments
af94abb (Apr 30, 2020): Collect individual frame metadata then collect it
94aee85 (Apr 30, 2020): Group `nframes` with other data collection
a9900a0 (Apr 30, 2020): Move comment closer to `synchronize_stream`
f37951a (Apr 30, 2020): Rewrite `device_split` to use `cupy.copyto`
a236401 (Apr 30, 2020): Drop synchronize call
b26b58d (Apr 30, 2020): Allocate `device_array`s directly in `split`
a05293b (Apr 30, 2020): Use `device_array` to allocate memory in concat
0d046d7 (Apr 30, 2020): Drop unneeded slice
e4a6d1e (Apr 30, 2020): Run black
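
Read in sequence, the commits describe the PR's core mechanic: pack all host frames into one contiguous host buffer and all device frames into one contiguous device buffer, send/receive each as a single message, and split them apart again on the other side. Below is a minimal sketch of such concat/split helpers: `device_concat`/`device_split` are named in the commit messages, while `host_concat`/`host_split` are assumed host-side counterparts, and none of these bodies are the PR's actual code (which adds RMM-backed allocation, a Numba fallback, and coercion of other array types).

```python
# Minimal sketch of the concat/split helpers named in the commits above;
# illustrative only, not the PR's implementation.
import numpy as np


def host_concat(frames):
    """Pack host frames into one contiguous uint8 buffer."""
    arrays = [np.frombuffer(memoryview(f).cast("B"), dtype="u1") for f in frames]
    return np.concatenate(arrays) if arrays else np.empty((0,), dtype="u1")


def host_split(buffer, sizes):
    """Split a packed host buffer back into per-frame memoryviews."""
    arr = np.frombuffer(memoryview(buffer).cast("B"), dtype="u1")
    # Offsets skip the final size ("Skip last size when splitting"); the
    # last piece is whatever remains after the next-to-last offset.
    return [p.data for p in np.split(arr, np.cumsum(sizes)[:-1])]


def device_concat(arrays):
    """Device-side analogue. Assumes 1-D uint8 CuPy arrays as input; the
    PR also carries a Numba fallback and uses the RMM allocator."""
    import cupy  # imported lazily so the host helpers work without a GPU

    return cupy.concatenate(arrays) if len(arrays) else cupy.empty((0,), dtype="u1")


def device_split(array, sizes):
    """Split a packed 1-D uint8 device array into per-frame pieces."""
    import cupy

    return cupy.split(array, np.cumsum(sizes)[:-1].tolist())
```

A round trip on the host side:

```python
frames = [b"meta", b"", b"payload"]
packed = host_concat(frames)
assert [bytes(v) for v in host_split(packed, [4, 0, 7])] == frames
```

On the wire this means at most two data messages per `write`, one for the host frames and one for the device frames, regardless of how many frames the original message contained.
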
distributed/comm/ucx.py: 52 changes (31 additions, 21 deletions)

```diff
@@ -202,30 +202,40 @@ async def write(
         )
         sizes = tuple(nbytes(f) for f in frames)
 
-        host_frames_size = 0
-        device_frames_size = 0
-        for is_cuda, each_size in zip(cuda_frames, sizes):
-            if is_cuda:
-                device_frames_size += each_size
-            else:
-                host_frames_size += each_size
-
-        host_frames = host_array(host_frames_size)
-        device_frames = device_array(device_frames_size)
-
-        # Pack frames
-        host_frames_view = memoryview(host_frames)
-        device_frames_view = as_device_array(device_frames)
-        for each_frame, is_cuda, each_size in zip(frames, cuda_frames, sizes):
-            if each_size:
-                if is_cuda:
-                    each_frame_view = as_device_array(each_frame)
-                    device_frames_view[:each_size] = each_frame_view[:]
-                    device_frames_view = device_frames_view[each_size:]
-                else:
-                    each_frame_view = memoryview(each_frame).cast("B")
-                    host_frames_view[:each_size] = each_frame_view[:]
-                    host_frames_view = host_frames_view[each_size:]
+        host_frames = host_array(0)
+        device_frames = device_array(0)
+        if nframes == 1:
+            if cuda_frames[0]:
+                device_frames = frames[0]
+            else:
+                host_frames = frames[0]
+        elif nframes > 1:
+            host_frames_size = 0
+            device_frames_size = 0
+            for is_cuda, each_size in zip(cuda_frames, sizes):
+                if is_cuda:
+                    device_frames_size += each_size
+                else:
+                    host_frames_size += each_size
+
+            host_frames = host_array(host_frames_size)
+            device_frames = device_array(device_frames_size)
+
+            # Pack frames
+            host_frames_view = memoryview(host_frames)
+            device_frames_view = as_device_array(device_frames)
+            for each_frame, is_cuda, each_size in zip(
+                frames, cuda_frames, sizes
+            ):
+                if each_size:
+                    if is_cuda:
+                        each_frame_view = as_device_array(each_frame)
+                        device_frames_view[:each_size] = each_frame_view[:]
+                        device_frames_view = device_frames_view[each_size:]
+                    else:
+                        each_frame_view = memoryview(each_frame).cast("B")
+                        host_frames_view[:each_size] = each_frame_view[:]
+                        host_frames_view = host_frames_view[each_size:]
 
         # Send meta data
```
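
The hunk replaces the unconditional pack-everything logic with fast paths keyed on `nframes`: with zero frames the empty placeholder buffers are used, with exactly one frame that frame is sent as-is with no copy or allocation, and only with two or more frames are consolidated buffers allocated and filled ("Fast path cases with 0 or 1 frames", "Allocate frames only when needed"). Here is a host-only sketch of that control flow, where `host_array` is a plain `bytearray` stand-in for the PR's allocator and `pack_host_frames` is a hypothetical wrapper:

```python
# Host-only illustration of the hunk's control flow; not the PR's code.
def host_array(n):
    # Stand-in for the PR's host allocator.
    return bytearray(n)


def pack_host_frames(frames):
    """Return one buffer covering `frames`, copying only when needed."""
    sizes = [memoryview(f).nbytes for f in frames]
    nframes = len(frames)

    host_frames = host_array(0)
    if nframes == 1:
        # Fast path: reuse the lone frame directly, no copy.
        host_frames = frames[0]
    elif nframes > 1:
        host_frames = host_array(sum(sizes))
        view = memoryview(host_frames)
        for each_frame, each_size in zip(frames, sizes):
            if each_size:
                view[:each_size] = memoryview(each_frame).cast("B")[:]
                view = view[each_size:]  # advance past the packed frame
    return host_frames, sizes


packed, sizes = pack_host_frames([b"abc", b"", b"defgh"])
assert bytes(packed) == b"abcdefgh" and sizes == [3, 0, 5]
```

The device branch in the actual diff has the same shape, with `as_device_array` views in place of `memoryview` and a stream synchronize before the consolidated device buffer is sent or received (the "Move sync before send/recv of device buffers" commit); on receive, the sizes carried in the metadata drive the matching split.
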