From cd397cfc8ec7ff9e149f673c2752e1890c22b95f Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 13 Nov 2017 20:14:39 +0800 Subject: [PATCH 1/6] init --- go/controller/controller.go | 4 +-- go/controller/jobparser.go | 50 +++++++++++++++++++++++++++++++++++++ go/paddlectl/submit.go | 1 - 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/go/controller/controller.go b/go/controller/controller.go index c29a515d..f439e993 100644 --- a/go/controller/controller.go +++ b/go/controller/controller.go @@ -106,8 +106,8 @@ func (c *Controller) onAdd(obj interface{}) { c.autoscaler.OnAdd(job) // TODO: if we need to create training job instance by the resource, // you should add the following code: - // var parser DefaultJobParser - // c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(parser.ParseToPserver(job)) + var parser DefaultJobParser + c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(parser.ParseToPserver(job)) } func (c *Controller) onUpdate(oldObj, newObj interface{}) { diff --git a/go/controller/jobparser.go b/go/controller/jobparser.go index 2f97d1b1..1cae8cbd 100644 --- a/go/controller/jobparser.go +++ b/go/controller/jobparser.go @@ -109,6 +109,56 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R // ParseToTrainer parse TrainingJob to a kubernetes job resource. func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job { + if job.Spec.FaultTolerant { + command = []string{"paddle_k8s", "start_trainer"} + } else { + command = []string{"paddle_k8s", "start_new_trainer"} + } + + j := batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Jobname + "-trainer", + Namespace: job.ObjectMeta.Namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: &t.MinInstance, + Template: v1.PodTemplateSpec{ + ObjectMeta: batchv1.ObjectMeta{ + Labels: map[string]string{"paddle-job-tainer": job.ObjectMeta.Name}, + }, + Spec: v1.PodSpec{ + Voumes: []v1.Volume{ + Name: job.ObjectMeta.Name + "trainer-workspace", + VolumeSource: v1.VolumeSource{ + HostPath: v1.HostPathVolumeSource{ + //Path: job.Spec.Trainer.Workspace, + Path: "/workspace", + }, + }, + }, + Containers: []v1.Container{ + v1.Container{ + Name: job.ObjectMeta.Name, + Image: job.Spec.Image, + ImagePullPolicy: "Always", + Command: command, + volumeMounts: v1.VolumeMount{ + Name: job.ObjectMeta.Name + "trainer-workspace", + MountPath: "/workspace", + }, + Ports: podPorts(job), + Env: podEnv(job), + Resources: job.Spec.Trainer.Resources, + }, + }, + }, + }, + }, + } // TODO: create job. return &batchv1.Job{} } diff --git a/go/paddlectl/submit.go b/go/paddlectl/submit.go index 13635851..3c4da000 100644 --- a/go/paddlectl/submit.go +++ b/go/paddlectl/submit.go @@ -115,7 +115,6 @@ func (p *SubmitCmd) getMaster() *paddlejob.MasterSpec { // GetTrainingJob get's paddlejob.TrainingJob struct filed by Submitcmd paramters. func (p *SubmitCmd) GetTrainingJob() *paddlejob.TrainingJob { - t := paddlejob.TrainingJob{ metav1.TypeMeta{ Kind: "TrainingJob", From 3553d32d4e3c29604464b2a6f46db68daded5191 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Tue, 14 Nov 2017 17:44:01 +0800 Subject: [PATCH 2/6] compile ok --- go/controller/autoscaler.go | 1 + go/controller/controller.go | 30 ++++++- go/controller/jobparser.go | 159 ++++++++++++++++++++++++++++-------- go/paddlectl/submit.go | 30 +++++-- 4 files changed, 178 insertions(+), 42 deletions(-) diff --git a/go/controller/autoscaler.go b/go/controller/autoscaler.go index 23f4d2f1..f311e83c 100644 --- a/go/controller/autoscaler.go +++ b/go/controller/autoscaler.go @@ -420,6 +420,7 @@ func (a *Autoscaler) Monitor() { // scale it. var tj *batchv1.Job var err error + log.Debug("trainerjob:%s\n", e.Job) tj, err = a.cluster.GetTrainerJob(e.Job) if err != nil { log.Error( diff --git a/go/controller/controller.go b/go/controller/controller.go index f439e993..f7594d80 100644 --- a/go/controller/controller.go +++ b/go/controller/controller.go @@ -26,6 +26,7 @@ package controller import ( "context" + "encoding/json" log "github.com/inconshreveable/log15" @@ -104,10 +105,33 @@ func (c *Controller) onAdd(obj interface{}) { job := obj.(*paddlejob.TrainingJob) log.Debug("TrainingJob resource added", "name", job.ObjectMeta.Name) c.autoscaler.OnAdd(job) - // TODO: if we need to create training job instance by the resource, - // you should add the following code: + + // TODO(gongwb):open it when all are ready. + // All-are-ready means: + // create trainjob from paddlectl + // scheduler can schedule trainjobs var parser DefaultJobParser - c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(parser.ParseToPserver(job)) + p := parser.ParseToPserver(job) + t := parser.ParseToTrainer(job) + m := parser.ParseToMaster(job) + + b, _ := json.MarshalIndent(p, "", " ") + log.Debug("create pserver:" + string(b[:])) + + b, _ = json.MarshalIndent(t, "", " ") + log.Debug("create trainer-job:" + string(b[:])) + + b, _ = json.MarshalIndent(m, "", " ") + log.Debug("create master:" + string(b[:])) + + // TODO(gongwb): create them + // just like: + // namespace := job.ObjectMeta.Namespace + // _, err := c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(p) + // if err != nil { + // b, _ := json.MarshalIndent(p, "", " ") + // log.Debug("create pserver:%s\terror: %v", string(b[:]), err) + // } } func (c *Controller) onUpdate(oldObj, newObj interface{}) { diff --git a/go/controller/jobparser.go b/go/controller/jobparser.go index 1cae8cbd..24d62008 100644 --- a/go/controller/jobparser.go +++ b/go/controller/jobparser.go @@ -20,6 +20,7 @@ import ( "strconv" paddlejob "github.com/PaddlePaddle/cloud/go/api" + apiresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" batchv1 "k8s.io/client-go/pkg/apis/batch/v1" @@ -93,7 +94,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R Volumes: podVolumes(job), Containers: []v1.Container{ v1.Container{ - Name: job.ObjectMeta.Name, + Name: "pserver", Image: job.Spec.Image, Ports: podPorts(job), Env: podEnv(job), @@ -109,64 +110,124 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R // ParseToTrainer parse TrainingJob to a kubernetes job resource. func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job { + replicas := int32(job.Spec.Trainer.MinInstance) + command := make([]string, 2, 2) if job.Spec.FaultTolerant { command = []string{"paddle_k8s", "start_trainer"} } else { command = []string{"paddle_k8s", "start_new_trainer"} } - j := batchv1.Job{ + return &batchv1.Job{ TypeMeta: metav1.TypeMeta{ Kind: "Job", APIVersion: "batch/v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: job.ObjectMeta.Jobname + "-trainer", + Name: job.ObjectMeta.Name + "-trainer", Namespace: job.ObjectMeta.Namespace, }, Spec: batchv1.JobSpec{ - Parallelism: &t.MinInstance, + Parallelism: &replicas, Template: v1.PodTemplateSpec{ - ObjectMeta: batchv1.ObjectMeta{ - Labels: map[string]string{"paddle-job-tainer": job.ObjectMeta.Name}, + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job": job.ObjectMeta.Name}, }, Spec: v1.PodSpec{ - Voumes: []v1.Volume{ - Name: job.ObjectMeta.Name + "trainer-workspace", - VolumeSource: v1.VolumeSource{ - HostPath: v1.HostPathVolumeSource{ - //Path: job.Spec.Trainer.Workspace, - Path: "/workspace", - }, - }, - }, + Volumes: podVolumes(job), Containers: []v1.Container{ v1.Container{ - Name: job.ObjectMeta.Name, + Name: "trainer", Image: job.Spec.Image, ImagePullPolicy: "Always", Command: command, - volumeMounts: v1.VolumeMount{ - Name: job.ObjectMeta.Name + "trainer-workspace", - MountPath: "/workspace", - }, - Ports: podPorts(job), - Env: podEnv(job), - Resources: job.Spec.Trainer.Resources, + VolumeMounts: podVolumeMounts(job), + Ports: podPorts(job), + Env: podEnv(job), + Resources: job.Spec.Trainer.Resources, }, }, + RestartPolicy: "Never", }, }, }, } - // TODO: create job. - return &batchv1.Job{} +} + +func masterResource(job *paddlejob.TrainingJob) *v1.ResourceRequirements { + // TODO(gongwb): config master resource? + return &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI), + "memory": apiresource.MustParse("500Mi"), + }, + Requests: v1.ResourceList{ + "cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI), + "memory": apiresource.MustParse("1Gi"), + }, + } +} + +func getEtcdPodSpec(job *paddlejob.TrainingJob) *v1.Container { + command := []string{"etcd", "-name", "etcd0", + "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", + "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", + "-initial-advertise-peer-urls", "http://$(POD_IP):2380", + "-listen-peer-urls", "http://0.0.0.0:2380", + "-initial-cluster", "etcd0=http://$(POD_IP):2380", + "-initial-cluster-state", "new"} + + return &v1.Container{ + Name: "etcd", + Image: "quay.io/coreos/etcd:v3.2.1", + ImagePullPolicy: "Always", + // TODO(gongwb): etcd ports? + Env: podEnv(job), + Command: command, + } } // ParseToMaster parse TrainingJob to a kubernetes replicaset resource. func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.ReplicaSet { - // TODO: create master if needed. - return &v1beta1.ReplicaSet{} + replicas := int32(1) + // FIXME: refine these part. + command := []string{"paddle_k8s", "start_master"} + + return &v1beta1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "extensions/v1beta1", + APIVersion: "ReplicaSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name + "-master", + Namespace: job.ObjectMeta.Namespace, + }, + Spec: v1beta1.ReplicaSetSpec{ + Replicas: &replicas, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name}, + }, + Spec: v1.PodSpec{ + // TODO: setup pserver volumes on cloud. + Volumes: podVolumes(job), + Containers: []v1.Container{ + v1.Container{ + Name: "master", + Image: job.Spec.Image, + ImagePullPolicy: "Always", + Ports: masterPorts(job), + // TODO(gongwb):master env + Command: command, + VolumeMounts: podVolumeMounts(job), + Resources: *masterResource(job), + }, + *getEtcdPodSpec(job), + }, + }, + }, + }, + } } // ----------------------------------------------------------------------- @@ -183,7 +244,21 @@ func podPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { }) basePort++ } - return []v1.ContainerPort{} + return ports +} + +func masterPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { + ports := []v1.ContainerPort{ + v1.ContainerPort{ + Name: "master-port", + ContainerPort: 8080, + }, + v1.ContainerPort{ + Name: "etcd-port", + ContainerPort: 2379, + }, + } + return ports } func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { @@ -200,6 +275,7 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { // FIXME: CPU resource value can be less than 1. trainerCount = int(q.Value()) } + return []v1.EnvVar{ v1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name}, // NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS @@ -221,23 +297,42 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { v1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, v1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)}, v1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU}, + + // FIXME(gongwb): LD_LIBRARY_PATH? v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: job.Spec.Trainer.Entrypoint}, v1.EnvVar{Name: "NAMESPACE", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ FieldPath: "metadata.namespace", }, }}, + v1.EnvVar{Name: "POD_IP", ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }}, } } func podVolumes(job *paddlejob.TrainingJob) []v1.Volume { - // TODO: prepare volumes. - return []v1.Volume{} + return []v1.Volume{ + v1.Volume{ + Name: job.ObjectMeta.Name + "-workspace", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: job.Spec.Trainer.Workspace, + }, + }, + }, + } } func podVolumeMounts(job *paddlejob.TrainingJob) []v1.VolumeMount { - // TODO: preapare volume mounts for pods. - return []v1.VolumeMount{} + return []v1.VolumeMount{ + v1.VolumeMount{ + Name: job.ObjectMeta.Name + "-workspace", + MountPath: job.Spec.Trainer.Workspace, + }, + } } // ----------------------------------------------------------------------- diff --git a/go/paddlectl/submit.go b/go/paddlectl/submit.go index 3c4da000..9c308889 100644 --- a/go/paddlectl/submit.go +++ b/go/paddlectl/submit.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "os" + "path" "strings" "k8s.io/client-go/pkg/api/v1" @@ -73,9 +74,8 @@ func (*SubmitCmd) Usage() string { func (p *SubmitCmd) getTrainer() *paddlejob.TrainerSpec { return &paddlejob.TrainerSpec{ - Entrypoint: p.Entry, - // FIXME(gongwb): workspace - + Entrypoint: p.Entry, + Workspace: getJobPfsPath(p.Jobpackage, p.Jobname), MinInstance: p.MinInstance, MaxInstance: p.MaxInstance, Resources: v1.ResourceRequirements{ @@ -200,14 +200,30 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { return &s } +func getJobPfsPath(jobPackage, jobName string) string { + _, err := os.Stat(jobPackage) + if os.IsNotExist(err) { + return jobPackage + } + + return path.Join("/pfs", Config.ActiveConfig.Name, "home", Config.ActiveConfig.Username, "jobs", jobName) +} + // putFiles puts files to pfs and // if jobPackage is not a local dir, skip uploading package. -func putFiles(jobPackage, jobName string) error { +func putFilesToPfs(jobPackage, jobName string) error { _, pkgerr := os.Stat(jobPackage) if pkgerr == nil { - // FIXME: upload job package to paddle cloud. + dest := getJobPfsPath(jobPackage, jobName) + if !strings.HasSuffix(jobPackage, "/") { + jobPackage = jobPackage + "/" + } + err := putFiles(jobPackage, dest) + if err != nil { + return err + } } else if os.IsNotExist(pkgerr) { - return fmt.Errorf("stat jobpackage '%s' error: %v", jobPackage, pkgerr) + glog.Warning("jobpackage not a local dir, skip upload.") } return nil @@ -228,7 +244,7 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { return err } - if err := putFiles(jobPackage, jobName); err != nil { + if err := putFilesToPfs(jobPackage, jobName); err != nil { return err } From 7e9459c6a1e03eb39487273d2261f769b9073228 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Tue, 14 Nov 2017 17:47:32 +0800 Subject: [PATCH 3/6] rm not need --- go/controller/autoscaler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/controller/autoscaler.go b/go/controller/autoscaler.go index f311e83c..23f4d2f1 100644 --- a/go/controller/autoscaler.go +++ b/go/controller/autoscaler.go @@ -420,7 +420,6 @@ func (a *Autoscaler) Monitor() { // scale it. var tj *batchv1.Job var err error - log.Debug("trainerjob:%s\n", e.Job) tj, err = a.cluster.GetTrainerJob(e.Job) if err != nil { log.Error( From ec6e375939b2b29bdfd57d9217bc9119471863de Mon Sep 17 00:00:00 2001 From: gongweibao Date: Wed, 15 Nov 2017 11:50:29 +0800 Subject: [PATCH 4/6] fix by helin's comments --- go/controller/controller.go | 13 +++---------- go/controller/jobparser.go | 2 +- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/go/controller/controller.go b/go/controller/controller.go index f7594d80..f6721eea 100644 --- a/go/controller/controller.go +++ b/go/controller/controller.go @@ -116,22 +116,15 @@ func (c *Controller) onAdd(obj interface{}) { m := parser.ParseToMaster(job) b, _ := json.MarshalIndent(p, "", " ") - log.Debug("create pserver:" + string(b[:])) + log.Debug("create pserver:" + string(b)) b, _ = json.MarshalIndent(t, "", " ") - log.Debug("create trainer-job:" + string(b[:])) + log.Debug("create trainer-job:" + string(b)) b, _ = json.MarshalIndent(m, "", " ") - log.Debug("create master:" + string(b[:])) + log.Debug("create master:" + string(b)) // TODO(gongwb): create them - // just like: - // namespace := job.ObjectMeta.Namespace - // _, err := c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(p) - // if err != nil { - // b, _ := json.MarshalIndent(p, "", " ") - // log.Debug("create pserver:%s\terror: %v", string(b[:]), err) - // } } func (c *Controller) onUpdate(oldObj, newObj interface{}) { diff --git a/go/controller/jobparser.go b/go/controller/jobparser.go index 24d62008..7ea150eb 100644 --- a/go/controller/jobparser.go +++ b/go/controller/jobparser.go @@ -111,7 +111,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R // ParseToTrainer parse TrainingJob to a kubernetes job resource. func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job { replicas := int32(job.Spec.Trainer.MinInstance) - command := make([]string, 2, 2) + command := make([]string, 2) if job.Spec.FaultTolerant { command = []string{"paddle_k8s", "start_trainer"} } else { From 2219f7ad4816d08cd0c0dc53e6ebc22b8c362432 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Wed, 15 Nov 2017 11:59:18 +0800 Subject: [PATCH 5/6] add simplefile --- go/cmd/paddlectl/main.go | 10 +- go/paddlectl/simplefile.go | 198 +++++++++++++++++++++++++++++++++++++ go/paddlectl/submit.go | 7 +- 3 files changed, 202 insertions(+), 13 deletions(-) create mode 100644 go/paddlectl/simplefile.go diff --git a/go/cmd/paddlectl/main.go b/go/cmd/paddlectl/main.go index c2927c4d..5bbbe0d2 100644 --- a/go/cmd/paddlectl/main.go +++ b/go/cmd/paddlectl/main.go @@ -14,14 +14,8 @@ func main() { subcommands.Register(subcommands.CommandsCommand(), "") subcommands.Register(&paddlectl.SubmitCmd{}, "") - // TODO(gongwb): add these commands. - // subcommands.Register(&paddlecloud.LogsCommand{}, "") - // subcommands.Register(&paddlecloud.GetCommand{}, "") - // subcommands.Register(&paddlecloud.KillCommand{}, "") - // subcommands.Register(&paddlecloud.SimpleFileCmd{}, "") - // subcommands.Register(&paddlecloud.RegistryCmd{}, "") - // subcommands.Register(&paddlecloud.DeleteCommand{}, "") - // subcommands.Register(&paddlecloud.PublishCmd{}, "") + // TODO(gongwb): add more commands. + subcommands.Register(&paddlectl.SimpleFileCmd{}, "") flag.Parse() ctx := context.Background() diff --git a/go/paddlectl/simplefile.go b/go/paddlectl/simplefile.go new file mode 100644 index 00000000..3258b0f6 --- /dev/null +++ b/go/paddlectl/simplefile.go @@ -0,0 +1,198 @@ +package paddlectl + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "net/url" + "os" + "path" + "path/filepath" + "strings" + + "github.com/PaddlePaddle/cloud/go/utils/restclient" + "github.com/google/subcommands" +) + +// TODO(gongwb): rm simplefile.go under paddlecloud/ + +// SimpleFileCmd define the subcommand of simple file operations. +type SimpleFileCmd struct { +} + +// Name is subcommands name. +func (*SimpleFileCmd) Name() string { return "file" } + +// Synopsis is subcommands synopsis. +func (*SimpleFileCmd) Synopsis() string { return "Simple file operations." } + +// Usage is subcommands Usage. +func (*SimpleFileCmd) Usage() string { + return `file [put|get] or file ls : + dst must be like /pfs/[datacenter]/home/[username] + Options: +` +} + +// SetFlags registers subcommands flags. +func (p *SimpleFileCmd) SetFlags(f *flag.FlagSet) { +} + +// Execute file ops. +func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 || f.NArg() > 3 { + f.Usage() + return subcommands.ExitFailure + } + switch f.Arg(0) { + case "put": + err := putFiles(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "put file error: %s\n", err) + return subcommands.ExitFailure + } + case "get": + err := getFile(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "get file error: %s\n", err) + return subcommands.ExitFailure + } + case "ls": + err := lsFile(f.Arg(1)) + if err != nil { + fmt.Fprintf(os.Stderr, "ls file error: %s\n", err) + return subcommands.ExitFailure + } + default: + f.Usage() + return subcommands.ExitFailure + } + return subcommands.ExitSuccess +} + +func lsFile(dst string) error { + query := url.Values{} + query.Set("path", dst) + query.Set("dc", Config.ActiveConfig.Name) + respStr, err := restclient.GetCall(Config.ActiveConfig.Endpoint+"/api/v1/filelist/", query) + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respStr, &respObj); err != nil { + return err + } + // FIXME: Print an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + return errors.New("list file error: " + errMsg) + } + items := respObj.(map[string]interface{})["items"].([]interface{}) + for _, fn := range items { + fmt.Println(fn.(string)) + } + return nil +} + +func putFiles(src string, dest string) error { + f, err := os.Stat(src) + if err != nil { + return err + } + if strings.HasPrefix(src, "..") { + return errors.New("src path should be inside your submiting path") + } + switch mode := f.Mode(); { + case mode.IsDir(): + if err := filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if info.Mode().IsRegular() { + srcs := strings.Split(filepath.Clean(src), string(os.PathSeparator)) + paths := strings.Split(path, string(os.PathSeparator)) + var destFile string + if strings.HasSuffix(src, "/") { + destFile = filepath.Join(dest, strings.Join(paths[len(srcs):len(paths)], string(os.PathSeparator))) + } else { + destFile = filepath.Join(dest, strings.Join(paths[len(srcs)-1:len(paths)], string(os.PathSeparator))) + } + putFile(path, destFile) + } + return nil + }); err != nil { + return err + } + + case mode.IsRegular(): + _, f := filepath.Split(src) + return putFile(src, filepath.Join(dest, f)) + } + return nil +} + +func putFile(src string, dest string) error { + fmt.Printf("Uploading ... %s %s\n", src, dest) + query := url.Values{} + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + query.Set("path", destFullPath) + respStr, err := restclient.PostFile(Config.ActiveConfig.Endpoint+"/api/v1/file/", src, query) + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respStr, &respObj); err != nil { + return err + } + // FIXME: Print an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + fmt.Fprintf(os.Stderr, "upload file error: %s\n", errMsg) + } + return nil +} + +func getFile(src string, dest string) error { + query := url.Values{} + query.Set("path", src) + req, err := restclient.MakeRequestToken(Config.ActiveConfig.Endpoint+"/api/v1/file/", "GET", nil, "", query) + if err != nil { + return err + } + resp, err := restclient.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.Status != restclient.HTTPOK { + return errors.New("server error: " + resp.Status) + } + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + if _, err = os.Stat(destFullPath); err == nil { + return errors.New("file already exist: " + destFullPath) + } + out, err := os.Create(destFullPath) + if err != nil { + return err + } + defer out.Close() + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + return nil +} diff --git a/go/paddlectl/submit.go b/go/paddlectl/submit.go index 9c308889..5e249caa 100644 --- a/go/paddlectl/submit.go +++ b/go/paddlectl/submit.go @@ -263,12 +263,9 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { return err } - if err := kubeutil.CreateTrainingJob(client, namespace, s.args.GetTrainingJob()); err != nil { - return err - } - - return nil + return kubeutil.CreateTrainingJob(client, namespace, s.args.GetTrainingJob()) } + func checkJobName(jobName string) error { if strings.Contains(jobName, "_") || strings.Contains(jobName, ".") { return errors.New(invalidJobName) From 8f051235549d25d74d06042ae65935a776225ae8 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 16 Nov 2017 09:59:26 +0800 Subject: [PATCH 6/6] fix by wuyi's comments --- go/controller/jobparser.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/go/controller/jobparser.go b/go/controller/jobparser.go index 7ea150eb..ab9631d9 100644 --- a/go/controller/jobparser.go +++ b/go/controller/jobparser.go @@ -27,6 +27,10 @@ import ( v1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) +const ( + imagePullPolicy = "Always" +) + // JobParser is a interface can parse "TrainingJob" to // ReplicaSet and job. type JobParser interface { @@ -139,7 +143,7 @@ func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.J v1.Container{ Name: "trainer", Image: job.Spec.Image, - ImagePullPolicy: "Always", + ImagePullPolicy: imagePullPolicy, Command: command, VolumeMounts: podVolumeMounts(job), Ports: podPorts(job), @@ -180,7 +184,7 @@ func getEtcdPodSpec(job *paddlejob.TrainingJob) *v1.Container { return &v1.Container{ Name: "etcd", Image: "quay.io/coreos/etcd:v3.2.1", - ImagePullPolicy: "Always", + ImagePullPolicy: imagePullPolicy, // TODO(gongwb): etcd ports? Env: podEnv(job), Command: command, @@ -215,7 +219,7 @@ func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.Re v1.Container{ Name: "master", Image: job.Spec.Image, - ImagePullPolicy: "Always", + ImagePullPolicy: imagePullPolicy, Ports: masterPorts(job), // TODO(gongwb):master env Command: command, @@ -297,9 +301,7 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { v1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, v1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)}, v1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU}, - - // FIXME(gongwb): LD_LIBRARY_PATH? - v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: job.Spec.Trainer.Entrypoint}, + v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: "/usr/local/cuda/lib64"}, v1.EnvVar{Name: "NAMESPACE", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ FieldPath: "metadata.namespace", @@ -317,6 +319,7 @@ func podVolumes(job *paddlejob.TrainingJob) []v1.Volume { return []v1.Volume{ v1.Volume{ Name: job.ObjectMeta.Name + "-workspace", + // TODO(gongwb): add support to ceph fs and mount public path. VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: job.Spec.Trainer.Workspace,