Skip to content

Commit

Permalink
[Done] Fault tolerant job (#212)
Browse files Browse the repository at this point in the history
* fault tolerant job

* ft job

* update

* tested job submit with ft job

* update
  • Loading branch information
typhoonzero authored Jul 20, 2017
1 parent f916c16 commit 009314e
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 13 deletions.
9 changes: 9 additions & 0 deletions docker/k8s_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def fetch_pserver_ips():
pserver_ips = [item[1] for item in pod_list]
return ",".join(pserver_ips)

def fetch_master_ip():
    """Return the pod IP of this job's master pod, or "" if none exists.

    Pods are selected by the label ``paddle-job-master=<job name>``; a
    fault-tolerant job runs a single master replica, so the first match
    is taken.
    """
    label_selector = "paddle-job-master=%s" % PADDLE_JOB_NAME
    pod_list = fetch_pods_info(label_selector)
    if pod_list:
        # fetch_pods_info returns tuples whose second element is the pod
        # IP (same convention as fetch_pserver_ips above).
        return pod_list[0][1]
    return ""

def fetch_trainer_id():
label_selector = "paddle-job=%s" % PADDLE_JOB_NAME
Expand All @@ -60,5 +67,7 @@ def fetch_trainer_id():
print fetch_pserver_ips()
elif command == "fetch_trainer_id":
print fetch_trainer_id()
elif command == "fetch_master_ip":
print fetch_master_ip()
elif command == "wait_pods_running":
wait_pods_running(sys.argv[2], sys.argv[3])
21 changes: 21 additions & 0 deletions docker/paddle_k8s
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ start_pserver() {
--num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS
}

start_new_pserver() {
  # Fault-tolerant pserver: look up the master pod's IP via the k8s API,
  # then register with the etcd sidecar that runs in the master pod.
  MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)
  export MASTER_IP
  /usr/bin/pserver \
    -port=$PADDLE_INIT_PORT \
    -num-pservers=$PSERVERS \
    -log-level=debug \
    -etcd-endpoint=http://$MASTER_IP:2379
    # BUGFIX: previously pointed at $PADDLE_INIT_MASTER_IP, which is never
    # set here -- the freshly fetched $MASTER_IP is the intended endpoint.
}

start_master() {
  # Run the PaddlePaddle master on its fixed port; etcd is a sidecar in
  # the same pod, hence the loopback client endpoint.
  /usr/bin/master -port=8080 -endpoints=http://127.0.0.1:2379
}

check_trainer_ret() {
ret=$1
echo "job returned $ret...setting pod return message..."
Expand Down Expand Up @@ -92,6 +107,12 @@ case "$1" in
start_trainer)
start_trainer $2
;;
start_new_pserver)
start_new_pserver
;;
start_master)
start_master
;;
--help)
usage
;;
Expand Down
9 changes: 7 additions & 2 deletions go/paddlecloud/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ type SubmitCmd struct {
Topology string `json:"topology"`
Datacenter string `json:"datacenter"`
Passes int `json:"passes"`
Image string `json:"image"`
Registry string `json:"registry"`
// docker image to run jobs
Image string `json:"image"`
Registry string `json:"registry"`
// Alpha features:
// TODO: separate API versions
FaultTolerant bool `json:"faulttolerant"`
}

// Name is subcommands name.
Expand Down Expand Up @@ -71,6 +75,7 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
f.IntVar(&p.Passes, "passes", 1, "Pass count for training job")
f.StringVar(&p.Image, "image", "", "Runtime Docker image for the job")
f.StringVar(&p.Registry, "registry", "", "Registry secret name for the runtime Docker image")
f.BoolVar(&p.FaultTolerant, "faulttolerant", false, "if true, use new fault-tolerant pservers")
}

// Execute submit command.
Expand Down
2 changes: 2 additions & 0 deletions paddlecloud/paddlecloud/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,5 @@

# Path store all cuda, nvidia driver libs
NVIDIA_LIB_PATH="/usr/local/nvidia/lib64"
# etcd image for fault-tolerant jobs
ETCD_IMAGE="quay.io/coreos/etcd:v3.2.1"
85 changes: 83 additions & 2 deletions paddlecloud/paddlejob/paddle_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
__all__ = ["PaddleJob"]
DEFAULT_PADDLE_PORT=7164
DEFAULT_MASTER_PORT=8080
DEFAULT_ETCD_PORT=2379

