Skip to content

Commit

Permalink
Lint fix for apis, cache, job and state package of controller
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita15p committed Jun 24, 2019
1 parent 1c55eb9 commit eba782d
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 86 deletions.
4 changes: 0 additions & 4 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
volcano.sh/volcano/pkg/admission
volcano.sh/volcano/pkg/controllers/apis
volcano.sh/volcano/pkg/controllers/cache
volcano.sh/volcano/pkg/controllers/job
volcano.sh/volcano/pkg/controllers/job/state
volcano.sh/volcano/test/e2e
9 changes: 9 additions & 0 deletions pkg/controllers/apis/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

//JobInfo struct
type JobInfo struct {
Namespace string
Name string
Expand All @@ -32,6 +33,7 @@ type JobInfo struct {
Pods map[string]map[string]*v1.Pod
}

//Clone function clones the k8s pod values to the JobInfo struct
func (ji *JobInfo) Clone() *JobInfo {
job := &JobInfo{
Namespace: ji.Namespace,
Expand All @@ -51,12 +53,15 @@ func (ji *JobInfo) Clone() *JobInfo {
return job
}

//SetJob sets the volcano jobs values to the JobInfo struct
func (ji *JobInfo) SetJob(job *v1alpha1.Job) {
ji.Name = job.Name
ji.Namespace = job.Namespace
ji.Job = job
}

//AddPod adds the k8s pod object values to the Pods field
//of JobStruct if it doesn't exist. Otherwise it throws error
func (ji *JobInfo) AddPod(pod *v1.Pod) error {
taskName, found := pod.Annotations[v1alpha1.TaskSpecKey]
if !found {
Expand All @@ -81,6 +86,7 @@ func (ji *JobInfo) AddPod(pod *v1.Pod) error {
return nil
}

//UpdatePod updates the k8s pod object values to the existing pod
func (ji *JobInfo) UpdatePod(pod *v1.Pod) error {
taskName, found := pod.Annotations[v1alpha1.TaskSpecKey]
if !found {
Expand All @@ -105,6 +111,7 @@ func (ji *JobInfo) UpdatePod(pod *v1.Pod) error {
return nil
}

//DeletePod deletes the given k8s pod from the JobInfo struct
func (ji *JobInfo) DeletePod(pod *v1.Pod) error {
taskName, found := pod.Annotations[v1alpha1.TaskSpecKey]
if !found {
Expand All @@ -127,6 +134,7 @@ func (ji *JobInfo) DeletePod(pod *v1.Pod) error {
return nil
}

//Request struct
type Request struct {
Namespace string
JobName string
Expand All @@ -138,6 +146,7 @@ type Request struct {
JobVersion int32
}

//String function returns the request in string format
func (r Request) String() string {
return fmt.Sprintf(
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
Expand Down
20 changes: 12 additions & 8 deletions pkg/controllers/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ func keyFn(ns, name string) string {
return fmt.Sprintf("%s/%s", ns, name)
}

//JobKeyByName gets the key for the job name
func JobKeyByName(namespace string, name string) string {
return keyFn(namespace, name)
}

//JobKeyByReq gets the key for the job request
func JobKeyByReq(req *apis.Request) string {
return keyFn(req.Namespace, req.JobName)
}

//JobKey gets the "ns"/"name" format of the given job
func JobKey(job *v1alpha1.Job) string {
return keyFn(job.Namespace, job.Name)
}
Expand All @@ -66,6 +69,7 @@ func jobKeyOfPod(pod *v1.Pod) (string, error) {
return keyFn(pod.Namespace, jobName), nil
}

//New gets the job Cache
func New() Cache {
return &jobCache{
jobs: map[string]*apis.JobInfo{},
Expand Down Expand Up @@ -133,11 +137,11 @@ func (jc *jobCache) Update(obj *v1alpha1.Job) error {
defer jc.Unlock()

key := JobKey(obj)
if job, found := jc.jobs[key]; !found {
job, found := jc.jobs[key]
if !found {
return fmt.Errorf("failed to find job <%v>", key)
} else {
job.Job = obj
}
job.Job = obj

return nil
}
Expand All @@ -147,12 +151,12 @@ func (jc *jobCache) Delete(obj *v1alpha1.Job) error {
defer jc.Unlock()

key := JobKey(obj)
if jobInfo, found := jc.jobs[key]; !found {
jobInfo, found := jc.jobs[key]
if !found {
return fmt.Errorf("failed to find job <%v>", key)
} else {
jobInfo.Job = nil
jc.deleteJob(jobInfo)
}
jobInfo.Job = nil
jc.deleteJob(jobInfo)

return nil
}
Expand Down Expand Up @@ -261,7 +265,7 @@ func (jc *jobCache) TaskCompleted(jobKey, taskName string) bool {

for _, pod := range taskPods {
if pod.Status.Phase == v1.PodSucceeded {
completed += 1
completed++
}
}
return completed >= taskReplicas
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
)

//Cache Interface
type Cache interface {
Run(stopCh <-chan struct{})

Expand Down
60 changes: 29 additions & 31 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,16 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
}

// Update Job status
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
if e := cc.cache.Update(job); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}

// Delete PodGroup
Expand Down Expand Up @@ -165,7 +165,7 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
return err
}

err, job := cc.createJobIOIfNotExist(job)
job, err := cc.createJobIOIfNotExist(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
Expand All @@ -178,16 +178,16 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
}
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
job, err = cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if err := cc.cache.Update(job); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, err)
return err
}
}
if err = cc.cache.Update(job); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, err)
return err
}

return nil
Expand Down Expand Up @@ -330,23 +330,22 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
job.Status.State.LastTransitionTime = metav1.Now()
}
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
if e := cc.cache.Update(job); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}

return nil
}

func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (*vkv1.Job, error) {
// If PVC does not exist, create them for Job.
var needUpdate, nameExist bool
volumes := job.Spec.Volumes
Expand All @@ -359,7 +358,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
vcName = vkjobhelpers.MakeVolumeClaimName(job.Name)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err, nil
return nil, err
}
if exist {
continue
Expand All @@ -371,7 +370,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
} else {
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err, nil
return nil, err
}
nameExist = exist
}
Expand All @@ -382,7 +381,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
}
if volume.VolumeClaim != nil {
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
return err, nil
return nil, err
}
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
} else {
Expand All @@ -395,12 +394,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
if err != nil {
glog.Errorf("Failed to update Job %v/%v for volume claim name: %v ",
job.Namespace, job.Name, err)
return err, nil
} else {
return nil, newJob
return nil, err
}
return newJob, err
}
return nil, job
return job, nil
}

func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func TestCreateJobIOIfNotExistFunc(t *testing.T) {
for i, testcase := range testcases {
fakeController := newFakeController()

err, job := fakeController.createJobIOIfNotExist(testcase.Job)
job, err := fakeController.createJobIOIfNotExist(testcase.Job)
if err != testcase.ExpextVal {
t.Errorf("Expected Return value to be : %s, but got: %s in testcase %d", testcase.ExpextVal, err, i)
}
Expand Down
45 changes: 24 additions & 21 deletions pkg/controllers/job/job_controller_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ import (
func (cc *Controller) pluginOnPodCreate(job *vkv1.Job, pod *v1.Pod) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
pb, found := vkplugin.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
glog.Error(err)
return err
} else {
glog.Infof("Starting to execute plugin at <pluginOnPodCreate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnPodCreate(pod, job); err != nil {
glog.Errorf("Failed to process on pod create plugin %s, err %v.", name, err)
return err
}
}
glog.Infof("Starting to execute plugin at <pluginOnPodCreate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnPodCreate(pod, job); err != nil {
glog.Errorf("Failed to process on pod create plugin %s, err %v.", name, err)
return err
}

}
return nil
}
Expand All @@ -52,17 +53,18 @@ func (cc *Controller) pluginOnJobAdd(job *vkv1.Job) error {
job.Status.ControlledResources = make(map[string]string)
}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
pb, found := vkplugin.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
glog.Error(err)
return err
} else {
glog.Infof("Starting to execute plugin at <pluginOnJobAdd>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobAdd(job); err != nil {
glog.Errorf("Failed to process on job add plugin %s, err %v.", name, err)
return err
}
}
glog.Infof("Starting to execute plugin at <pluginOnJobAdd>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobAdd(job); err != nil {
glog.Errorf("Failed to process on job add plugin %s, err %v.", name, err)
return err
}

}

