
support distributed with poprun (PaddlePaddle#669)
* support distributed with poprun

* fix typo

* fix mpi-global-args

* add debug info

* fix mpi-local-args

* fix launch failed

* update samples

* perfect ut

* perfect ut 2

* perfect ut 3

* rm useless code

* fix ut

* fix script
yaozhixin authored Apr 28, 2022
1 parent 39be71f commit 4a5bd99
Showing 7 changed files with 689 additions and 103 deletions.
117 changes: 117 additions & 0 deletions python/paddle/distributed/fleet/launch.py
@@ -166,6 +166,33 @@ def _parse_args():
)
base_group.add_argument("--selected_mlus", dest="mlus")

if fluid.core.is_compiled_with_ipu():
base_group.add_argument(
"--num_ipus",
type=int,
default=1,
help="It's for ipu training. The total number of IPUs requested. For example: "
"--num_ipus=128 will request 128 IPUs for training.")
base_group.add_argument(
"--ipus_per_replica",
type=int,
default=1,
help="It's for ipu training. The number of IPUs requested by each replica. For example: "
"--ipus_per_replica=8 will allocate 8 IPUs to each replica.")
base_group.add_argument(
"--partition_name",
type=str,
default="pod64",
help="It's for ipu training. Set the name of the re-created partition."
)
base_group.add_argument(
"--vipu_server",
type=str,
default="127.0.0.1",
help="It's for ipu training. Choose the vipu server host to enable vipu."
"Vipu server host is able to be checked by the command \`vipu-admin --server-version\` out of the docker container."
)
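
# Illustrative sketch (not part of this commit): the defaults of the four IPU
# options above and the replica count they imply. The override values are made up.
import argparse

_example_parser = argparse.ArgumentParser()
_example_parser.add_argument("--num_ipus", type=int, default=1)
_example_parser.add_argument("--ipus_per_replica", type=int, default=1)
_example_parser.add_argument("--partition_name", type=str, default="pod64")
_example_parser.add_argument("--vipu_server", type=str, default="127.0.0.1")

# Requesting 8 IPUs with 2 IPUs per replica yields 4 data-parallel replicas.
_example = _example_parser.parse_args(["--num_ipus=8", "--ipus_per_replica=2"])
assert _example.num_ipus // _example.ipus_per_replica == 4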

base_group.add_argument(
"training_script",
type=str,
@@ -432,6 +459,78 @@ def launch_ps(args, distribute_mode):
return


def launch_ipu(args):
# The number of replicas for data parallel
assert (args.num_ipus % args.ipus_per_replica) == 0, \
"The number of IPUs:{} mod the number of IPUs per replica:{} must == 0".format(args.num_ipus, args.ipus_per_replica)
num_replicas = int(args.num_ipus / args.ipus_per_replica)

# The number of processes
nproc_per_node = 1
if args.nproc_per_node is not None:
nproc_per_node = args.nproc_per_node
num_nodes = len(args.ips.split(','))
num_procs = num_nodes * nproc_per_node
assert (num_replicas % num_procs) == 0, \
"The number of replicas:{} mod the number of processes:{} must == 0".format(num_replicas, num_procs)

# ips and endpoints
ips = args.ips.replace(' ', '').split(',')
endpoints = [x + ":8090" for x in ips]

# args for poprun
poprun_args = ['poprun']

poprun_args.append('--num-instances={}'.format(num_procs))
poprun_args.append('--num-replicas={}'.format(num_replicas))
poprun_args.append('--ipus-per-replica={}'.format(args.ipus_per_replica))
poprun_args.append('--host={}'.format(','.join(ips)))
poprun_args.append('--vipu-partition={}'.format(args.partition_name))
poprun_args.append('--vipu-server-host={}'.format(args.vipu_server))

poprun_args.extend([
'--update-partition=yes', '--vipu-server-timeout=120',
'--print-topology=yes', '--numa-aware=yes'
])
poprun_args.append('--mpi-global-args=\'--tag-output\'')

# global envs
global_envs = '--mpi-local-args=\''
log_level = os.getenv('POPART_LOG_LEVEL', None)
if log_level:
global_envs += '-x POPART_LOG_LEVEL={} '.format(log_level)
global_envs += '-x PADDLE_TRAINERS_NUM={} -x PADDLE_TRAINER_ENDPOINTS={}'.format(
num_procs, ','.join(endpoints))
global_envs += '\''
poprun_args.append(global_envs)

# local envs
for idx in range(num_procs):
cur_endpoint = endpoints[idx // nproc_per_node]
rank_in_node = idx % nproc_per_node
poprun_args.append(
'--instance-mpi-local-args={}:\"-x PADDLE_TRAINER_ID={} -x PADDLE_CURRENT_ENDPOINT={} -x PADDLE_RANK_IN_NODE={}\"'.
format(idx, idx, cur_endpoint, rank_in_node))

# executor
poprun_args.append("python3.7")
# script and script args
poprun_args.append(args.training_script)
for arg in args.training_script_args:
poprun_args.append(arg)

# for debug
print("----------- PopRun Command -----------")
for i in range(len(poprun_args) - 1):
print("%s \\" % (poprun_args[i]))
print("%s" % (poprun_args[len(poprun_args) - 1]))
print("---------------------------------------")

# Launch
subprocess.run(" ".join(poprun_args), shell=True)
return
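

# Illustrative sketch (not part of this commit): the replica/instance arithmetic
# above for a hypothetical two-host job. The host names are made up; the ":8090"
# endpoint port and the poprun flags mirror launch_ipu().
example_ips = ["host1", "host2"]                      # --ips
example_nproc_per_node = 1                            # --nproc_per_node
example_num_ipus, example_ipus_per_replica = 16, 2    # --num_ipus / --ipus_per_replica

example_num_replicas = example_num_ipus // example_ipus_per_replica   # 8 data-parallel replicas
example_num_procs = len(example_ips) * example_nproc_per_node         # 2 poprun instances
assert example_num_replicas % example_num_procs == 0                  # 4 replicas per instance

example_endpoints = [ip + ":8090" for ip in example_ips]              # PADDLE_TRAINER_ENDPOINTS
print("poprun --num-instances={} --num-replicas={} --ipus-per-replica={} --host={} ...".format(
    example_num_procs, example_num_replicas, example_ipus_per_replica,
    ",".join(example_ips)))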


def infer_backend(args):
if args.backend != "auto": return
if fluid.core.is_compiled_with_cuda():
@@ -442,6 +541,8 @@ def infer_backend(args):
args.backend = 'bkcl'
elif fluid.core.is_compiled_with_mlu():
args.backend = 'cncl'
elif fluid.core.is_compiled_with_ipu():
args.backend = 'gcl'
else:
args.backend = 'gloo'

@@ -560,6 +661,15 @@ def launch():
- ``training_script_args``: The args of training_script. e.g., ``--lr=0.1``
IPU Parameters:
- ``--num_ipus``: It's for ipu training. The total number of IPUs requested. e.g., ``--num_ipus=128`` will request 128 IPUs for training.
- ``--ipus_per_replica``: It's for ipu training. The number of IPUs requested by each replica. e.g., ``--ipus_per_replica=8`` will allocate 8 IPUs to each replica.
- ``--partition_name``: It's for ipu training. Set the name of the re-created partition.
- ``--vipu_server``: It's for ipu training. Choose the vipu server host to enable vipu. The vipu server host can be checked with the command ``vipu-admin --server-version`` outside the docker container.
Collective Parameters:
- ``--ips``: Paddle cluster nodes ips, e.g., ``--ips=192.168.0.16,192.168.0.17``. Default ``--ips=127.0.0.1``.
@@ -710,6 +820,13 @@ def launch():

#assert args.backend in ['gloo', 'nccl', 'bkcl', 'cncl', 'heter', 'unknown']

# Only support Collective mode with IPUs
if args.backend == 'gcl':
if distribute_mode != DistributeMode.COLLECTIVE:
raise ValueError("Only support Collective mode with IPUs.")
launch_ipu(args)
return

if args.backend == 'gloo':
logger.warning("launch start with CPUONLY mode")

27 changes: 22 additions & 5 deletions python/paddle/distributed/fleet/launch_utils.py
@@ -58,6 +58,7 @@ class DeviceMode():
ASCEND_NPU = 3
UNKNOWN = 3
MLU = 4
IPU = 5


class Cluster(object):
@@ -804,6 +805,10 @@ def get_device_mode(backend):
print("launch train in CPU mode")
return DeviceMode.CPU

if backend == 'gcl':
print("launch train in IPU mode")
return DeviceMode.IPU

raise RuntimeError("Unsupported devices")


@@ -1768,11 +1773,14 @@ def start_pod_heter_worker(self, args, pod):


def check_backend(backend):
if backend not in ['nccl', 'gloo', 'bkcl', 'cncl', 'auto', 'hccl', 'heter']:
raise ValueError("paddle.distributed initialize error, "
"backend argument can only be one of "
"'nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter' "
"but got %s" % backend)
if backend not in [
'nccl', 'gloo', 'bkcl', 'cncl', 'auto', 'hccl', 'heter', 'gcl'
]:
raise ValueError(
"paddle.distributed initialize error, "
"backend argument can only be one of "
"'nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter', 'gcl' "
"but got %s" % backend)

if backend == 'nccl' and not fluid.core.is_compiled_with_cuda():
raise ValueError(
@@ -1798,6 +1806,12 @@ def check_backend(backend):
"your paddle is not compiled with mlu but you assign 'cncl' as backend."
)

if backend == 'gcl' and not fluid.core.is_compiled_with_ipu():
raise ValueError(
"paddle.distributed initialize error, "
"your paddle is not compiled with ipu but you assign 'gcl' as backend."
)
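
# Illustrative usage sketch (not part of this commit), assuming a Paddle build
# without IPU support: selecting the IPU backend fails fast with the message above.
from paddle.distributed.fleet.launch_utils import check_backend

try:
    check_backend('gcl')
except ValueError as err:
    print(err)   # "... your paddle is not compiled with ipu but you assign 'gcl' as backend."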


def block_windows_and_macos(backend):
if backend != 'gloo': return
@@ -1824,4 +1838,7 @@ def get_backend_by_compile_flag():
if fluid.core.is_compiled_with_mlu():
return 'cncl'

if fluid.core.is_compiled_with_ipu():
return 'gcl'

return 'gloo'
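
# Illustrative sketch (not part of this commit): how the helpers above resolve
# the backend on a Paddle build compiled with IPU support.
from paddle.distributed.fleet.launch_utils import (
    DeviceMode, check_backend, get_backend_by_compile_flag, get_device_mode)

backend = get_backend_by_compile_flag()    # 'gcl' on an IPU build
check_backend(backend)                     # passes here; raises ValueError on other builds
assert get_device_mode(backend) == DeviceMode.IPU
# fleet.launch then dispatches backend 'gcl' (Collective mode only) to launch_ipu(args).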
80 changes: 80 additions & 0 deletions python/paddle/fluid/tests/unittests/ipu/disabled/run_dist_ipu.sh
@@ -0,0 +1,80 @@
#!/bin/bash

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

partition_name=pod64-lr17
vipu_server=lr17-1-ctrl
allclose_script="
import sys
import numpy as np
data1 = np.loadtxt(\"ipu_res.txt\")
data2 = np.loadtxt(\"cpu_res.txt\")
if np.allclose(data1[::16], data2, atol=1e-6):
sys.exit(0)
else:
sys.exit(1)
"

for opt in lamb sgd adam ;
do
for onchip in False True ;
do
for rts in False True ;
do
echo "Testcase: opt: ${opt}, onchip: ${onchip}, rts: ${rts}"
echo "paddle.distributed.fleet.launch test with IPUs..."
python3.7 -m paddle.distributed.fleet.launch \
--run_mode=collective \
--ips=localhost \
--nproc_per_node=2 \
--ipus_per_replica=2 \
--num_ipus=8 \
--partition_name=${partition_name} \
--vipu_server=${vipu_server} \
test_dist_data_parallel_ipu.py ${opt} ipu_res.txt ${onchip} ${rts} > ipu.log
echo "paddle.distributed.fleet.launch test with IPUs...Done"

echo "paddle normal test with CPU..."
export POPLAR_IPUMODEL=1
python3.7 test_dist_data_parallel_ipu.py ${opt} cpu_res.txt > cpu.log
unset POPLAR_IPUMODEL
echo "paddle normal test with CPU...Done"

echo "Compare results..."
python3.7 -c """${allclose_script}"""
if [ $? -eq 0 ];then
echo "Compare results...Done"
else
echo "Error occurs. Please check ipu.log, cpu.log, ipu_res.txt and cpu_res.txt"
exit 0
fi
done
done
done

if [ -f "ipu.log" ]; then
rm "ipu.log"
fi
if [ -f "cpu.log" ]; then
rm "cpu.log"
fi
if [ -f "ipu_res.txt" ]; then
rm "ipu_res.txt"
fi
if [ -f "cpu_res.txt" ]; then
rm "cpu_res.txt"
fi
