Parallelization for standalone mode with NCCL (#487)
pan-x-c authored Mar 13, 2023
1 parent fe1806b commit c776b99
Showing 15 changed files with 641 additions and 25 deletions.
6 changes: 3 additions & 3 deletions federatedscope/autotune/utils.py
@@ -283,11 +283,11 @@ def eval_in_fs(cfg, config=None, budget=0, client_cfgs=None, trial_index=0):
    data, modified_config = get_data(config=trial_cfg.clone())
    trial_cfg.merge_from_other_cfg(modified_config)
    trial_cfg.freeze()
    fed_runner = get_runner(data=data,
                            server_class=get_server_cls(trial_cfg),
    fed_runner = get_runner(server_class=get_server_cls(trial_cfg),
                            client_class=get_client_cls(trial_cfg),
                            config=trial_cfg.clone(),
                            client_configs=client_cfgs)
                            client_configs=client_cfgs,
                            data=data)
    results = fed_runner.run()

    return results
36 changes: 26 additions & 10 deletions federatedscope/core/auxiliaries/runner_builder.py
@@ -1,13 +1,17 @@
from federatedscope.core.fed_runner import StandaloneRunner, DistributedRunner
from federatedscope.core.parallel.parallel_runner import \
    StandaloneMultiGPURunner


def get_runner(data, server_class, client_class, config, client_configs=None):
def get_runner(server_class,
               client_class,
               config,
               client_configs=None,
               data=None):
    """
    Instantiate a runner based on a configuration file
    Args:
        data: ``core.data.StandaloneDataDict`` in standalone mode, \
            ``core.data.ClientData`` in distribute mode
        server_class: server class
        client_class: client class
        config: configurations for FL, see ``federatedscope.core.configs``
@@ -18,21 +22,33 @@ def get_runner(data, server_class, client_class, config, client_configs=None):
    Note:
        The key-value pairs of built-in runner and source are shown below:
        ===============================  ==============================
        Mode                             Source
        ===============================  ==============================
        ``standalone``                   ``core.fed_runner.StandaloneRunner``
        ``distributed``                  ``core.fed_runner.DistributedRunner``
        ===============================  ==============================
        =============================  ===============================
        Mode                           Source
        =============================  ===============================
        ``standalone``                 ``core.fed_runner.StandaloneRunner``
        ``distributed``                ``core.fed_runner.DistributedRunner``
        ``standalone(process_num>1)``  ``core.auxiliaries.parallel_runner.``
                                       ``StandaloneMultiGPURunner``
        =============================  ===============================
    """

    mode = config.federate.mode.lower()
    process_num = config.federate.process_num

    if mode == 'standalone':
        runner_cls = StandaloneRunner
        if process_num <= 1:
            runner_cls = StandaloneRunner
        else:
            runner_cls = StandaloneMultiGPURunner
    elif mode == 'distributed':
        runner_cls = DistributedRunner

    # federated dataset might change the number of clients
    # thus, we allow the creation procedure of dataset to modify the global
    # cfg object
    if runner_cls is StandaloneMultiGPURunner:
        data = None

    return runner_cls(data=data,
                      server_class=server_class,
                      client_class=client_class,
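For reference, a minimal sketch of how the reworked `get_runner` is meant to be called end to end. The builder imports and the `launch`/`my_cfg` names are assumptions for illustration (the same call pattern appears in `federatedscope/autotune/utils.py` above); this is not code from the commit:

```python
from federatedscope.core.auxiliaries.data_builder import get_data
from federatedscope.core.auxiliaries.runner_builder import get_runner
from federatedscope.core.auxiliaries.worker_builder import get_client_cls, get_server_cls


def launch(my_cfg, client_cfgs=None):
    # Build the dataset first; the data pipeline may modify the config.
    data, modified_cfg = get_data(config=my_cfg.clone())
    my_cfg.merge_from_other_cfg(modified_cfg)
    my_cfg.freeze()
    # With federate.mode='standalone' and federate.process_num > 1, get_runner
    # returns a StandaloneMultiGPURunner and resets `data` to None (each worker
    # process rebuilds its own data); otherwise `data` is passed through as is.
    runner = get_runner(server_class=get_server_cls(my_cfg),
                        client_class=get_client_cls(my_cfg),
                        config=my_cfg.clone(),
                        client_configs=client_cfgs,
                        data=data)
    return runner.run()
```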
57 changes: 57 additions & 0 deletions federatedscope/core/communication.py
@@ -1,12 +1,19 @@
import grpc
from concurrent import futures
import logging
import torch.distributed as dist

from collections import deque

from federatedscope.core.configs.config import global_cfg
from federatedscope.core.proto import gRPC_comm_manager_pb2, \
    gRPC_comm_manager_pb2_grpc
from federatedscope.core.gRPC_server import gRPCComServeFunc
from federatedscope.core.message import Message

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class StandaloneCommManager(object):
"""
@@ -39,7 +46,57 @@ def get_neighbors(self, neighbor_id=None):
        return self.neighbors

    def send(self, message):
        # All the workers share one comm_queue
        self.comm_queue.append(message)


class StandaloneDDPCommManager(StandaloneCommManager):
    """
    The communicator used for standalone mode with multigpu
    """
    def __init__(self, comm_queue, monitor=None, id2comm=None):
        super().__init__(comm_queue, monitor)
        self.id2comm = id2comm
        self.device = "cuda:{}".format(dist.get_rank())

    def _send_model_para(self, model_para, dst_rank):
        for v in model_para.values():
            t = v.to(self.device)
            dist.send(tensor=t, dst=dst_rank)

    def send(self, message):
        is_model_para = message.msg_type == 'model_para'
        is_evaluate = message.msg_type == 'evaluate'
        if self.id2comm is None:
            # client to server
            if is_model_para:
                model_para = message.content[1]
                message.content = (message.content[0], {})
                self.comm_queue.append(message) if isinstance(
                    self.comm_queue, deque) else self.comm_queue.put(message)
                self._send_model_para(model_para, 0)
            else:
                self.comm_queue.append(message) if isinstance(
                    self.comm_queue, deque) else self.comm_queue.put(message)
        else:
            receiver = message.receiver
            if not isinstance(receiver, list):
                receiver = [receiver]
            if is_model_para or is_evaluate:
                model_para = message.content
                message.content = {}
            for idx, each_comm in enumerate(self.comm_queue):
                for each_receiver in receiver:
                    if each_receiver in self.neighbors and \
                            self.id2comm[each_receiver] == idx:
                        each_comm.put(message)
                        break
                if is_model_para or is_evaluate:
                    for each_receiver in receiver:
                        if each_receiver in self.neighbors and \
                                self.id2comm[each_receiver] == idx:
                            self._send_model_para(model_para, idx + 1)
                            break
        download_bytes, upload_bytes = message.count_bytes()
        self.monitor.track_upload_bytes(upload_bytes)

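In `_send_model_para`, each parameter tensor is pushed over the `torch.distributed` (NCCL) process group with `dist.send`, so the peer process must post matching `dist.recv` calls in the same key order; the actual receiving side lives in the parallel runner and is not part of this hunk. A hedged sketch of such a counterpart, assuming the receiver already holds a state_dict-like template with the right shapes and dtypes:

```python
import torch
import torch.distributed as dist


def recv_model_para(model_para_template, src_rank, device):
    """Illustrative counterpart to _send_model_para (not from this commit)."""
    received = {}
    for key, ref in model_para_template.items():
        buf = torch.empty_like(ref, device=device)  # same shape/dtype as the sent tensor
        dist.recv(tensor=buf, src=src_rank)  # blocks until the matching dist.send
        received[key] = buf
    return received
```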
1 change: 1 addition & 0 deletions federatedscope/core/configs/README.md
@@ -228,6 +228,7 @@ The configurations related to FL settings are defined in `cfg_fl_setting.py`.
| `federate.join_in_info` | (list of string) [] | The information requirements (from server) for joining in the FL course. | We support 'num_sample/client_resource' and allow user customization.
| `federate.sampler` | (string) 'uniform' </br> Choices: {'uniform', 'group'} | The sample strategy of server used for client selection in a training round. | - |
| `federate.` </br>`resource_info_file` | (string) '' | the device information file to record computation and communication ability | - |
| `federate.process_num` | (int) 1 | The number of parallel processes. It only takes effect when `use_gpu=True`, `backend='torch'`, `federate.mode='standalone'` and `federate.share_local_model=False`; the value must not exceed the number of GPUs. | - |
#### `distribute`: for distribute mode
| Name | (Type) Default Value | Description | Note |
|:----:|:-----:|:---------- |:---- |
19 changes: 19 additions & 0 deletions federatedscope/core/configs/cfg_fl_setting.py
@@ -2,6 +2,7 @@

from federatedscope.core.configs.config import CN
from federatedscope.register import register_config
import torch

logger = logging.getLogger(__name__)

@@ -44,6 +45,11 @@ def extend_fl_setting_cfg(cfg):
    cfg.federate.resource_info_file = ""  # the device information file to
    # record computation and communication ability

    # The configurations for parallel in standalone
    cfg.federate.process_num = 1
    cfg.federate.master_addr = '127.0.0.1'  # parameter of torch distributed
    cfg.federate.master_port = 29500  # parameter of torch distributed

    # atc (TODO: merge later)
    cfg.federate.atc_vanilla = False
    cfg.federate.atc_load_from = ''
@@ -198,6 +204,19 @@ def assert_fl_setting_cfg(cfg):
        logger.warning('Set cfg.federate.make_global_eval=True since '
                       'cfg.federate.merge_test_data=True')

    if cfg.federate.process_num > 1 and cfg.federate.mode != 'standalone':
        cfg.federate.process_num = 1
        logger.warning('Parallel training can only be used in standalone mode'
                       ', thus cfg.federate.process_num is modified to 1')
    if cfg.federate.process_num > 1 and not torch.cuda.is_available():
        cfg.federate.process_num = 1
        logger.warning(
            'No GPU found for your device, set cfg.federate.process_num=1')
    if torch.cuda.device_count() < cfg.federate.process_num:
        cfg.federate.process_num = torch.cuda.device_count()
        logger.warning(
            'We found the number of gpu is insufficient, '
            f'thus cfg.federate.process_num={cfg.federate.process_num}')
    # TODO
    if cfg.vertical.use:
        if cfg.vertical.algo == 'lr' and hasattr(cfg, "trainer") and \
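Taken together, the three new guards reduce to a simple clamping rule for `federate.process_num`; the helper below is a standalone paraphrase for illustration only (its name and signature are not from the commit):

```python
import torch


def clamp_process_num(process_num, mode='standalone'):
    """Roughly mirrors the new federate.process_num checks above."""
    if process_num > 1 and mode != 'standalone':
        return 1  # parallel training is only supported in standalone mode
    if process_num > 1 and not torch.cuda.is_available():
        return 1  # no GPU available, fall back to a single process
    if process_num > 1 and torch.cuda.device_count() < process_num:
        return torch.cuda.device_count()  # never exceed the number of GPUs
    return process_num
```

For example, `clamp_process_num(8)` yields 4 on a 4-GPU machine and 1 on a CPU-only machine.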
4 changes: 4 additions & 0 deletions federatedscope/core/configs/yacs_config.py
@@ -533,6 +533,10 @@ def _merge_a_into_b(a, b, root, key_list):
        else:
            if root.key_is_deprecated(full_key):
                continue
            elif k in [
                    '__cfg_check_funcs__', '__help_info__', 'is_ready_for_run'
            ]:
                continue
            elif root.key_is_renamed(full_key):
                root.raise_key_rename_error(full_key)
            else:
6 changes: 6 additions & 0 deletions federatedscope/core/fed_runner.py
@@ -287,6 +287,9 @@ def check(self):

class StandaloneRunner(BaseRunner):
    def _set_up(self):
        """
        To set up server and client for standalone mode.
        """
        self.is_run_online = True if self.cfg.federate.online_aggr else False
        self.shared_comm_queue = deque()

@@ -500,6 +503,9 @@ def _run_simulation(self):

class DistributedRunner(BaseRunner):
    def _set_up(self):
        """
        To set up server or client for distributed mode.
        """
        # sample resource information
        if self.resource_info is not None:
            sampled_index = np.random.choice(list(self.resource_info.keys()))
1 change: 1 addition & 0 deletions federatedscope/core/monitors/monitor.py
@@ -19,6 +19,7 @@
torch = None

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

global_all_monitors = [
] # used in standalone mode, to merge sys metric results for all workers
85 changes: 85 additions & 0 deletions federatedscope/core/parallel/README.md
@@ -0,0 +1,85 @@
# Parallelization for standalone mode

To help developers quickly verify their algorithms, we designed and implemented `StandaloneMultiGPURunner` on top of PyTorch distributed data parallel (DDP). The new runner makes better use of the computing resources of multiple GPUs and accelerates training in FederatedScope's standalone mode.

## When to use
Use `StandaloneMultiGPURunner` when you have **multiple GPUs (>=2)** on your machine and need quick verification in **standalone mode**.


## Configuration

Add the `federate.process_num` item to the configuration file to parallelize training.

> Note: `federate.process_num` only takes effect when `use_gpu=True`, `backend='torch'`, `federate.mode='standalone'` and `federate.share_local_model=False`, and its value must not exceed the number of available GPUs.
```yaml
use_gpu: True
backend: 'torch'
device: 0
early_stop:
patience: 5
seed: 12345
federate:
mode: standalone
client_num: 100
total_round_num: 20
sample_client_rate: 0.2
share_local_model: False
process_num: 4 # run 4 processes simultaneously
...
```
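Internally, the runner spawns `federate.process_num` worker processes and sets up a `torch.distributed` process group over NCCL, with `federate.master_addr`/`federate.master_port` as the rendezvous point, so that rank `i` works on `cuda:i` (the same rank-to-device mapping used by `StandaloneDDPCommManager`). The snippet below is only a rough illustration of that underlying DDP pattern, not the runner's actual implementation:

```python
import os

import torch.distributed as dist
import torch.multiprocessing as mp


def _worker(rank, world_size, master_addr, master_port):
    os.environ['MASTER_ADDR'] = master_addr
    os.environ['MASTER_PORT'] = str(master_port)
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    device = f'cuda:{rank}'  # rank 0 hosts the server, ranks >= 1 host clients
    # ... build and run the server/client workers on `device` ...
    dist.destroy_process_group()


if __name__ == '__main__':
    # e.g. federate.process_num=4 with the default master_addr/master_port
    mp.spawn(_worker, args=(4, '127.0.0.1', 29500), nprocs=4)
```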

## Use cases

Here we give an example to demonstrate the efficiency of `StandaloneMultiGPURunner` compared to `StandaloneRunner`. The configuration file and experimental results are listed below.
The results show that, with 8 GPUs, the total running time of `StandaloneMultiGPURunner` is only about one third of that of `StandaloneRunner`.

```yaml
use_gpu: True
device: 0
early_stop:
patience: 5
seed: 12345
federate:
mode: standalone
client_num: 100
total_round_num: 10
sample_client_rate: 0.4
share_local_model: False
# use StandaloneMultiGPURunner with 8 GPUs
process_num: 8
# use StandaloneRunner
# process_num: 1

data:
root: data/
type: femnist
splits: [0.6,0.2,0.2]
batch_size: 10
subsample: 0.05
num_workers: 0
transform: [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
model:
type: convnet2
hidden: 2048
out_channels: 62
train:
local_update_steps: 1
batch_or_epoch: epoch
optimizer:
lr: 0.01
weight_decay: 0.0
grad:
grad_clip: 5.0
criterion:
type: CrossEntropyLoss
trainer:
type: cvtrainer
eval:
freq: 10
metrics: ['acc', 'correct']
```
| | StandaloneMultiGPURunner | StandaloneRunner |
| :---: | :---: | :---: |
| Total running time (minute) | 0.2406 | 0.7292 |