return nil
Expand All @@ -71,17 +73,18 @@ func (cc *Controller) pluginOnJobAdd(job *vkv1.Job) error {
func (cc *Controller) pluginOnJobDelete(job *vkv1.Job) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
pb, found := vkplugin.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
glog.Error(err)
return err
} else {
glog.Infof("Starting to execute plugin at <pluginOnJobDelete>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobDelete(job); err != nil {
glog.Errorf("failed to process on job delete plugin %s, err %v.", name, err)
return err
}
}
glog.Infof("Starting to execute plugin at <pluginOnJobDelete>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobDelete(job); err != nil {
glog.Errorf("failed to process on job delete plugin %s, err %v.", name, err)
return err
}

}

return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
vkjobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

//MakePodName append podname,jobname,taskName and index and returns the string
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(vkjobhelpers.PodNameFmt, jobName, taskName, index)
}
Expand Down Expand Up @@ -186,12 +187,14 @@ func addResourceList(list, new v1.ResourceList) {
}
}

//TaskPriority structure
type TaskPriority struct {
priority int32

vkv1.TaskSpec
}

//TasksPriority is a slice of TaskPriority
type TasksPriority []TaskPriority

func (p TasksPriority) Len() int { return len(p) }
Expand Down
Loading

0 comments on commit eba782d

Please sign in to comment.