
Interface for Process Creation (DDPSpawn vs. DDP) #10985

Closed
awaelchli opened this issue Dec 8, 2021 · 8 comments · Fixed by #11643
Labels: design (Includes a design discussion), discussion (In a discussion stage), feature (Is an improvement or enhancement), strategy: ddp (DistributedDataParallel)
Milestone: 1.6

@awaelchli (Contributor) commented Dec 8, 2021

🚀 Feature

Extract an interface that allows us to abstract and disentangle the process creation / spawning from the plugins.

Motivation

The simplifications introduced by #10059 brought the DDPSpawnPlugin and DDPPlugin closer together in their function, execution order, and API. The fundamental difference between the two, however, remains in how the processes are created.

DDPSpawnPlugin
The spawning logic in DDPSpawnPlugin mainly comprises these three methods:
https://github.com/PyTorchLightning/pytorch-lightning/blob/aeb0b5595fd73d086f4ae0f99d3f1f112f6a4c29/pytorch_lightning/plugins/training_type/ddp_spawn.py#L152
https://github.com/PyTorchLightning/pytorch-lightning/blob/aeb0b5595fd73d086f4ae0f99d3f1f112f6a4c29/pytorch_lightning/plugins/training_type/ddp_spawn.py#L245
https://github.com/PyTorchLightning/pytorch-lightning/blob/aeb0b5595fd73d086f4ae0f99d3f1f112f6a4c29/pytorch_lightning/plugins/training_type/ddp_spawn.py#L271
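
For orientation, this is roughly the shape of the spawn-based flow those methods implement. This is a simplified sketch, not the actual implementation; the use of torch.multiprocessing.start_processes and the wrapper function below are assumptions for illustration only.

import torch.multiprocessing as mp


def _wrapped_function(process_idx, function, args, return_queue):
    # runs in each child process; only rank zero ships its result back to the main process
    result = function(*args)
    if process_idx == 0:
        return_queue.put(result)


def spawn(function, *args, nprocs=2):
    # function and args must be picklable because the child processes are spawned
    ctx = mp.get_context("spawn")
    return_queue = ctx.SimpleQueue()
    mp.start_processes(
        _wrapped_function, args=(function, args, return_queue), nprocs=nprocs, start_method="spawn"
    )
    return return_queue.get()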

DDPPlugin
In the DDPPlugin, as in the spawn plugin, the creation of subprocesses is largely isolated in a single method:

https://github.com/PyTorchLightning/pytorch-lightning/blob/aeb0b5595fd73d086f4ae0f99d3f1f112f6a4c29/pytorch_lightning/plugins/training_type/ddp.py#L155
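
For comparison, a rough sketch of the script-relaunch idea behind that method. This is heavily simplified for illustration; the real method sets up considerably more environment state.

import os
import subprocess
import sys


def call_children_scripts(num_processes):
    # rank 0 is the process that is already running; re-run the same command
    # once per additional local rank, each with its own LOCAL_RANK
    for local_rank in range(1, num_processes):
        env = os.environ.copy()
        env["LOCAL_RANK"] = str(local_rank)
        subprocess.Popen([sys.executable] + sys.argv, env=env)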

The Trainer today (after #10896) has to differentiate between the two and call them differently:

    if isinstance(self.training_type_plugin, DDPSpawnPlugin):
        spawn_output = self.training_type_plugin.spawn(trainer_fn, *args, **kwargs)
        self.training_type_plugin._recover_results_in_main_process(spawn_output, self)
        return spawn_output.trainer_results
    else:
        return trainer_fn(*args, **kwargs)

Here, the plugin type check leaks into the trainer. This, together with the fact that the spawning logic is quite isolated inside the respective plugins, motivates a refactor that separates them. Two designs have been proposed so far.

Pitch

Proposal 1 (@ananthsub):

from abc import ABC, abstractmethod


class AbstractSpawn(ABC):

    @abstractmethod
    def spawn(self, ...):
        ...

    @abstractmethod
    def collect_rank_zero_results(...):
        pass

    @abstractmethod
    def recover_results_in_main_process(...):
        pass


class DDPSpawnPlugin(ParallelPlugin, AbstractSpawn):
    def spawn(self, function, *args, **kwargs):
        ...

    def recover_results_in_main_process(...):
        pass

    def collect_rank_zero_results(...):
        pass

In this proposal, the Trainer call reduces to:

if isinstance(self.training_type_plugin, AbstractSpawn):
    ...
else:
    ...
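
For illustration, the two branches could mirror the dispatch code shown earlier, keyed on the interface instead of the concrete plugin class (a sketch, not part of the proposal itself):

if isinstance(self.training_type_plugin, AbstractSpawn):
    spawn_output = self.training_type_plugin.spawn(trainer_fn, *args, **kwargs)
    self.training_type_plugin.recover_results_in_main_process(spawn_output, self)
    return spawn_output.trainer_results
else:
    return trainer_fn(*args, **kwargs)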

Proposal 2 (@awaelchli):

class Executor(ABC):
    def create_processes(...):
        ...


class ScriptExecutor(Executor):
    # calls the script in subprocesses, like the current DDPPlugin
    ...


class SpawnExecutor(Executor):
    # spawns processes from the Trainer function, like in DDPSpawnPlugin

    # draft implementation
    def create_processes(self, fn):
        # trainer reference up for debate
        output = self._spawn(self._wrap(fn))
        return self.recover_results_in_main_process(trainer, output)

    def _wrap(self, fn):
        fn()  # run it
        return self.collect_rank_zero_results()

The plugins would then own an instance of this executor. The DDPPlugin and DDPSpawnPlugin would collapse into a single class (for the sake of demonstration, call it DDPNew) that owns either a ScriptExecutor or a SpawnExecutor:

class DDPNew(ParallelPlugin):
    def __init__(self, ..., executor: Executor):
        self.checkpoint_io = ...
        self.executor = executor
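
Under this proposal, the Trainer-side call could then become a single delegation. This is a sketch only; the exact entry point and argument handling are open questions, and the names are assumed:

# hypothetical Trainer code; "create_processes" and the "executor" attribute come from the sketch above
return self.training_type_plugin.executor.create_processes(trainer_fn, *args, **kwargs)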

Alternatives

Additional context

At this point this is a very open discussion. The proposal may be updated depending on feedback and further discussion.

#10896 (comment)
Thanks @ananthsub for kicking off the discussion.



cc @Borda @tchaton @justusschock @awaelchli @kaushikb11 @akihironitta

@awaelchli added the feature, strategy: ddp, strategy: ddp spawn, design, and discussion labels on Dec 8, 2021
@four4fish (Contributor) commented:

+1 for the proposal of passing a spawn executor. I think having more unified behavior between spawning and non-spawning will benefit us in the long run.

@ananthsub (Contributor) commented Dec 8, 2021

These are my opinions, but I'm looking for more feedback.

Proposal 1

Pros:

  • The trainer still dictates the main control flow. Readers don't need to hop across trainer to strategy to see how the main loops are driven.
  • It's a single if/else without concerns of today's pre_dispatch/dispatch/post_dispatch possibly re-emerging.

Cons:

  • This doubles the number of parallel plugin classes we have, since we need to offer 1 strategy with spawn and 1 without. This becomes tedious/overhead.

Proposal 2

Questions: We need to work out how strategies, cluster environments, and executors would fit together. For instance, how does the executor take into account whether processes are launched externally to the trainer (e.g. SLURM, torchelastic)? In that case, we don't want the trainer to do any subprocess launch or spawn execution. This could be deduced using the ClusterEnvironment, which is available to all parallel plugins:

https://github.com/PyTorchLightning/pytorch-lightning/blob/aeb0b5595fd73d086f4ae0f99d3f1f112f6a4c29/pytorch_lightning/plugins/environments/cluster_environment.py#L28-L31

https://github.com/PyTorchLightning/pytorch-lightning/blob/aeb0b5595fd73d086f4ae0f99d3f1f112f6a4c29/pytorch_lightning/plugins/training_type/parallel.py#L44
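
Roughly, the hook those links point to is a boolean property on the cluster environment. This is a paraphrased sketch, not a copy of the source:

class ClusterEnvironment(ABC):
    @property
    def creates_processes_externally(self) -> bool:
        """Return True if processes are launched outside of Lightning (e.g. by SLURM or torchelastic)."""
        ...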

Pros:

  • No inheritance explosion

Cons:

  • Use cases that don't need external process creation at all still see this in the strategy API, both when the executor instance is added and when we likely add an entry point on the main strategy API for the trainer to call.

We might still end up with an if/else check in the main trainer flow if executors are available only to parallel plugins. It wouldn't make sense to add them to single-device plugins, would it? Then there are only two options I see:

A. We supply a default implementation in the base training type plugin that calls the trainer function with its arguments, and call it from the trainer like this. Plugins end up overriding this based on the executor's availability (e.g. parallel plugins), but this is very wide open: a custom strategy could do anything at this point without a very clear contract.

self.training_type_plugin.run(trainer_fn, args, kwargs)
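
A minimal sketch of what that default implementation could look like, assuming the base class keeps the existing TrainingTypePlugin name and the run signature used above:

class TrainingTypePlugin:
    def run(self, trainer_fn, args, kwargs):
        # default: no process creation; just call the trainer function in the current process
        return trainer_fn(*args, **kwargs)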

B. Only parallel plugins that can create processes really need an executor (just as cluster environments live only on the parallel plugins, not on the base training type plugin). Then the trainer code might end up like this:

if hasattr(self.training_type_plugin, "executor") and isinstance(self.training_type_plugin.executor, Executor):
    self.training_type_plugin.executor.create_processes(....)
else:
    # normal control flow

which is essentially back to proposal 1 from the trainer's point of view.

  • The Trainer delegates the main control flow logic to training type plugin. Readers have to hop across trainer & strategy to get to how/when Trainer._run is actually run.

For discussion's sake, mentioning other options, possibly far out:

  1. Add a new flag on the Trainer: process_launcher: Optional[str] = None

Such that the current way of specifying strategies goes from:
ddp_spawn -> strategy="ddp", process_launcher="spawn" or strategy="ddp", process_launcher="popen"

But breaking this up, especially something like strategy="ddp_cpu" into strategy="ddp", accelerator="cpu", process_launcher="spawn", can easily become tedious for users. And this still doesn't resolve the cluster environment overlap.
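
For illustration, the user-facing call under this option might look roughly like the following. Note that process_launcher is the hypothetical new flag from this option, not an existing Trainer argument:

from pytorch_lightning import Trainer

# "strategy" and "accelerator" exist today; "process_launcher" is the proposed addition
trainer = Trainer(strategy="ddp", accelerator="cpu", process_launcher="spawn")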

The intention of raising this is to discuss whether "spawn" is really an integral part of the strategy, especially if the executor is pulled out to its own component and can be mixed in easily with strategies. And if it's not, whether it's worth carrying this forward on the way that strategies are specified to the Trainer.

@justusschock (Member) commented:

Personally I would vote for the executor approach, and I believe most of @ananthsub's concerns about it could be resolved by having a SingleProcessExecutor that implements the executor interface but basically doesn't do anything.

That means we don't need to special-case anything in the trainer; it would be exposed in the strategy API, but it would be safe to call since it doesn't do anything.
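
A minimal sketch of what such an executor could look like, assuming the Executor interface from proposal 2. Presumably it still runs the given function in the current process; it just creates no new processes:

class SingleProcessExecutor(Executor):
    def create_processes(self, fn, *args, **kwargs):
        # no subprocess launch, no spawn: run the function right here
        return fn(*args, **kwargs)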

Regarding

custom strategy could do anything at this point without a very clear contract

I can see how this could be a problem, but on the other hand, this could also allow us to have stuff like Horovod only as a separate Executor, Collective, and Environment (two of these we will have anyway) and otherwise behave the same as the DDP strategy. And the "contract" basically is that the Executor would have to call the given function with the given arguments at some point. Whether this may include wrapping (e.g. for Horovod) or preparing some other requirement shouldn't matter, I think.

@tchaton (Contributor) commented Dec 8, 2021

I prefer the Executor approach + self.training_type_plugin.run(trainer_fn, args, kwargs). I believe this would provide enough flexibility and finally align features among distributed plugins.

@awaelchli (Contributor, Author) commented Dec 9, 2021

I think @ananthsub makes a valid point about the overlap between cluster environments that create processes externally and the executors (an alternative name could be "launcher"). @justusschock Who/what would determine which executor to use? Would the AcceleratorConnector be doing this check?

if self.cluster_environment.creates_processes_externally:
    strategy.executor = SingleProcessExecutor()
elif strategy == "ddp_spawn":
    strategy.executor = SpawnExecutor()
elif strategy == "ddp":
    strategy.executor = ScriptExecutor()

One thing to observe here is that the executor is really only useful once and is then no longer needed. Making it an attribute of the strategy might lead to the impression that you can call it again in subprocesses, which could be a source of bugs and confusion. This tells me that the executor should probably be produced by a function/method and used locally.

executor = ???.get_executor()
executor(trainer_fn, ...)
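
For example, the selection logic from the snippet above could be folded into a small factory that is called once and whose result is used locally. A sketch only; where get_executor would actually live is exactly the open question, and the names are assumed:

def get_executor(cluster_environment, strategy_flag: str) -> Executor:
    # hypothetical factory combining the checks sketched earlier
    if cluster_environment.creates_processes_externally:
        return SingleProcessExecutor()
    if strategy_flag == "ddp_spawn":
        return SpawnExecutor()
    return ScriptExecutor()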

@justusschock (Member) commented:

@awaelchli For now I think the logic would live in the AcceleratorConnector, yes. Although this would probably be a candidate for the next thing to refactor, as the connector is becoming more and more cluttered and non-trivial. With the current structure, however, I don't see another place where this would fit better.

I like the idea of not keeping it as an instance attribute, although you may then have to request it once per entry point :)

@awaelchli added this to the 1.6 milestone on Jan 25, 2022
@ananthsub (Contributor) commented:

@awaelchli @rohitgr7 another option could be to move out the definition of creates_processes_externally and set_world_ranks from the ClusterEnvironment entirely. AFAICT, the only class which implements these is the LightningEnvironment, which could be superseded by the runner/launcher here. That would also reduce the boilerplate needed to write a new cluster environment.

@rohitgr7 mentioned this issue on Jan 27, 2022
@awaelchli (Contributor, Author) commented Jan 27, 2022

@ananthsub definitely something we could explore.

Now I think it is time to decide on the name! The issue here uses the name "executor", but that's just the name we came up with at the time. There are other suitable names we could go for:

  • executor 👍
  • launcher (my preference) ❤️
  • runner 😃
  • ?

Please leave your suggestions :)
