Expose MultiDeviceExecutor and overlapped AG+matmul to python API #3923

Open · wants to merge 11 commits into main
Conversation

@samnordmann (Collaborator) commented Feb 19, 2025

  • Expose using MultiDeviceExecutor instead of FusionExecutorCache in the python API. The feature is disabled by default; to activate it, call FusionDefinition.use_multidevice_executor() on the python fusion instance.
  • Expose ParallelType::Stream.
  • Write a python test exercising the pipelined AG+linear algorithm, which achieves comm/compute overlap. To run:

```shell
mpirun -x NVFUSER_DUMP=host_ir -np 8 python -m pytest tests/python/test_multidevice_overlap.py -vs --only-mpi -k test_overlap_allgather_matmul_stream_outermost
```
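As a rough illustration of why the pipelined AG+linear algorithm is correct, here is a minimal numpy sketch (not nvFuser code; the shapes mirror the test below, and the allgather is stood in for by a slice, since a single process already holds the full tensor). Each outer "stream" chunk is gathered and multiplied independently, which is what lets the gather of chunk i+1 overlap with the matmul of chunk i:

```python
# Illustrative sketch only: shows that a linear layer applied chunk-by-chunk
# along the outer "stream" dimension equals the full linear. Shapes follow
# the test: x is [s, d, m//(s*d), k], weight is [n, k], bias is [n].
import numpy as np

s, d, m, k, n = 4, 2, 16, 8, 8
x = np.random.rand(s, d, m // (s * d), k)
weight = np.random.rand(n, k)
bias = np.random.rand(n)

# Reference: one big linear over the whole input.
ref = x @ weight.T + bias

# Pipelined view: iterate over the s stream chunks. In the real algorithm,
# chunk i is allgathered across the d devices on a communication stream
# while chunk i-1 is multiplied on the compute stream.
out = np.empty_like(ref)
for i in range(s):
    gathered = x[i]  # stands in for the allgather of chunk i
    out[i] = gathered @ weight.T + bias  # compute, overlapping the next gather

assert np.allclose(out, ref)
```

Because the chunks are independent, reordering the gathers and matmuls onto separate streams changes only the schedule, not the result.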

@samnordmann (Collaborator, Author):

!test

github-actions bot commented Feb 19, 2025

Review updated until commit d27031f

Description

  • Expose MultiDeviceExecutor to Python API, disabled by default.

  • Add ParallelType::Stream to Python bindings.

  • Implement Python test for pipelined AG+linear algo with stream parallelism.

  • Update FusionDefinition to support MultiDeviceExecutor.


Changes walkthrough 📝

Relevant files:

Enhancement

fusion_definition.cpp (csrc/python_frontend/fusion_definition.cpp, +20/-5): Add MultiDeviceExecutor support in FusionDefinition
  • Add logic to use MultiDeviceExecutor if enabled.
  • Add error handling for user-defined schedules with MultiDeviceExecutor.
  • Update fusion retrieval logic based on MultiDeviceExecutor usage.

python_bindings.cpp (csrc/python_frontend/python_bindings.cpp, +6/-2): Add Stream ParallelType and MultiDeviceExecutor toggle
  • Add ParallelType::Stream to Python bindings.
  • Add a method to enable MultiDeviceExecutor in FusionDefinition.

executor.h (csrc/host_ir/executor.h, +4/-0): Add container method to HostIrEvaluator
  • Add container method to HostIrEvaluator.

executor.h (csrc/multidevice/executor.h, +5/-1): Update MultiDeviceExecutor constructor and add hirEvaluator method
  • Update the MultiDeviceExecutor constructor to use the default communicator.
  • Add hirEvaluator method to MultiDeviceExecutor.

fusion_cache.h (csrc/python_frontend/fusion_cache.h, +3/-0): Add MultiDeviceExecutor to FusionSchedules
  • Include multidevice/executor.h.
  • Add multi_device_executor to FusionSchedules.

fusion_definition.h (csrc/python_frontend/fusion_definition.h, +6/-0): Add use_multidevice_executor flag to FusionDefinition
  • Include multidevice/executor.h.
  • Add use_multidevice_executor flag to FusionDefinition.

Bug fix

multidevice_fixtures.py (tests/python/multidevice_fixtures.py, +1/-1): Fix local rank usage in multidevice_fixtures
  • Fix local rank usage in the shard_tensor method.

Tests

test_multidevice_overlap.py (tests/python/test_multidevice_overlap.py, +91/-0): Add test for MultiDeviceExecutor with Stream ParallelType
  • Add a new test case for the pipelined AG+linear algo with stream parallelism.

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
⚡ Recommended focus areas for review

Error Handling

Ensure that the error handling for use_multidevice_executor is robust and covers all edge cases, especially when user-defined schedules are involved.

```cpp
    !use_multidevice_executor,
    "multidevice_executor is not supported "
    "for user-defined schedules.");
if (isProfilerEnabledWithCupti()) {
```
Performance Metrics

Include performance metrics in the test to validate the performance gains achieved by using MultiDeviceExecutor and overlapped AG+matmul.

```python
# SPDX-FileCopyrightText: Copyright (c) 2024-present NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

import pytest
import torch

import multidevice_fixtures
import nvfuser
from nvfuser import DataType, FusionDefinition

multidevice_test = multidevice_fixtures.multidevice_test


class OverlapAGMatmulStreamOutermost(FusionDefinition):
    def __init__(self, m, k, n, s, num_devices):
        super().__init__()
        self.m = m
        self.k = k
        self.n = n
        self.s = s
        self._num_devices = num_devices
        self.use_multidevice_executor()

    def definition(self) -> None:
        m, k, n, s, d = (
            self.m,
            self.k,
            self.n,
            self.s,
            self._num_devices,
        )
        self.x = self.define_tensor(
            shape=[s, d, m // (s * d), k], contiguity=True, dtype=DataType.BFloat16
        )
        self.weight = self.define_tensor(
            shape=[n, k], contiguity=True, dtype=DataType.BFloat16
        )
        self.bias = self.define_tensor(
            shape=[n], contiguity=True, dtype=DataType.BFloat16
        )

        self.out = self.ops.linear(
            self.x, self.weight, self.bias
        )  # [s, d, m//(s*d), n]

        self.add_output(self.out)

    def multidevice_schedule(self):
        mesh = nvfuser.DeviceMesh(range(self._num_devices))
        for tv in [
            self.x,
            self.weight,
            self.bias,
            self.out,
        ]:
            self.sched._set_device_mesh(tv, mesh)

        self.sched.parallelize(self.x, 1, nvfuser.ParallelType.mesh_x)
        self.sched.parallelize(self.out, 0, nvfuser.ParallelType.stream)


@pytest.mark.mpi
def test_overlap_allgather_matmul_stream_outermost(multidevice_test, benchmark):
    N_WARMUPS, N_ITERATIONS = 5, 15
    m, k, n, s, d = 1024, 1024, 1024, 8, multidevice_test.size

    torch.cuda.set_device(multidevice_test.local_rank)
    x_unsharded = torch.testing.make_tensor(
        s, d, m // (s * d), k, dtype=torch.bfloat16, device="cpu"
    )
    x = multidevice_test.shard_tensor(
        x_unsharded, 1, nvfuser.DeviceMesh(range(multidevice_test.size))
    )
    weight = torch.testing.make_tensor(n, k, dtype=torch.bfloat16, device="cuda")
    bias = torch.testing.make_tensor(n, dtype=torch.bfloat16, device="cuda")
    ins = [x, weight, bias]
    out_ref = torch.nn.functional.linear(x_unsharded, weight.cpu(), bias.cpu())

    fd = OverlapAGMatmulStreamOutermost(m, k, n, s, d)

    # warmup
    for _ in range(N_WARMUPS):
        outs = fd.execute(ins)
        out = outs[0].local.cpu()
        assert out.dtype == torch.bfloat16
        assert out.shape == torch.Size([s, d, m // (s * d), n])
        torch.testing.assert_close(out, out_ref, rtol=1e-1, atol=1e-1)

    # benchmark
    benchmark.pedantic(lambda: fd.execute(ins), rounds=N_ITERATIONS)
```
Default Communicator

Verify that the default communicator instance is correctly initialized and used across different scenarios.

```cpp
Communicator& comm = Communicator::getInstance(),
```

@samnordmann force-pushed the expose_MultiDeviceEXecutor_to_python branch from 416eaec to bbb7ac1 on February 19, 2025 11:24
@samnordmann (Collaborator, Author):

!test

@samnordmann changed the title from "Expose MultiDeviceExecutor to python API" to "Expose MultiDeviceExecutor and overlapped AG+matmul to python API" on Feb 19, 2025
@wujingyue (Collaborator) left a comment:

Thanks for changing the Python side of the thing! I have one question for @rdspring1. Depending on that, I can either merge this or do some more cleanups in a separate PR before merging this.

```diff
@@ -367,6 +368,11 @@ class NVF_API FusionDefinition : public FusionState {

 private:
  mutable std::optional<std::string> debug_output_ = std::nullopt;

 public:
  //! (Experimental) toggle using MultiDeviceExecutor directly instead of the
```
Collaborator:

Instead of this, I'd create an enable option to let the Python frontend use MultiDeviceExecutor. Enable options can be controlled directly by the user.

Collaborator (Author):

I can't see how this fits the _enable_option parameter, which needs to correspond to options in csrc/options.h, whereas our toggle only deals with the python API's internals.

Can you clarify?

Collaborator:

I was confused that this was a per-execution flag. Now I agree it makes sense inside FusionDefinition.

Can we specify "use multidevice executor" via the constructor? This way, readers can know for sure the underlying executor won't be switched over. While it's technically possible to fd.execute via FEC, set_mde, and then fd.execute via MDE, I'd like to keep the support simple to save trouble.
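The constructor-based flag suggested above could look roughly like this (hypothetical Python only, not the actual nvfuser frontend; all names are illustrative stand-ins):

```python
# Hypothetical sketch of fixing the executor choice at construction time, so
# it cannot be switched between execute() calls. Class and method names are
# made up for illustration.
class FusionDefinition:
    def __init__(self, use_multidevice_executor: bool = False):
        # Read-only after construction: readers know the executor won't change.
        self._use_multidevice_executor = use_multidevice_executor

    def execute(self, inputs):
        # A single boolean check on the hot path selects the backend.
        if self._use_multidevice_executor:
            return self._execute_multidevice(inputs)  # MultiDeviceExecutor path
        return self._execute_cached(inputs)  # FusionExecutorCache path

    def _execute_multidevice(self, inputs):
        return ("MDE", inputs)  # stub standing in for the real MDE call

    def _execute_cached(self, inputs):
        return ("FEC", inputs)  # stub standing in for the real FEC call
```

This keeps the mainline path untouched apart from one flag check, while making the backend choice immutable per FusionDefinition instance.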

@kevinstephano (Collaborator):

Given that multi-device executor is separate from our mainline of execution, could we use a separate execute function completely until the functionality is integrated into FusionExecutorCache? We are very sensitive to adding latency to our hot path from python down the stack.

@wujingyue (Collaborator):

> Given that multi-device executor is separate from our mainline of execution, could we use a separate execute function completely until the functionality is integrated into FusionExecutorCache? We are very sensitive to adding latency to our hot path from python down the stack.

I suspect we'll need to replicate a lot of code from FusionDefinition.execute, FusionDefinition._execute and FusionDefinition::execute to have a clean separation. But correct me if I'm wrong. I don't worry too much about a few flag checks, but if they turn out to be expensive, that'll be a good lesson for me!

@samnordmann (Collaborator, Author):

> Given that multi-device executor is separate from our mainline of execution, could we use a separate execute function completely until the functionality is integrated into FusionExecutorCache? We are very sensitive to adding latency to our hot path from python down the stack.

> I suspect we'll need to replicate a lot of code from FusionDefinition.execute, FusionDefinition._execute and FusionDefinition::execute to have a clean separation. But correct me if I'm wrong. I don't worry too much about a few flag checks, but if they turn out to be expensive, that'll be a good lesson for me!

I agree with Jingyue. For now the patch only adds one boolean check in the data path, so it shouldn't affect performance at all. Adding a whole new "execute" function to the API sounds like a much bigger change for something that fits in the current structure.

@samnordmann (Collaborator, Author):

!test

@samnordmann (Collaborator, Author):

!test

@samnordmann (Collaborator, Author):

!test

@xwang233 (Collaborator):

CI script failed. If retry doesn't work, please contact maintainers with reference A#5839.

There are merge conflicts in your PR, so the build couldn't start. 😢

@samnordmann (Collaborator, Author):

!test

wujingyue added a commit that referenced this pull request on Feb 27, 2025:

As requested by myself at #3923 (comment).

This PR tries to fix the Python frontend to construct `FusionExecutorCache`s with a ready-to-run fusion. Previously, the code constructed a FusionExecutorCache with an empty fusion and populated it later. This is not how we normally use and test FusionExecutorCache, and it is indeed problematic in some situations. For example, `FusionExecutorCache::exact_map_` was always empty because the constructor thought it was going to run an empty fusion. See one of the tests fixed by this PR.

cc @samnordmann
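The failure mode described in that commit can be sketched in a few lines of illustrative Python (not the nvFuser code; `Cache` and `index` are made-up stand-ins for FusionExecutorCache and exact_map_):

```python
# Illustrative sketch: why constructing a cache around an empty definition
# and filling it in later is fragile. Anything the constructor precomputes
# (the `index` here) sees only the empty state.
class Cache:
    def __init__(self, ops):
        self.ops = ops
        # Precomputed once in the constructor, like exact_map_.
        self.index = {op: i for i, op in enumerate(ops)}

# Construct-empty-then-populate: the precomputed index stays empty forever.
broken = Cache([])
broken.ops.extend(["matmul", "add"])
assert broken.index == {}  # stale precomputation

# Construct with a ready-to-run definition: the index is correct.
good = Cache(["matmul", "add"])
assert good.index == {"matmul": 0, "add": 1}
```

Constructing with a ready-to-run fusion ensures every invariant computed at construction time reflects the fusion that will actually run.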