Skip to content

Commit

Permalink
Improve pre and postcopy dependencies
Browse files Browse the repository at this point in the history
Dependencies are no longer added with a sledgehammer; pre- and postcopies now hook into the same dependency
tracking that all other operations use.
  • Loading branch information
olsaarik committed Sep 29, 2021
1 parent 33f8357 commit dd05b77
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions sccl/ncclize.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ class _Threadblock:
# The steps may expand into multiple operations here
ops: list = field(default_factory=list)

@dataclass
class _Copy:
input_offset: int
output_offset: int

@dataclass
class _Op:
gpu: int
Expand Down Expand Up @@ -367,6 +362,14 @@ def combine_copies(copies):
combine_copies(gpu.precopies)
combine_copies(gpu.postcopies)

# Expand copies by instances if necessary
if instances > 1:
for rank, gpu in gpus.items():
for copy in itertools.chain(gpu.precopies, gpu.postcopies):
copy.src_offset *= instances
copy.dst_offset *= instances
copy.cnt *= instances

def get_buffer_and_offset(gpu, addr):
# Map an address to one of the named buffers
if addr in gpu.inputs:
Expand Down Expand Up @@ -420,6 +423,14 @@ def make_interval(a,b):
writers = defaultdict(list)
# Track all the reads since the last write to each buffer index
readers = defaultdict(list)

# Initialize readers and writers for precopies
for rank, gpu in gpus.items():
for op in gpu.precopies:
for i in range(op.cnt):
readers[(rank,op.src_buffer,op.src_offset+i)].append(op)
writers[(rank,op.dst_buffer,op.dst_offset+i)].append(op)

for step_idx, step in enumerate(algorithm.steps):
new_writers = defaultdict(list)
new_readers = defaultdict(list)
Expand Down Expand Up @@ -511,16 +522,18 @@ def make_interval(a,b):
for key, deps in new_readers.items():
readers[key].extend(deps)

# Add dependencies for postcopies
for rank, gpu in gpus.items():
for op in gpu.postcopies:
for i in range(op.cnt):
op.depends.extend(writers[(rank,op.src_buffer,op.src_offset+i)])
op.depends.extend(readers[(rank,op.dst_buffer,op.dst_offset+i)])
op.depends.extend(writers[(rank,op.dst_buffer,op.dst_offset+i)])

# Fixup everything to match the instanced sends when multiple instances are generated
if instances > 1:
for rank, gpu in gpus.items():
# Expand copies
for copy in itertools.chain(gpu.precopies, gpu.postcopies):
copy.src_offset *= instances
copy.dst_offset *= instances
copy.cnt *= instances

# Multiply the other metadata with instances
# Multiply metadata with instances
def expand_mappings(mappings):
return { addr * instances + i: idx * instances + i for addr, idx in mappings.items() for i in range(instances) }
gpu.inputs = expand_mappings(gpu.inputs)
Expand Down Expand Up @@ -574,16 +587,6 @@ def expand_mappings(mappings):
cpy_tb = _Threadblock(0)
cpy_tb.rbid = len(gpu.threadblocks)
cpy_tb.steps = gpu.precopies + gpu.postcopies
if len(gpu.precopies) > 0:
end_pre = gpu.precopies[-1]
for tb in gpu.threadblocks:
if len(tb.steps) > 0:
tb.steps[0].depends.append(end_pre)
if len(gpu.postcopies) > 0:
start_post = gpu.postcopies[0]
for tb in gpu.threadblocks:
if len(tb.steps) > 0:
start_post.depends.append(tb.steps[-1])
gpu.threadblocks.append(cpy_tb)

# Filter out dependencies within the same threadblock and mark all ops that have a dependence on them
Expand Down

0 comments on commit dd05b77

Please sign in to comment.