diff --git a/go/cmd/edl/edl.go b/go/cmd/edl/edl.go
index 6cf422cd..db2eed34 100644
--- a/go/cmd/edl/edl.go
+++ b/go/cmd/edl/edl.go
@@ -1,10 +1,10 @@
 package main

 import (
-	"context"
 	"flag"

 	log "github.com/inconshreveable/log15"
+	"github.com/wangkuiyi/candy"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
@@ -21,9 +21,7 @@ func main() {
 	flag.Parse()

 	lvl, err := log.LvlFromString(*logLevel)
-	if err != nil {
-		panic(err)
-	}
+	candy.Must(err)

 	log.Root().SetHandler(
 		log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
@@ -36,27 +34,18 @@ func main() {
 	} else {
 		cfg, err = rest.InClusterConfig()
 	}
-	if err != nil {
-		panic(err)
-	}
+	candy.Must(err)

-	// setup some optional configuration
-	edlresource.RegisterTrainingJob(cfg)
+	edlresource.RegisterResource(cfg, &edlresource.TrainingJob{}, &edlresource.TrainingJobList{})

 	clientset, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		panic(err)
-	}
+	candy.Must(err)

 	client, err := rest.RESTClientFor(cfg)
-	if err != nil {
-		panic(err)
-	}
+	candy.Must(err)

 	controller, err := edl.New(client, clientset, *maxLoadDesired)
-	if err != nil {
-		panic(err)
-	}
+	candy.Must(err)

-	controller.Run(context.Background())
+	controller.Run()
 }
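The repeated `if err != nil { panic(err) }` blocks above collapse into `candy.Must`. A minimal sketch of what such a helper looks like, assuming the conventional one-argument signature — the actual github.com/wangkuiyi/candy implementation may differ in detail:

```go
package main

import (
	"errors"
	"fmt"
)

// Must panics on a non-nil error, mirroring the inline checks the
// diff removes. A sketch only, not the candy package itself.
func Must(err error) {
	if err != nil {
		panic(err)
	}
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // recovered: boom
		}
	}()
	Must(nil)                // No-op on success.
	Must(errors.New("boom")) // Panics, just like the old inline checks.
}
```
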
diff --git a/go/edl/autoscaler.go b/go/edl/autoscaler.go
index 729caf89..8d2fc956 100644
--- a/go/edl/autoscaler.go
+++ b/go/edl/autoscaler.go
@@ -31,36 +31,6 @@ const (
 	defaultLoopDur = time.Second * 5
 )

-// ClusterResource is the resource of a cluster
-type ClusterResource struct {
-	NodeCount int
-
-	GPURequest int
-	GPULimit   int
-	GPUTotal   int
-
-	CPURequestMilli int64
-	CPULimitMilli   int64
-	CPUTotalMilli   int64
-
-	MemoryRequestMega int64
-	MemoryLimitMega   int64
-	MemoryTotalMega   int64
-
-	NodeInfos NodeInfos
-}
-
-// NodeInfos is the information of all nodes.
-type NodeInfos struct {
-	NodesCPUIdleMilli   map[string]int64
-	NodesMemoryFreeMega map[string]int64
-}
-
-// String returns the string that represents NodeInfo when printed.
-func (n NodeInfos) String() string {
-	return fmt.Sprintf("NodeInfo(%d nodes)", len(n.NodesCPUIdleMilli))
-}
-
 type job struct {
 	Config     *edlresource.TrainingJob
 	TrainerJob *batchv1.Job
@@ -97,7 +67,7 @@ func (j job) Fulfillment() float64 {
 type Autoscaler struct {
 	ticker         *time.Ticker
 	cluster        *Cluster
-	jobs           map[string]job
+	jobs           map[string]*job
 	eventCh        chan event
 	maxLoadDesired float64
 }
@@ -114,7 +84,7 @@ func newAutoscaler(cluster *Cluster, options ...func(*Autoscaler)) *Autoscaler {
 	c := &Autoscaler{
 		cluster:        cluster,
 		ticker:         time.NewTicker(defaultLoopDur),
-		jobs:           make(map[string]job),
+		jobs:           make(map[string]*job),
 		eventCh:        make(chan event),
 		maxLoadDesired: 1.0,
 	}
@@ -124,7 +94,7 @@ func newAutoscaler(cluster *Cluster, options ...func(*Autoscaler)) *Autoscaler {
 	return c
 }

-type jobs []job
+type jobs []*job

 func (j jobs) Len() int {
 	return len(j)
@@ -159,12 +129,12 @@ func (j jobs) Swap(a int, b int) {
 }

 // elastic job filter.
-func elastic(j job) bool {
+func elastic(j *job) bool {
 	return j.Config.Elastic()
 }

 // gpu job filter.
-func gpu(j job) bool {
+func gpu(j *job) bool {
 	return j.Config.NeedGPU()
 }

@@ -202,7 +172,7 @@ func (a *Autoscaler) OnUpdate(trainingjob *edlresource.TrainingJob) {

 // sortedJobs return the names of sorted jobs by fulfillment and
 // tiebreakers in ascending order.
-func sortedJobs(j []job, filters ...func(job) bool) []job {
+func sortedJobs(j []*job, filters ...func(*job) bool) []*job {
 	var js jobs
 nextJob:
 	for _, v := range j {
@@ -218,17 +188,17 @@ nextJob:
 	return js
 }

-func searchAssignableNode(r *ClusterResource, j job) (nodeName string) {
-	for name, idle := range r.NodeInfos.NodesCPUIdleMilli {
+func searchAssignableNode(r *ClusterResource, j *job) (nodeName string) {
+	for name, idle := range r.Nodes.NodesCPUIdleMilli {
 		if j.TrainerCPURequestMilli() <= idle &&
-			j.TrainerMemRequestMega() <= r.NodeInfos.NodesMemoryFreeMega[name] {
+			j.TrainerMemRequestMega() <= r.Nodes.NodesMemoryFreeMega[name] {
 			return name
 		}
 	}
 	return ""
 }

-func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64, scaleDown bool) (additional int) {
+func scaleDryRun(r *ClusterResource, j *job, curDiff int, maxLoadDesired float64, scaleDown bool) (additional int) {
 	additionalGPUInstance := 0
 	additionalCPUInstance := 0
 	cpuRequestMilli := j.TrainerCPURequestMilli()
@@ -241,8 +211,8 @@ func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64,
 		r.CPURequestMilli += cpuRequestMilli * int64(additional)
 		r.MemoryRequestMega += memRequestMega * int64(additional)
 		if nodeName != "" {
-			r.NodeInfos.NodesCPUIdleMilli[nodeName] += cpuRequestMilli * int64(additional)
-			r.NodeInfos.NodesMemoryFreeMega[nodeName] += memRequestMega * int64(additional)
+			r.Nodes.NodesCPUIdleMilli[nodeName] += cpuRequestMilli * int64(additional)
+			r.Nodes.NodesMemoryFreeMega[nodeName] += memRequestMega * int64(additional)
 		}
 	}()
@@ -320,13 +290,16 @@ func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64,
 	return
 }

-func scaleAllDryRun(jobs []job, r ClusterResource, maxLoadDesired float64) map[string]int {
+// scaleAllJobsDryRun pretends to rescale all jobs in order to find
+// out the number of pods that should be added or deleted for each
+// job, i.e., the delta. It returns a map from job name to the delta.
+func scaleAllJobsDryRun(jobs []*job, r ClusterResource, maxLoadDesired float64) map[string]int {
 	// Iteratively calculate scaling diff until nothing changes.
 	diff := make(map[string]int)
 	for {
 		noChange := true
 		sorted := sortedJobs(jobs, elastic)
-		dryRun := func(j job, isScaleDown bool) {
+		dryRun := func(j *job, isScaleDown bool) {
 			name := j.Config.Name
 			additional := scaleDryRun(&r, j, diff[name], maxLoadDesired, isScaleDown)
 			log.Debug(
@@ -363,7 +336,7 @@ func scaleAllDryRun(jobs []job, r ClusterResource, maxLoadDesired float64) map[s
 	return diff
 }

-func (a *Autoscaler) scaleAll(target map[string]int) {
+func (a *Autoscaler) scaleAllJobs(target map[string]int) {
 	for name := range target {
 		log.Info("scaling job", "name", name, "target", target[name])
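The comment inside scaleAllJobsDryRun — "iteratively calculate scaling diff until nothing changes" — describes a fixed-point iteration. A toy, self-contained sketch of that loop shape (tryStep and the capacity model are invented for illustration):

```go
package main

import "fmt"

func main() {
	free := 3 // Pretend the cluster can fit three more pods in total.
	diff := map[string]int{"job-a": 0, "job-b": 0}

	// tryStep stands in for scaleDryRun: grant one more pod while
	// capacity remains, otherwise report no change.
	tryStep := func(name string) int {
		if free > 0 {
			free--
			return 1
		}
		return 0
	}

	for {
		noChange := true
		for name := range diff {
			if d := tryStep(name); d != 0 {
				diff[name] += d
				noChange = false
			}
		}
		if noChange {
			break // Fixed point: no job can scale any further.
		}
	}
	fmt.Println(diff) // Deltas sum to 3; the split depends on map order.
}
```
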
@@ -402,130 +375,101 @@
 	}
 }

-// Monitor monitors the cluster resources and training jobs in a loop,
-// scales the training jobs according to the cluster resource.
-func (a *Autoscaler) Monitor() {
-	for {
-		select {
-		case <-a.ticker.C:
-		case e := <-a.eventCh:
-			log.Debug("monitor received event", "event", e)
-			switch e.Type {
-			case add:
-				fallthrough
-			case update:
-				// TODO(helin): schedule the training
-				// k8s Job. Currently we don't
-				// schedule the trainer job, but only
-				// scale it.
-				var tj *batchv1.Job
-				var err error
-				tj, err = a.cluster.GetTrainerJob(e.Job)
-				if err != nil {
-					log.Error(
-						"Error getting the trainer k8s job, will sync later.",
-						"name", e.Job.ObjectMeta.Name,
-						"error", err,
-					)
-					j := job{
-						Config:     e.Job,
-						TrainerJob: nil,
-					}
-					a.jobs[e.Job.ObjectMeta.Name] = j
-					continue
-				}
-				j := job{
-					Config:     e.Job,
-					TrainerJob: tj,
-				}
-				a.jobs[e.Job.ObjectMeta.Name] = j
-			case del:
-				// TODO(helin): delete all created
-				// resources (e.g., trainer Job,
-				// pserver Replica Set) when we
-				// schedules the resources.
-				delete(a.jobs, e.Job.ObjectMeta.Name)
-			default:
-				log.Error("unrecognized event", "event", e)
-			}
+// updateJobList updates the data structure a.jobs according to
+// received events about the TrainingJob resource. It returns true if
+// the EDL controller needs to do some scheduling work. If it returns
+// false, the EDL controller could simply go on monitoring other
+// events.
+func (a *Autoscaler) updateJobList(evt event) bool {
+	log.Debug("monitor received event", "event", evt)
+	switch evt.Type {
+	case add, update:
+		j := &job{Config: evt.Job}
+		a.jobs[evt.Job.ObjectMeta.Name] = j
+		if a.tryToRetrieveTrainerJobInTrainingJob(evt.Job.ObjectMeta.Name, j) != nil {
+			return false
 		}
+	case del:
+		// TODO(helin): delete all created resources (e.g.,
+		// trainer Job, pserver Replica Set) when we schedule
+		// the resources.
+		delete(a.jobs, evt.Job.ObjectMeta.Name)
+	default:
+		log.Error("unrecognized event", "event", evt)
+	}
+
+	return true
+}

-		r, err := a.cluster.SyncResource()
+// findPendingJob returns true if there is at least one training job
+// all of whose pods are pending.
+func (a *Autoscaler) findPendingJob() bool {
+	for jobName, job := range a.jobs {
+		if a.tryToRetrieveTrainerJobInTrainingJob(jobName, job) != nil {
+			continue
+		}
+		total, _, pending, err := a.cluster.JobPods(job.Config)
 		if err != nil {
-			log.Error("error sync resource", "error", err)
+			log.Error("check if job is running failed", "error", err)
 			continue
 		}
-		log.Info("sync cluster resource done", "resource", r)
-
-		havePending := false
-		for key, j := range a.jobs {
-			// k8s job for trainers may not be created immediently
-			// try sync it here
-			if j.TrainerJob == nil {
-				tj, err := a.cluster.GetTrainerJob(j.Config)
-				if err != nil {
-					log.Error(
-						"Error getting the trainer k8s job, will sync later.",
-						"name", j.Config.ObjectMeta.Name,
-						"error", err,
-					)
-					continue
-				}
-				j.TrainerJob = tj
-				a.jobs[key] = j
-			}
-			total, _, pending, err := a.cluster.JobPods(j.Config)
-			if err != nil {
-				log.Error("check if job is running failed", "error", err)
-				continue
-			}
+		if total == pending {
+			return true
+		}
+	}
+	return false
+}

-			if total == pending {
-				havePending = true
-				break
-			}
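findPendingJob hinges on the total/pending counts coming back from cluster.JobPods. A hedged sketch of the kind of pod-phase accounting such a method has to do (countPodPhases is hypothetical; only the v1.PodPhase constants are real):

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
)

// countPodPhases tallies a job's pods by phase, the way a
// JobPods-like helper would before findPendingJob compares total
// against pending.
func countPodPhases(pods []v1.Pod) (total, running, pending int) {
	for _, p := range pods {
		total++
		switch p.Status.Phase {
		case v1.PodRunning:
			running++
		case v1.PodPending:
			pending++
		}
	}
	return
}

func main() {
	pods := []v1.Pod{
		{Status: v1.PodStatus{Phase: v1.PodPending}},
		{Status: v1.PodStatus{Phase: v1.PodPending}},
	}
	total, _, pending := countPodPhases(pods)
	fmt.Println(total == pending) // true: the job counts as pending.
}
```
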
+func (a *Autoscaler) tryToRetrieveTrainerJobInTrainingJob(jobName string, job *job) error {
+	// TODO(helin): Because we wrote the conversion from
+	// TrainingJob into ReplicaSet and Job in paddlecloud instead
+	// of the EDL controller (please refer to the above TODO
+	// comment for details), we might suffer from the problem
+	// that when the EDL controller calls cluster.GetTrainerJob,
+	// it doesn't return TrainerJob before the conversion is done
+	// at paddlecloud. After we fix this problem, we can remove
+	// the following call to cluster.GetTrainerJob and keep the
+	// one in updateJobList.
+	if job.TrainerJob == nil {
+		tj, err := a.cluster.GetTrainerJob(job.Config)
+		if err != nil {
+			log.Error(
+				"Error getting the trainer k8s job, will sync later.",
+				"name", job.Config.ObjectMeta.Name,
+				"error", err,
+			)
+			return err
+		}
+		job.TrainerJob = tj
+	}
+	return nil
+}

-		var js []job
-		for key, j := range a.jobs {
-			// k8s job for trainers may not be created immediently
-			// try sync it here
-			if j.TrainerJob == nil {
-				tj, err := a.cluster.GetTrainerJob(j.Config)
-				if err != nil {
-					log.Error(
-						"Error getting the trainer k8s job, will sync later.",
-						"name", j.Config.ObjectMeta.Name,
-						"error", err,
-					)
-					continue
-				}
-				j.TrainerJob = tj
-				a.jobs[key] = j
+// Run monitors the cluster resources and training jobs in a loop,
+// scaling the training jobs according to the cluster resources.
+func (a *Autoscaler) Run() {
+	for {
+		select {
+		case <-a.ticker.C:
+		case evt := <-a.eventCh:
+			if !a.updateJobList(evt) {
+				continue // If nothing important, go on looping.
 			}
+		}

-			total, running, pending, err := a.cluster.JobPods(j.Config)
-			if err != nil {
-				log.Error("check if job is running failed", "error", err)
-				continue
-			}
+		r, err := a.cluster.InquiryResource()
+		if err != nil {
+			log.Error("Cluster.InquiryResource", "error", err)
+			continue
+		}
+		log.Debug("Cluster.InquiryResource done", "resource", r)

-			log.Info(
-				"job info",
-				"name", key,
-				"running", running,
-				"pending", pending,
-				"total", total,
-			)
+		diff := scaleAllJobsDryRun(
+			a.findTrainingJobsMightBeRescheduled(
+				a.findPendingJob()),
+			r, a.maxLoadDesired)

-			// Scale jobs only when all pods' "Phase" are
-			// running, or some job is pending.
-			if total == running || havePending {
-				js = append(js, j)
-			}
-		}
-		diff := scaleAllDryRun(js, r, a.maxLoadDesired)
 		target := make(map[string]int)
 		for k, v := range diff {
 			target[k] = int(*a.jobs[k].TrainerJob.Spec.Parallelism) + v
@@ -535,6 +479,33 @@ func (a *Autoscaler) Monitor() {
 			log.Info("calculated scaling plan", "target", target, "clusterResource", r)
 		}
-		a.scaleAll(target)
+
+		a.scaleAllJobs(target)
 	}
 }
+
+func (a *Autoscaler) findTrainingJobsMightBeRescheduled(havePending bool) []*job {
+	var js []*job
+	for jobName, job := range a.jobs {
+		if a.tryToRetrieveTrainerJobInTrainingJob(jobName, job) != nil {
+			continue
+		}
+
+		total, running, pending, err := a.cluster.JobPods(job.Config)
+		if err != nil {
+			log.Error("check if job is running failed", "error", err)
+			continue
+		}
+		log.Info("job info", "name", jobName, "running", running, "pending", pending, "total", total)

+		// A job is subject to rescheduling if it is in a
+		// stable status, which means that all its pods are
+		// running. Or, if there is a job all of whose pods are
+		// pending, we might need to reschedule all other jobs
+		// to make room for the pending job.
+		if total == running || havePending {
+			js = append(js, job)
+		}
+	}
+	return js
+}
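Run's structure — block on either a ticker or an event channel, then do one reconciliation pass — is a standard Go control loop. A stripped-down sketch (the event payload and the reconcile step are placeholders):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	eventCh := make(chan string, 1)
	eventCh <- "job added"

	for i := 0; i < 3; i++ { // The real loop runs forever.
		select {
		case <-ticker.C:
			// Woken by the timer: reconcile even without events.
		case evt := <-eventCh:
			fmt.Println("event:", evt)
		}
		fmt.Println("inquire resources, dry-run, scale") // One pass.
	}
}
```
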
diff --git a/go/edl/autoscaler_internal_test.go b/go/edl/autoscaler_internal_test.go
index 36e0ec40..0d6cd3ac 100644
--- a/go/edl/autoscaler_internal_test.go
+++ b/go/edl/autoscaler_internal_test.go
@@ -53,7 +53,7 @@ func makePtr(c int) *int32 {
 	return &p
 }

-func makeJob(name string, cpuReq, cpuLim, memReq, memLim, gpuLim string, min, max, p int) job {
+func makeJob(name string, cpuReq, cpuLim, memReq, memLim, gpuLim string, min, max, p int) *job {
 	cr, err := resource.ParseQuantity(cpuReq)
 	if err != nil {
 		panic(err)
 	}
@@ -75,7 +75,7 @@ func makeJob(name string, cpuReq, cpuLim, memReq, memLim, gpuLim string, min, ma
 		panic(err)
 	}

-	j := job{
+	j := &job{
 		Config:     &edlresource.TrainingJob{},
 		TrainerJob: &batchv1.Job{},
 	}
@@ -106,7 +106,7 @@ func TestScaleDryRunSatisfied(t *testing.T) {
 	assert.Equal(t, 0, scaleDryRun(&r, j, 0, 1.0, false))
 }

-var allIdleNodeInfos = NodeInfos{
+var allIdleNodes = Nodes{
 	NodesCPUIdleMilli:   map[string]int64{"node0": 99999},
 	NodesMemoryFreeMega: map[string]int64{"node0": 99999},
 }
@@ -119,7 +119,7 @@
 		MemoryRequestMega: 100,
 		MemoryLimitMega:   100,
 		MemoryTotalMega:   1000,
-		NodeInfos:         allIdleNodeInfos,
+		Nodes:             allIdleNodes,
 	}
 	j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
 	assert.Equal(t, 1, scaleDryRun(&r, j, 0, 1.0, false))
@@ -133,7 +133,7 @@
 		MemoryRequestMega: 100,
 		MemoryLimitMega:   100,
 		MemoryTotalMega:   1000,
-		NodeInfos:         allIdleNodeInfos,
+		Nodes:             allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
@@ -151,7 +151,7 @@ func TestScaleDryRunMoreGPU(t *testing.T) {
 		GPULimit:   0,
 		GPURequest: 0,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}
 	j := makeJob("name", "1", "1", "10Mi", "10Mi", "1", 1, 3, 1)
 	assert.Equal(t, 1, scaleDryRun(&r, j, 0, 1.0, false))
@@ -169,7 +169,7 @@ func TestScaleDryRunNoMoreGPU(t *testing.T) {
 		GPULimit:   10,
 		GPURequest: 10,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "10Mi", "10Mi", "1", 1, 3, 1)
@@ -207,7 +207,7 @@ func TestScaleDryRunScaleDownToMin(t *testing.T) {
 		GPULimit:   10,
 		GPURequest: 10,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "10Mi", "10Mi", "0", 1, 3, 3)
@@ -227,7 +227,7 @@ func TestScaleDryRunScaleDownFullCluster(t *testing.T) {
 		GPULimit:   10,
 		GPURequest: 10,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "10Mi", "10Mi", "0", 1, 3, 3)
@@ -246,7 +246,7 @@ func TestScaleDryRunNoMem(t *testing.T) {
 		GPULimit:   10,
 		GPURequest: 10,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
@@ -260,11 +260,11 @@
 		MemoryLimitMega: 1000,
 		MemoryTotalMega: 1000,
 		GPUTotal:        10,
-		NodeInfos:       allIdleNodeInfos,
+		Nodes:           allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1)
-	scale := scaleAllDryRun([]job{j}, r, 1.0)["name"]
+	scale := scaleAllJobsDryRun([]*job{j}, r, 1.0)["name"]
 	assert.Equal(t, 0, scale)
 }

@@ -279,11 +279,11 @@
 		GPULimit:   8,
 		GPURequest: 8,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
-	scale := scaleAllDryRun([]job{j}, r, 1.0)["name"]
+	scale := scaleAllJobsDryRun([]*job{j}, r, 1.0)["name"]
 	assert.Equal(t, 2, scale)
 }
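makeJob builds its quantities with resource.ParseQuantity, and the autoscaler reads them back in milli-CPUs and megabytes. A small sketch of those conversions (exact rounding behavior belongs to the apimachinery implementation):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	cpu := resource.MustParse("500m")  // Half a CPU.
	mem := resource.MustParse("100Mi") // 100 * 2^20 bytes.

	fmt.Println(cpu.ScaledValue(resource.Milli)) // 500
	fmt.Println(mem.ScaledValue(resource.Mega))  // ~105: mebibytes converted to (decimal) megabytes.
}
```
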
@@ -298,11 +298,11 @@
 		GPULimit:   0,
 		GPURequest: 0,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
-	scale := scaleAllDryRun([]job{j}, r, 0.8)["name"]
+	scale := scaleAllJobsDryRun([]*job{j}, r, 0.8)["name"]
 	assert.Equal(t, 1, scale)
 }

@@ -317,11 +317,11 @@
 		GPULimit:   0,
 		GPURequest: 0,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 3)
-	scale := scaleAllDryRun([]job{j}, r, 0.8)["name"]
+	scale := scaleAllJobsDryRun([]*job{j}, r, 0.8)["name"]
 	assert.Equal(t, -1, scale)
 }

@@ -336,11 +336,11 @@
 		GPULimit:   8,
 		GPURequest: 8,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1)
-	scale := scaleAllDryRun([]job{j}, r, 1.0)["name"]
+	scale := scaleAllJobsDryRun([]*job{j}, r, 1.0)["name"]
 	assert.Equal(t, 1, scale)
 }

@@ -355,11 +355,11 @@
 		GPULimit:   9,
 		GPURequest: 9,
 		GPUTotal:   10,
-		NodeInfos:  allIdleNodeInfos,
+		Nodes:      allIdleNodes,
 	}

 	j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1)
-	scale := scaleAllDryRun([]job{j}, r, 1.0)["name"]
+	scale := scaleAllJobsDryRun([]*job{j}, r, 1.0)["name"]
 	assert.Equal(t, 1, scale)
 }

@@ -375,7 +375,7 @@ func TestFulfillment(t *testing.T) {
 }

 func TestSortedJobs(t *testing.T) {
-	jobs := []job{
+	jobs := []*job{
 		makeJob("a", "1", "1", "1", "1", "1", 1, 2, 2),
 		makeJob("b", "1", "1", "1", "1", "1", 1, 20, 2),
 		makeJob("c", "1", "1", "1", "1", "1", 1, 10, 2),
@@ -398,7 +398,7 @@
 }

 func TestSortedJobsGPUOnly(t *testing.T) {
-	jobs := []job{
+	jobs := []*job{
 		makeJob("a", "1", "1", "1", "1", "1", 1, 2, 2),
 		makeJob("b", "1", "1", "1", "1", "0", 1, 20, 2),
 		makeJob("c", "1", "1", "1", "1", "0", 1, 10, 2),
@@ -420,7 +420,7 @@
 }

 func TestSortedJobsWithTie(t *testing.T) {
-	jobs := []job{
+	jobs := []*job{
 		makeJob("a", "1", "0", "1", "1", "1", 1, 2, 1),
 		makeJob("b", "1", "1", "1", "1", "0", 1, 2, 1),
 		makeJob("c", "10", "10", "1", "1", "0", 1, 2, 1),
diff --git a/go/edl/autoscaler_test.go b/go/edl/autoscaler_test.go
index 883a854a..09e1369e 100644
--- a/go/edl/autoscaler_test.go
+++ b/go/edl/autoscaler_test.go
@@ -26,12 +26,12 @@ func TestNew(t *testing.T) {
 	assert.NotNil(t, c)
 }

-func TestMonitor(t *testing.T) {
+func TestRun(t *testing.T) {
 	c := newAutoscaler(nil)
 	ch := make(chan struct{})

 	go func() {
-		c.Monitor()
+		c.Run()
 		close(ch)
 	}()
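TestRun exercises a loop that never returns, so the test starts it in a goroutine and uses a channel close as a tripwire. The same pattern in isolation (loop stands in for Autoscaler.Run):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	loop := func() { // Stand-in for the never-returning Run loop.
		for {
			time.Sleep(10 * time.Millisecond)
		}
	}

	done := make(chan struct{})
	go func() {
		loop()
		close(done) // Only reached if the loop ever returned.
	}()

	select {
	case <-done:
		fmt.Println("loop exited unexpectedly")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("loop still running, as the test expects")
	}
}
```
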
diff --git a/go/edl/cluster.go b/go/edl/cluster.go
index fb444aea..dd9cdc21 100644
--- a/go/edl/cluster.go
+++ b/go/edl/cluster.go
@@ -28,7 +28,54 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 )

-// Cluster resprensents a Kubernetes cluster.
+// ClusterResource describes the resources of a cluster.
+type ClusterResource struct {
+	NodeCount int // The total number of nodes in the cluster.
+
+	// Each Kubernetes job could require some number of GPUs in
+	// the range of [request, limit].
+	GPURequest int // \sum_job num_gpu_request(job)
+	GPULimit   int // \sum_job num_gpu_limit(job)
+	GPUTotal   int // The total number of GPUs in the cluster
+
+	// Each Kubernetes job could require some CPU timeslices in
+	// the unit of *milli*.
+	CPURequestMilli int64 // \sum_job cpu_request_in_milli(job)
+	CPULimitMilli   int64 // \sum_job cpu_limit_in_milli(job)
+	CPUTotalMilli   int64 // The total amount of CPUs in the cluster in milli.
+
+	// Each Kubernetes job could require some amount of memory in
+	// the unit of *mega*.
+	MemoryRequestMega int64 // \sum_job memory_request_in_mega(job)
+	MemoryLimitMega   int64 // \sum_job memory_limit_in_mega(job)
+	MemoryTotalMega   int64 // The total amount of memory in the cluster in mega.
+
+	Nodes Nodes
+}
+
+// Nodes records the amount of idle CPU and free memory of each node
+// in the cluster.
+type Nodes struct {
+	NodesCPUIdleMilli   map[string]int64 // node id -> idle CPU
+	NodesMemoryFreeMega map[string]int64 // node id -> free memory
+}
+
+func (ns *Nodes) String() string {
+	if len(ns.NodesCPUIdleMilli) != len(ns.NodesMemoryFreeMega) {
+		panic("Inconsistent length in Nodes")
+	}
+
+	return fmt.Sprintf("%d Nodes", len(ns.NodesCPUIdleMilli))
+}
+
+// Cluster is our interface to the Kubernetes cluster. It can inquire
+// about the cluster's overall status and the status of a specific
+// PaddlePaddle training job. It can also create training jobs and
+// replicas.
+//
+// TODO(yi): The above functionalities are NOT logically related with
+// each other. I am not sure if it is a good idea to group them in
+// this source file.
 type Cluster struct {
 	clientset *kubernetes.Clientset
 }
@@ -125,8 +172,8 @@ func updateNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]i
 	return
 }

-// SyncResource will update free and total resources in k8s cluster.
-func (c *Cluster) SyncResource() (res ClusterResource, err error) {
+// InquiryResource returns the idle and total resources of the Kubernetes cluster.
+func (c *Cluster) InquiryResource() (res ClusterResource, err error) {
 	nodes := c.clientset.CoreV1().Nodes()
 	nodeList, err := nodes.List(metav1.ListOptions{})
 	if err != nil {
@@ -186,7 +233,7 @@ func (c *Cluster) SyncResource() (res ClusterResource, err error) {
 		CPULimitMilli:   allLimits.Cpu().ScaledValue(resource.Milli),
 		MemoryLimitMega: allLimits.Memory().ScaledValue(resource.Mega),

-		NodeInfos: NodeInfos{
+		Nodes: Nodes{
 			NodesCPUIdleMilli:   nodesCPUIdleMilli,
 			NodesMemoryFreeMega: nodesMemoryFreeMega,
 		},
diff --git a/go/edl/controller.go b/go/edl/controller.go
index d3cebd65..e4689a04 100644
--- a/go/edl/controller.go
+++ b/go/edl/controller.go
@@ -25,8 +25,8 @@
 package edl

 import (
-	"context"
 	"encoding/json"
+	"sync"

 	log "github.com/inconshreveable/log15"

@@ -61,19 +61,21 @@ func New(c *rest.RESTClient, cs *kubernetes.Clientset, maxLoadDesired float64) (
 }

 // Run start to watch kubernetes events and do handlers.
-func (c *Controller) Run(ctx context.Context) error {
-	err := c.startWatch(ctx)
-	if err != nil {
-		return err
-	}
-
-	go c.autoscaler.Monitor()
-
-	<-ctx.Done()
-	return ctx.Err()
+func (c *Controller) Run() {
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		c.WatchTrainingJobs()
+		wg.Done()
+	}()
+	go func() {
+		c.autoscaler.Run()
+		wg.Done()
+	}()
+	wg.Wait()
 }

-func (c *Controller) startWatch(ctx context.Context) error {
+func (c *Controller) WatchTrainingJobs() {
 	source := cache.NewListWatchFromClient(
 		c.client,
 		edlresource.TrainingJobs,
@@ -101,8 +103,7 @@ func (c *Controller) startWatch(ctx context.Context) error {
 		DeleteFunc: c.onDelete,
 	})

-	go informer.Run(ctx.Done())
-	return nil
+	informer.Run(make(chan struct{})) // A channel that is never closed.
 }

 func (c *Controller) onAdd(obj interface{}) {
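The new Controller.Run replaces context cancellation with a WaitGroup over two long-running goroutines. The shape in isolation (both closures are placeholders for the watcher and the autoscaler):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		fmt.Println("watch TrainingJob events") // c.WatchTrainingJobs()
	}()
	go func() {
		defer wg.Done()
		fmt.Println("run the autoscaler loop") // c.autoscaler.Run()
	}()
	wg.Wait() // In the controller, neither goroutine returns, so Run blocks forever.
}
```
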
diff --git a/go/edl/resource/training_job.go b/go/edl/resource/training_job.go
index deb09d71..459f4e80 100644
--- a/go/edl/resource/training_job.go
+++ b/go/edl/resource/training_job.go
@@ -12,7 +12,52 @@ See the License for the specific language governing permissions and
 limitations under the License. */

-// sample resource:
+package resource
+
+import (
+	"encoding/json"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	clientgoapi "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+)
+
+// TrainingJobs string for registration
+const TrainingJobs = "TrainingJobs"
+
+// A TrainingJob is a Kubernetes resource that describes a
+// PaddlePaddle training job. As a Kubernetes resource,
+//
+// - Its content must follow the Kubernetes resource definition convention.
+// - It must be a Go struct with JSON tags.
+// - It must implement the deepcopy interface.
+//
+// To start a PaddlePaddle training job,
+//
+// (1) The user uses the paddlecloud command line tool, which sends
+// the command line arguments to the paddlecloud HTTP server.
+//
+// (2) The paddlecloud server converts the command line arguments into
+// a TrainingJob resource and sends it to the Kubernetes API server.
+//
+// (3) The EDL controller, which monitors events about the
+// TrainingJob resource accepted by the Kubernetes API server,
+// converts the TrainingJob resource into the following Kubernetes
+// resources:
+//
+//     (3.1) a ReplicaSet of the master process
+//     (3.2) a ReplicaSet of the parameter server processes
+//     (3.3) a Job of trainer processes
+//
+// (4) Some default controllers provided by Kubernetes monitor events
+// about the ReplicaSet and the Job and create and maintain the Pods.
+//
+// An example TrainingJob instance:
 /*
 apiVersion: paddlepaddle.org/v1
 kind: TrainingJob
 metadata:
   name: job-1
 spec:
   image: "paddlepaddle/paddlecloud-job"
   port: 7164
   ports_num: 1
   ports_num_for_sparse: 1
   fault_tolerant: true
   trainer:
     entrypoint: "python train.py"
     workspace: "/home/job-1/"
     min-instance: 3
     max-instance: 6
     resources:
       limits:
         alpha.kubernetes.io/nvidia-gpu: 1
         cpu: "800m"
         memory: "1Gi"
       requests:
         cpu: "500m"
         memory: "600Mi"
 */
-
-package resource
-
-import (
-	"encoding/json"
-
-	"k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/runtime/serializer"
-	clientgoapi "k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/rest"
-)
-
-// TrainingJobs string for registration
-const TrainingJobs = "TrainingJobs"
-
-// TrainingJob defination
 // +k8s:deepcopy-gen=true
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 type TrainingJob struct {
@@ -89,8 +115,8 @@ type TrainingJobSpec struct {
 	PortsNumForSparse int  `json:"ports_num_for_sparse,omitempty"`
 	FaultTolerant     bool `json:"fault_tolerant,omitempty"`
 	Passes            int  `json:"passes,omitempty"`
-	Volumes           []v1.Volume      `json:volumes`
-	VolumeMounts      []v1.VolumeMount `json:VolumeMounts`
+	Volumes           []v1.Volume      `json:"volumes"`
+	VolumeMounts      []v1.VolumeMount `json:"VolumeMounts"`
 	// Job components.
 	Trainer TrainerSpec `json:"trainer"`
 	Pserver PserverSpec `json:"pserver"`
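The struct-tag fix above is easy to underrate: a tag without quotes, like `json:volumes`, does not parse as a struct tag at all, so encoding/json silently falls back to the exported field name. A self-contained demonstration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type brokenSpec struct {
	Volumes []string `json:volumes` // Malformed tag: ignored by encoding/json.
}

type fixedSpec struct {
	Volumes []string `json:"volumes"`
}

func main() {
	b, _ := json.Marshal(brokenSpec{Volumes: []string{"v"}})
	f, _ := json.Marshal(fixedSpec{Volumes: []string{"v"}})
	fmt.Println(string(b)) // {"Volumes":["v"]} — the Go field name leaks into the wire format.
	fmt.Println(string(f)) // {"volumes":["v"]}
}
```
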
@@ -149,12 +175,6 @@ type TrainingJobList struct {
 	Items []TrainingJob `json:"items"`
 }

-// NeedGPU returns true if the job need GPU resource to run.
-func (s *TrainingJob) NeedGPU() bool {
-	q := s.Spec.Trainer.Resources.Limits.NvidiaGPU()
-	return q.CmpInt64(0) == 1
-}
-
 // Elastic returns true if the job can scale to more workers.
 func (s *TrainingJob) Elastic() bool {
 	return s.Spec.Trainer.MinInstance < s.Spec.Trainer.MaxInstance
@@ -168,17 +188,24 @@ func (s *TrainingJob) GPU() int {
 		// FIXME: treat errors
 		gpu = 0
 	}
-
 	return int(gpu)
 }

+// NeedGPU returns true if the job needs GPU resources to run.
+func (s *TrainingJob) NeedGPU() bool {
+	return s.GPU() > 0
+}
+
 func (s *TrainingJob) String() string {
 	b, _ := json.MarshalIndent(s, "", " ")
 	return string(b[:])
 }

-// RegisterTrainingJob registers TrainingJob a new type of resource to Kubernetes.
-func RegisterTrainingJob(config *rest.Config) {
+// RegisterResource registers a resource type and the corresponding
+// resource list type with the local Kubernetes runtime under group
+// version "paddlepaddle.org", so the runtime can encode/decode this
+// Go type. It also changes config.GroupVersion to "paddlepaddle.org".
+func RegisterResource(config *rest.Config, resourceType, resourceListType runtime.Object) *rest.Config {
 	groupversion := schema.GroupVersion{
 		Group:   "paddlepaddle.org",
 		Version: "v1",
@@ -189,16 +216,13 @@ func RegisterTrainingJob(config *rest.Config) {
 	config.ContentType = runtime.ContentTypeJSON
 	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: clientgoapi.Codecs}

-	schemeBuilder := runtime.NewSchemeBuilder(
-		func(scheme *runtime.Scheme) error {
-			scheme.AddKnownTypes(
-				groupversion,
-				&TrainingJob{},
-				&TrainingJobList{},
-				&v1.ListOptions{},
-				&v1.DeleteOptions{},
-			)
-			return nil
-		})
-	schemeBuilder.AddToScheme(clientgoapi.Scheme)
+	clientgoapi.Scheme.AddKnownTypes(
+		groupversion,
+		resourceType,
+		resourceListType,
+		&v1.ListOptions{},
+		&v1.DeleteOptions{},
+	)
+
+	return config
 }
diff --git a/go/glide.lock b/go/glide.lock
index 73cdb86c..e3dd9aa4 100644
--- a/go/glide.lock
+++ b/go/glide.lock
@@ -1,5 +1,5 @@
-hash: 9c125b7ddc893ea6228034bc28995209118f4702b183cb6cec49ab7a776708db
-updated: 2018-01-15T20:33:34.067167+08:00
+hash: ec75366a24092a9e6ab3e50b78c8f4fe836f4a79c32bb21277bab8a13a816e6f
+updated: 2018-01-31T13:52:07.261668-08:00
 imports:
 - name: github.com/bitly/go-simplejson
   version: aabad6e819789e569bd6aabf444c935aa9ba1e44
@@ -57,7 +57,7 @@ imports:
 - name: github.com/gorilla/context
   version: 215affda49addc4c8ef7e2534915df2c8c35c6cd
 - name: github.com/gorilla/mux
-  version: 7f08801859139f86dfafd1c296e2cba9a80d292e
+  version: 53c1911da2b537f792e7cafcb446b05ffe33b996
 - name: github.com/gregjones/httpcache
   version: 787624de3eb7bd915c329cba748687a3b22666a6
   subpackages:
@@ -94,6 +94,8 @@ imports:
   version: 5bd2802263f21d8788851d5305584c82a5c75d7e
 - name: github.com/spf13/pflag
   version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7
+- name: github.com/wangkuiyi/candy
+  version: 1b9030d056fa9f8c4b1f9c91b52fe4b8ab4cd8cc
 - name: golang.org/x/crypto
   version: 81e90905daefcd6fd217b62423c0908922eadb30
   subpackages:
@@ -264,7 +266,7 @@ imports:
   subpackages:
   - pkg/common
 - name: k8s.io/kubernetes
-  version: f50b3431ea82d981d7d2e62d9f9da44e724fbe41
+  version: f7c6586ab26f07a6573218b0b820b1f9b92e76a0
   subpackages:
   - pkg/api
 testImports:
diff --git a/go/glide.yaml b/go/glide.yaml
index 45d33f60..c4604483 100644
--- a/go/glide.yaml
+++ b/go/glide.yaml
@@ -19,4 +19,5 @@ import:
 - package: github.com/go-stack/stack
   version: v1.6.0
 - package: k8s.io/code-generator
-  version: kubernetes-1.8.5
\ No newline at end of file
+  version: kubernetes-1.8.5
+- package: github.com/wangkuiyi/candy
diff --git a/go/utils/kubeutil/kubeutil.go b/go/utils/kubeutil/kubeutil.go
index 261d23e0..50f950de 100644
--- a/go/utils/kubeutil/kubeutil.go
+++ b/go/utils/kubeutil/kubeutil.go
@@ -31,7 +31,7 @@ func CreateClient(kubeconfig string) (*rest.RESTClient, *kubernetes.Clientset, e
 		return nil, nil, fmt.Errorf("create clientset from config '%s' error: %v", kubeconfig, err)
 	}

-	edlresource.RegisterTrainingJob(config)
+	edlresource.RegisterResource(config, &edlresource.TrainingJob{}, &edlresource.TrainingJobList{})

 	client, err := rest.RESTClientFor(config)
 	if err != nil {