
[core][compiled graphs] Add CPU-based NCCL communicator for development #48440

Merged: 49 commits into ray-project:master on Dec 19, 2024

Conversation

@anyadontfly (Contributor) commented Oct 30, 2024

Why are these changes needed?

This allows developers to debug DAGs with collective ops on CPU. Currently, we use a Ray actor to perform the allreduce.

# Assumes ray, InputNode, MultiOutputNode, and collective are imported, and that
# CPUTorchTensorWorker and CPUCommunicator are defined (a sketch follows below).
num_workers = 2
workers = [CPUTorchTensorWorker.remote() for _ in range(num_workers)]
cpu_group = CPUCommunicator(num_workers, workers)

with InputNode() as inp:
    computes = [worker.compute.bind(inp) for worker in workers]
    # Allreduce the workers' outputs over the CPU-based communicator.
    collectives = collective.allreduce.bind(computes, transport=cpu_group)
    recvs = [
        worker.recv.bind(collective)
        for worker, collective in zip(workers, collectives)
    ]
    dag = MultiOutputNode(recvs)

compiled_dag = dag.experimental_compile()
# Pass a value for InputNode when executing the compiled DAG.
ray.get(compiled_dag.execute(1))
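For reference, CPUTorchTensorWorker is not defined in this snippet. A minimal sketch of what such an actor could look like, assuming torch is installed; the actual test worker in this PR may differ:

import torch
import ray


@ray.remote
class CPUTorchTensorWorker:
    # Hypothetical worker for illustration only.
    def compute(self, value: int) -> torch.Tensor:
        # Produce a CPU tensor from the DAG input.
        return torch.ones(3, dtype=torch.float32) * value

    def recv(self, tensor: torch.Tensor) -> list:
        # Return the allreduced tensor as a plain list so ray.get() can display it.
        return tensor.tolist()

With an input of 1, each worker should then see the summed tensor, e.g. [2.0, 2.0, 2.0], assuming the default reduce op is a sum.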

Related issue number

Closes #47936

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

tfsingh and others added 3 commits October 28, 2024 22:46
…er of allreduce ops; minor change: used defaultdict on line 63 for simplicity; added line 94 for unsupported collective ops
@stephanie-wang (Contributor)

Could you also add a test for this?

@AndyUB (Contributor) left a comment

LGTM.

@jcotant1 added the core (Issues that should be addressed in Ray Core) and compiled-graphs labels on Nov 18, 2024
@tfsingh (Contributor) commented Nov 22, 2024

Just a heads-up that @anyadontfly and I are writing some e2e tests with the compiled DAG API; we should have them done by tomorrow.

@stephanie-wang (Contributor) left a comment

Nice, this looks pretty good! Left some comments for cleanup/clarification, but overall the structure looks good.

self.collective_data: Dict[int, List[torch.Tensor]] = defaultdict(list)
# Buffer for the number of actors seen, each entry is one p2p op.
self.num_actors_seen = defaultdict(int)
# Number of actors who have read the result, and are about the exit the function.

Suggested change:
- # Number of actors who have read the result, and are about the exit the function.
+ # Number of actors who have read the result, and are about to exit the function.

self.communicators.add(comm)

received_tensor = ray.get(comm.wait_p2p.remote(self.num_ops[comm_key]))
assert (

In this case you can probably just return the received_tensor directly (the allocator is needed for cases where a receive buffer has to be allocated before the recv happens).
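A sketch of that simplification, with the method signature and helper names assumed rather than taken from the PR:

# Hypothetical sketch only; the PR's actual recv implementation may differ.
def recv(self, shape, dtype, peer_rank, allocator=None):
    comm_key = self._comm_key(peer_rank)          # assumed helper
    comm = self._get_communicator(comm_key)       # assumed helper
    self.communicators.add(comm)
    # The sending actor already materialized the tensor, so no receive buffer
    # has to be allocated first: just return the received tensor directly.
    received_tensor = ray.get(comm.wait_p2p.remote(self.num_ops[comm_key]))
    return received_tensor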

assert (
    ray.get_gpu_ids()
), "Actors participating in NCCL group must have at least one GPU assigned"
if not custom_nccl_group or not isinstance(custom_nccl_group, CPUNcclGroup):

Suggested change:
- if not custom_nccl_group or not isinstance(custom_nccl_group, CPUNcclGroup):
+ if not (custom_nccl_group and isinstance(custom_nccl_group, CPUNcclGroup)):

Nit: a bit more readable with a single not.

return result


class CPUCommunicator:

Can we make this inherit from GPUCommunicator?

One suggestion is to rename GPUCommunicator to a generic Communicator or DeviceCommunicator, and have it return a string naming the resource type that actors in the group are expected to have.
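A rough sketch of that shape; the abstract-method set here is illustrative, not necessarily the interface the PR finally landed:

from abc import ABC, abstractmethod


class Communicator(ABC):
    """Generic communicator; both GPU (NCCL) and CPU groups implement this."""

    @abstractmethod
    def get_device_type(self) -> str:
        """Resource type ("gpu" or "cpu") each actor in the group must have."""

    @abstractmethod
    def allreduce(self, send_buf, recv_buf, op) -> None:
        """Reduce send_buf across the group into recv_buf."""


class CPUCommunicator(Communicator):
    def get_device_type(self) -> str:
        return "cpu"

    def allreduce(self, send_buf, recv_buf, op) -> None:
        # In the PR, the actual reduction is delegated to a Ray actor.
        ...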

@@ -572,17 +574,46 @@ def _do_init_nccl_group(
)


def _do_init_cpu_group(

Can you see if we can reuse the existing _do_init_nccl_group instead?
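One way to read this, sketched with assumed names (neither _do_init_communicator nor ctx.communicators is necessarily what the merged code uses):

# Hypothetical sketch: a single worker-side init helper shared by both
# transports instead of a parallel _do_init_cpu_group.
# ChannelContext is the existing channel context already used in this module.
def _do_init_communicator(self, group_id: str, communicator) -> None:
    ctx = ChannelContext.get_current()
    # Register the communicator (NCCL-backed or CPU-backed) in one table so
    # that downstream lookups do not need to branch on the transport.
    ctx.communicators[group_id] = communicator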

recv_buf = torch.empty_like(send_buf)
nccl_group.allreduce(send_buf, recv_buf, self._op)
ctx = ChannelContext.get_current()
if ctx.nccl_groups:

Same here, I think you can restructure this to just take the "default group", whether it's NCCL or CPU, and then the code inside the if-else branches is the same for both.
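Roughly like this; the communicators table and the self._group_id attribute are assumptions for illustration:

ctx = ChannelContext.get_current()
# Look up the default group for this op, whether it is NCCL- or CPU-backed...
group = ctx.communicators[self._group_id]
# ...after which the allreduce path is identical for both transports.
recv_buf = torch.empty_like(send_buf)
group.allreduce(send_buf, recv_buf, self._op)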

Comment on lines 129 to 131
self.nccl_groups: Dict[str, "GPUCommunicator"] = {}
# Used for the torch.Tensor CPU transport.
self.cpu_groups: Dict[str, "CPUCommunicator"] = {}

# group ID -> Communicator
Option 1: self.device_groups: Dict[str, Communicator]

# resource label -> group ID -> Communicator
Option 2: self.device_groups: Dict[str, Dict[str, Communicator]]
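For illustration only (the group IDs and resource labels are placeholders), lookup under each option would look like:

# Option 1: flat mapping keyed by group ID.
group = self.device_groups[group_id]

# Option 2: nested mapping, keyed first by the resource the actors need.
gpu_group = self.device_groups["gpu"][group_id]
cpu_group = self.device_groups["cpu"][group_id]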

@anyadontfly (Contributor, Author) commented Dec 5, 2024

Can we keep the name self.nccl_groups?

@stephanie-wang (Contributor) left a comment

Thanks, this looks great! A couple of minor comments about naming, then we can merge it. Let's have get_device_type return "gpu" instead of "nccl".

@abstractmethod
def get_device_type() -> str:
    """
    Return the type of the communicator (nccl or cpu).

Suggested change:
- Return the type of the communicator (nccl or cpu).
+ Return the type of the communicator (gpu or cpu).

@@ -307,3 +304,6 @@ def destroy(self) -> None:
        # flag is True when they exit from the abort.
        self._comm.abort()
        self._comm.destroy()
+
+    def get_device_type(self) -> str:
+        return "nccl"

Suggested change:
- return "nccl"
+ return "gpu"

"nccl" is the transport name but "gpu" is the device that each actor is expected to have.

You could add a get_transport_name and that can return NCCL instead.
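Taken together, the suggestion amounts to something like the following sketch (class names are illustrative, and the CPU transport string is a placeholder):

class _NcclGroup:  # GPU/NCCL-backed communicator
    def get_device_type(self) -> str:
        # The resource each participating actor must have.
        return "gpu"

    def get_transport_name(self) -> str:
        # The mechanism used to move tensors between those actors.
        return "nccl"


class CPUCommunicator:  # CPU-backed communicator for development
    def get_device_type(self) -> str:
        return "cpu"

    def get_transport_name(self) -> str:
        return "cpu"  # placeholder; the PR may use a different name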

@@ -856,7 +856,7 @@ def __init__(
        self._use_default_nccl_group = False
        # This is set to the specified custom nccl group
        # if there exists a type hint of `transport=nccl_group`.
-       self._custom_nccl_group_p2p: Optional[GPUCommunicator] = None
+       self._custom_nccl_group_p2p: Optional[Communicator] = None

For consistency, it would be good to replace all occurrences of nccl_group with communicator.

@anyadontfly (Contributor, Author) replied:

Got it. Also, we have functions for initializing and destroying CPUCommunicator in torch_tensor_nccl_channel.py. Do we have to rename torch_tensor_nccl_channel.py, or should we put those functions in a separate file?

@stephanie-wang added the go label (add ONLY when ready to merge, run all tests) on Dec 17, 2024
@stephanie-wang changed the title from "(WIP) [core][compiled graphs] Add CPU-based NCCL communicator for development" to "[core][compiled graphs] Add CPU-based NCCL communicator for development" on Dec 19, 2024
@stephanie-wang merged commit 6676cc1 into ray-project:master on Dec 19, 2024
4 of 5 checks passed
Labels
core (Issues that should be addressed in Ray Core), compiled-graphs, go (add ONLY when ready to merge, run all tests)

Projects
None yet

Development
Successfully merging this pull request may close these issues:
[core][compiled graphs] Add CPU-based NCCL communicator for development

6 participants