Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobparser for crd TrainingJob #14

Merged
merged 9 commits into from
Apr 12, 2018
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*~
vendor/
.glide/
.vscode/
32 changes: 32 additions & 0 deletions pkg/apis/paddlepaddle/v1/training_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package v1

import (
"encoding/json"
"fmt"
)

// Elastic returns true if the job can scale to more workers.
func (s *TrainingJob) Elastic() bool {
return s.Spec.Trainer.MinInstance < s.Spec.Trainer.MaxInstance
}

// GPU convert Resource Limit Quantity to int
func (s *TrainingJob) GPU() int {
q := s.Spec.Trainer.Resources.Limits.NvidiaGPU()
gpu, ok := q.AsInt64()
if !ok {
// FIXME: treat errors
gpu = 0
}
return int(gpu)
}

// NeedGPU returns true if the job need GPU resource to run.
func (s *TrainingJob) NeedGPU() bool {
return s.GPU() > 0
}

func (s *TrainingJob) String() string {
b, _ := json.MarshalIndent(s, "", " ")
return fmt.Sprintf("%s", b)
}
13 changes: 7 additions & 6 deletions pkg/apis/paddlepaddle/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type TrainingJobSpec struct {
Passes int `json:"passes,omitempty"`
Volumes []corev1.Volume `json:"volumes"`
VolumeMounts []corev1.VolumeMount `json:"VolumeMounts"`
//TODO(m3ngyang) simplify the structure of sub-resource(mengyang)
//TrainingJob components.
Master MasterSpec `json:"master"`
Pserver PserverSpec `json:"pserver"`
Expand Down Expand Up @@ -111,12 +112,12 @@ type TrainerJobScaleStatus struct {
type TrainingResourceType string

const (
// MASTER is the master name of TrainingResourceType.
MASTER TrainingResourceType = "MASTER"
// PSERVER is the pserver name of TrainingResourceType.
PSERVER TrainingResourceType = "PSERVER"
// TRAINER is the trainer name of TrainingResourceType.
TRAINER TrainingResourceType = "TRAINER"
// Master is the master name of TrainingResourceType.
Master TrainingResourceType = "MASTER"
// Pserver is the pserver name of TrainingResourceType.
Pserver TrainingResourceType = "PSERVER"
// Trainer is the trainer name of TrainingResourceType.
Trainer TrainingResourceType = "TRAINER"
)

// ResourceState is the state of a type of resource
Expand Down
331 changes: 331 additions & 0 deletions pkg/updater/jobparser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
/* Copyright (c) 2016 PaddlePaddle Authors All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

package updater

import (
"errors"
"fmt"
"strconv"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1beta1 "k8s.io/api/extensions/v1beta1"
apiresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

paddlev1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import "github.com/paddlepaddle/edl/v1"

)

const (
imagePullPolicy = "Always"
)

// DefaultJobParser implement a basic JobParser.
type DefaultJobParser struct {
}

// setDefaultAndValidate updates default values for the added job and validates the fields.
func setDefaultAndValidate(job *paddlev1.TrainingJob) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paddlev1 => edl

And for all the rest namings.

// Fill in default values
// FIXME: Need to test. What is the value if specified "omitempty"
if job.Spec.Port == 0 {
job.Spec.Port = 7164
}
if job.Spec.PortsNum == 0 {
job.Spec.PortsNum = 1
}
if job.Spec.PortsNumForSparse == 0 {
job.Spec.PortsNumForSparse = 1
}
if job.Spec.Image == "" {
job.Spec.Image = "paddlepaddle/paddlecloud-job"
}
if job.Spec.Passes == 0 {
job.Spec.Passes = 1
}

if !job.Spec.FaultTolerant && job.Elastic() {
return errors.New("max-instances should equal to min-instances when fault_tolerant is disabled")
}
// TODO: add validations.(helin)
return nil
}

// NewTrainingJob generates a whole structure of TrainingJob
func (p *DefaultJobParser) NewTrainingJob(job *paddlev1.TrainingJob) (*paddlev1.TrainingJob, error) {
if err := setDefaultAndValidate(job); err != nil {
return nil, err
}

useHostNetwork := job.Spec.HostNetwork
if job.Spec.FaultTolerant {
job.Spec.Master.ReplicaSpec = parseToMaster(job)
if useHostNetwork {
job.Spec.Master.ReplicaSpec.Spec.Template.Spec.HostNetwork = true
}
}
job.Spec.Pserver.ReplicaSpec = parseToPserver(job)
job.Spec.Trainer.ReplicaSpec = parseToTrainer(job)
if useHostNetwork {
job.Spec.Pserver.ReplicaSpec.Spec.Template.Spec.HostNetwork = true
job.Spec.Trainer.ReplicaSpec.Spec.Template.Spec.HostNetwork = true
}
return job, nil
}

// parseToPserver generate a pserver replicaset resource according to "TrainingJob" resource specs.
func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet {
replicas := int32(job.Spec.Pserver.MinInstance)
command := make([]string, 2, 2)
// FIXME: refine these part.(typhoonzero)
if job.Spec.FaultTolerant {
command = []string{"paddle_k8s", "start_pserver"}
} else {
command = []string{"paddle_k8s", "start_new_pserver"}
}

return &v1beta1.ReplicaSet{
TypeMeta: metav1.TypeMeta{
Kind: "extensions/v1beta1",
APIVersion: "ReplicaSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: job.ObjectMeta.Name + "-pserver",
Namespace: job.ObjectMeta.Namespace,
},
Spec: v1beta1.ReplicaSetSpec{
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"paddle-job-pserver": job.ObjectMeta.Name},
},
Spec: corev1.PodSpec{
Volumes: job.Spec.Volumes,
Containers: []corev1.Container{
corev1.Container{
Name: "pserver",
Image: job.Spec.Image,
Ports: podPorts(job),
Env: podEnv(job),
Command: command,
Resources: job.Spec.Pserver.Resources,
},
},
},
},
},
}
}

// parseToTrainer parse TrainingJob to a kubernetes job resource.
func parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job {
replicas := int32(job.Spec.Trainer.MinInstance)
command := make([]string, 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make([]string, 2) creates a slice which is never used. var command []string is suffice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

if job.Spec.FaultTolerant {
command = []string{"paddle_k8s", "start_trainer"}
} else {
command = []string{"paddle_k8s", "start_new_trainer"}
}

return &batchv1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: "batch/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: job.ObjectMeta.Name + "-trainer",
Namespace: job.ObjectMeta.Namespace,
},
Spec: batchv1.JobSpec{
Parallelism: &replicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"paddle-job": job.ObjectMeta.Name},
},
Spec: corev1.PodSpec{
Volumes: job.Spec.Volumes,
Containers: []corev1.Container{
corev1.Container{
Name: "trainer",
Image: job.Spec.Image,
ImagePullPolicy: imagePullPolicy,
Command: command,
VolumeMounts: job.Spec.VolumeMounts,
Ports: podPorts(job),
Env: podEnv(job),
Resources: job.Spec.Trainer.Resources,
},
},
RestartPolicy: "Never",
},
},
},
}
}

func masterResource(job *paddlev1.TrainingJob) *corev1.ResourceRequirements {
// TODO(gongwb): config master resource?
return &corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI),
"memory": apiresource.MustParse("1Gi"),
},
Requests: corev1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI),
"memory": apiresource.MustParse("500Mi"),
},
}
}

func getEtcdPodSpec(job *paddlev1.TrainingJob) *corev1.Container {
command := []string{"etcd", "-name", "etcd0",
"-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001",
"-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001",
"-initial-advertise-peer-urls", "http://$(POD_IP):2380",
"-listen-peer-urls", "http://0.0.0.0:2380",
"-initial-cluster", "etcd0=http://$(POD_IP):2380",
"-initial-cluster-state", "new"}

return &corev1.Container{
Name: "etcd",
Image: "quay.io/coreos/etcd:v3.2.1",
ImagePullPolicy: imagePullPolicy,
// TODO(gongwb): etcd ports?
Env: podEnv(job),
Command: command,
}
}

// parseToMaster parse TrainingJob to a kubernetes replicaset resource.
func parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet {
replicas := int32(1)
// FIXME: refine these part.(typhoonzero)
command := []string{"paddle_k8s", "start_master"}

return &v1beta1.ReplicaSet{
TypeMeta: metav1.TypeMeta{
Kind: "extensions/v1beta1",
APIVersion: "ReplicaSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: job.ObjectMeta.Name + "-master",
Namespace: job.ObjectMeta.Namespace,
},
Spec: v1beta1.ReplicaSetSpec{
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name},
},
Spec: corev1.PodSpec{
Volumes: job.Spec.Volumes,
Containers: []corev1.Container{
corev1.Container{
Name: "master",
Image: job.Spec.Image,
ImagePullPolicy: imagePullPolicy,
Ports: masterPorts(job),
Command: command,
VolumeMounts: job.Spec.VolumeMounts,
Resources: *masterResource(job),
},
*getEtcdPodSpec(job),
},
},
},
},
}
}

// general functions that pserver, trainer use the same
func podPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort {
portsTotal := job.Spec.PortsNum + job.Spec.PortsNumForSparse
ports := make([]corev1.ContainerPort, 0)
basePort := int32(job.Spec.Port)
for i := 0; i < portsTotal; i++ {
ports = append(ports, corev1.ContainerPort{
Name: fmt.Sprintf("jobport-%d", basePort),
ContainerPort: basePort,
})
basePort++
}
return ports
}

func masterPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort {
ports := []corev1.ContainerPort{
corev1.ContainerPort{
Name: "master-port",
ContainerPort: 8080,
},
corev1.ContainerPort{
Name: "etcd-port",
ContainerPort: 2379,
},
}
return ports
}

func podEnv(job *paddlev1.TrainingJob) []corev1.EnvVar {
needGPU := "0"
if job.NeedGPU() {
needGPU = "1"
}
trainerCount := 1
if job.NeedGPU() {
q := job.Spec.Trainer.Resources.Requests.NvidiaGPU()
trainerCount = int(q.Value())
} else {
q := job.Spec.Trainer.Resources.Requests.Cpu()
// FIXME: CPU resource value can be less than 1.
trainerCount = int(q.Value())
}

return []corev1.EnvVar{
corev1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name},
// NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS
// these env are used for non-faulttolerant training,
// use min-instance all the time. When job is elastic,
// these envs are not used.
corev1.EnvVar{Name: "TRAINERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)},
corev1.EnvVar{Name: "PSERVERS", Value: strconv.Itoa(job.Spec.Pserver.MinInstance)},
corev1.EnvVar{Name: "ENTRY", Value: job.Spec.Trainer.Entrypoint},
// FIXME: TOPOLOGY deprecated
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

corev1.EnvVar{Name: "TOPOLOGY", Value: job.Spec.Trainer.Entrypoint},
corev1.EnvVar{Name: "TRAINER_PACKAGE", Value: job.Spec.Trainer.Workspace},
corev1.EnvVar{Name: "PADDLE_INIT_PORT", Value: strconv.Itoa(job.Spec.Port)},
// PADDLE_INIT_TRAINER_COUNT should be same to gpu number when use gpu
// and cpu cores when using cpu
corev1.EnvVar{Name: "PADDLE_INIT_TRAINER_COUNT", Value: strconv.Itoa(trainerCount)},
corev1.EnvVar{Name: "PADDLE_INIT_PORTS_NUM", Value: strconv.Itoa(job.Spec.PortsNum)},
corev1.EnvVar{Name: "PADDLE_INIT_PORTS_NUM_FOR_SPARSE", Value: strconv.Itoa(job.Spec.PortsNumForSparse)},
corev1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)},
corev1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)},
corev1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU},
corev1.EnvVar{Name: "LD_LIBRARY_PATH", Value: "/usr/local/cuda/lib64"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LD_LIBRARY_PATH 的值 需要根据 volumeMounts中 nvidia-libraries 的值进行更新

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个如何更新呢?

corev1.EnvVar{Name: "NAMESPACE", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
}},
corev1.EnvVar{Name: "POD_IP", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
}},
}
}

// general functions end
Loading