class PaddleJob(object):
"""
Expand All @@ -24,7 +26,9 @@ def __init__(self,
gpu=0,
volumes=[],
registry_secret=None,
envs = {}):
envs = {},
new_pserver=True,
etcd_image="quay.io/coreos/etcd:v3.2.1"):

self._ports_num=1
self._ports_num_for_sparse=1
Expand All @@ -46,6 +50,12 @@ def __init__(self,
self._registry_secret = registry_secret
self._passes = passes
self._usr_envs = envs
# master resources are static
self._mastercpu = 1
self._mastermemory = "300Mi"
# use new pserver for tolerant
self._new_pserver = new_pserver
self._etcd_image = etcd_image

@property
def pservers(self):
Expand All @@ -59,6 +69,9 @@ def parallelism(self):
def runtime_image(self):
return self._image

def _get_master_name(self):
return "%s-master" % self._name

def _get_pserver_name(self):
return "%s-pserver" % self._name

Expand Down Expand Up @@ -101,11 +114,27 @@ def _get_pserver_container_ports(self):
port += 1
return ports

def _get_master_container_ports(self):
    """Container ports exposed by the master pod.

    Returns a list of Kubernetes containerPort dicts: the master's RPC
    port and the etcd client port served by the etcd sidecar container.
    """
    # NOTE: the `port` counter used by _get_pserver_container_ports is
    # not needed here -- the master exposes exactly these two fixed
    # ports, so the dead local was removed.
    return [
        {"containerPort": DEFAULT_MASTER_PORT, "name": "master-port"},
        {"containerPort": DEFAULT_ETCD_PORT, "name": "etcd-port"},
    ]

def _get_master_labels(self):
return {"paddle-job-master": self._name}

def _get_pserver_labels(self):
return {"paddle-job-pserver": self._name}

def _get_master_entrypoint(self):
return ["paddle_k8s", "start_master"]

def _get_pserver_entrypoint(self):
return ["paddle_k8s", "start_pserver"]
if not self._new_pserver:
return ["paddle_k8s", "start_pserver"]
else:
return ["paddle_k8s", "start_new_pserver"]

def _get_trainer_entrypoint(self):
if self._entry:
Expand All @@ -128,6 +157,58 @@ def _get_trainer_volume_mounts(self):
volume_mounts.append(item["volume_mount"])
return volume_mounts

def new_master_job(self):
    """Build the Kubernetes manifest for this job's master pod.

    The pod runs two containers: the PaddlePaddle master (using the same
    runtime image as the trainers) and an etcd sidecar used for service
    discovery/state by fault-tolerant pservers.

    return: Master ReplicaSet -- a dict manifest (extensions/v1beta1)
        ready to submit through the Kubernetes API.
    """
    rs = {
        "apiVersion": "extensions/v1beta1",
        "kind": "ReplicaSet",
        "metadata":{
            "name": self._get_master_name(),
        },
        "spec":{
            "replicas": 1, # NOTE: always 1 replica of master
            "template": {
                "metadata": {
                    "labels": self._get_master_labels()
                },
                "spec": {
                    # mount trainer volumes to dispatch datasets
                    "volumes": self._get_trainer_volumes(),
                    "containers":[{
                        # Master container: fixed-size resources
                        # (requests == limits, set in __init__).
                        "name": self._name,
                        "image": self._image,
                        "ports": self._get_master_container_ports(),
                        "env": self.get_env(),
                        "volumeMounts": self._get_trainer_volume_mounts(),
                        "command": self._get_master_entrypoint(),
                        "resources": {
                            "requests": {
                                "memory": str(self._mastermemory),
                                "cpu": str(self._mastercpu)
                            },
                            "limits": {
                                "memory": str(self._mastermemory),
                                "cpu": str(self._mastercpu)
                            }
                        }
                    }, {
                        # etcd sidecar: single-member cluster advertising
                        # the pod's own IP so peers in the cluster can
                        # reach it directly.
                        "name": self._name + "-etcd",
                        "image": self._etcd_image,
                        "command": ["etcd", "-name", "etcd0", "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", "-initial-advertise-peer-urls", "http://$(POD_IP):2380", "-listen-peer-urls", "http://0.0.0.0:2380", "-initial-cluster", "etcd0=http://$(POD_IP):2380", "-initial-cluster-state", "new"],
                        # POD_IP is injected from the pod's status at
                        # runtime via the downward API; k8s substitutes
                        # $(POD_IP) in the command above.
                        "env": [{
                            "name": "POD_IP",
                            "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}
                        }]

                    }]
                }
            }
        }
    }
    return rs

def new_trainer_job(self):
"""
return: Trainer job, it's a Kubernetes Job
Expand Down
55 changes: 46 additions & 9 deletions paddlecloud/paddlejob/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def post(self, request, format=None):
obj = json.loads(request.body)
topology = obj.get("topology", "")
entry = obj.get("entry", "")
fault_tolerant = obj.get("faulttolerant", False)
api_client = notebook.utils.get_user_api_client(username)
if not topology and not entry:
return utils.simple_response(500, "no topology or entry specified")
Expand Down Expand Up @@ -127,7 +128,7 @@ def post(self, request, format=None):
))
envs = {}
envs.update({"PADDLE_CLOUD_CURRENT_DATACENTER": dc})

# ===================== create PaddleJob instance ======================
paddle_job = PaddleJob(
name = job_name,
job_package = package_in_pod,
Expand All @@ -144,26 +145,39 @@ def post(self, request, format=None):
passes = obj.get("passes", 1),
registry_secret = registry_secret,
volumes = volumes,
envs = envs
envs = envs,
new_pserver = fault_tolerant,
etcd_image = settings.ETCD_IMAGE
)
# ========== submit master ReplicaSet if using fault_tolerant feature ==
# FIXME: alpha features in separate module
if fault_tolerant:
try:
ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
namespace,
paddle_job.new_master_job())
except ApiException, e:
logging.error("error submitting master job: %s", e)
return utils.simple_response(500, str(e))
# ========================= submit pserver job =========================
try:
ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
namespace,
paddle_job.new_pserver_job(),
pretty=True)
paddle_job.new_pserver_job())
except ApiException, e:
logging.error("error submitting pserver job: %s " % e)
logging.error("error submitting pserver job: %s ", e)
return utils.simple_response(500, str(e))

#submit trainer job, it's Kubernetes Job
# ========================= submit trainer job =========================
try:
ret = client.BatchV1Api(api_client=api_client).create_namespaced_job(
namespace,
paddle_job.new_trainer_job(),
pretty=True)
paddle_job.new_trainer_job())
except ApiException, e:
logging.error("error submitting trainer job: %s" % e)
return utils.simple_response(500, str(e))

# TODO(typhoonzero): stop master and pservers when job finish or fails

return utils.simple_response(200, "")

def delete(self, request, format=None):
Expand Down Expand Up @@ -222,6 +236,29 @@ def delete(self, request, format=None):
except ApiException, e:
logging.error("error deleting pserver pods: %s" % str(e))
delete_status.append(str(e))

# delete master rs
master_name = jobname + "-master"
try:
u_status = client.ExtensionsV1beta1Api(api_client=api_client)\
.delete_namespaced_replica_set(master_name, namespace, {})
except ApiException, e:
logging.error("error deleting master: %s" % str(e))
delete_status.append(str(e))

# delete master pods
try:
# master replica set has label with jobname
job_pod_list = client.CoreV1Api(api_client=api_client)\
.list_namespaced_pod(namespace,
label_selector="paddle-job-master=%s"%jobname)
for i in job_pod_list.items:
u_status = client.CoreV1Api(api_client=api_client)\
.delete_namespaced_pod(i.metadata.name, namespace, {})
except ApiException, e:
logging.error("error deleting master pods: %s" % str(e))
delete_status.append(str(e))

if len(delete_status) > 0:
retcode = 500
else:
Expand Down

0 comments on commit 009314e

Please sign in to comment.