From ccaca8b280e935d254d35771fe14b3eaa55b72d8 Mon Sep 17 00:00:00 2001 From: Cyril Corbon Date: Wed, 16 Jun 2021 23:59:58 +0200 Subject: [PATCH] feat: add methods to handle jobs --- modules/k8s/errors.go | 16 +++++ modules/k8s/job.go | 93 ++++++++++++++++++++++++++++ modules/k8s/job_test.go | 130 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 239 insertions(+) create mode 100644 modules/k8s/job.go create mode 100644 modules/k8s/job_test.go diff --git a/modules/k8s/errors.go b/modules/k8s/errors.go index 7ecd986bf6..2352da8c34 100644 --- a/modules/k8s/errors.go +++ b/modules/k8s/errors.go @@ -3,6 +3,7 @@ package k8s import ( "fmt" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" networkingv1beta1 "k8s.io/api/networking/v1beta1" @@ -75,6 +76,21 @@ func NewPodNotAvailableError(pod *corev1.Pod) PodNotAvailable { return PodNotAvailable{pod} } +//JobNotSucceeded is returned when a Kubernetes job is not Succeeded +type JobNotSucceeded struct { + job *batchv1.Job +} + +// Error is a simple function to return a formatted error message as a string +func (err JobNotSucceeded) Error() string { + return fmt.Sprintf("Job %s is not Succeeded", err.job.Name) +} + +// NewJobNotSucceeded returnes a JobNotSucceeded when the status of the job is not Succeeded +func NewJobNotSucceeded(job *batchv1.Job) JobNotSucceeded { + return JobNotSucceeded{job} +} + // ServiceNotAvailable is returned when a Kubernetes service is not yet available to accept traffic. type ServiceNotAvailable struct { service *corev1.Service diff --git a/modules/k8s/job.go b/modules/k8s/job.go new file mode 100644 index 0000000000..ac441a625d --- /dev/null +++ b/modules/k8s/job.go @@ -0,0 +1,93 @@ +package k8s + +import ( + "context" + "fmt" + "time" + + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/gruntwork-io/terratest/modules/logger" + "github.com/gruntwork-io/terratest/modules/retry" + "github.com/gruntwork-io/terratest/modules/testing" +) + +// ListJobs will look for Jobs in the given namespace that match the given filters and return them. This will fail the +// test if there is an error. +func ListJobs(t testing.TestingT, options *KubectlOptions, filters metav1.ListOptions) []batchv1.Job { + jobs, err := ListJobsE(t, options, filters) + require.NoError(t, err) + return jobs +} + +// ListJobsE will look for jobs in the given namespace that match the given filters and return them. +func ListJobsE(t testing.TestingT, options *KubectlOptions, filters metav1.ListOptions) ([]batchv1.Job, error) { + clientset, err := GetKubernetesClientFromOptionsE(t, options) + if err != nil { + return nil, err + } + + resp, err := clientset.BatchV1().Jobs(options.Namespace).List(context.Background(), filters) + if err != nil { + return nil, err + } + return resp.Items, nil +} + +// GetJob returns a Kubernetes job resource in the provided namespace with the given name. This will +// fail the test if there is an error. +func GetJob(t testing.TestingT, options *KubectlOptions, jobName string) *batchv1.Job { + job, err := GetJobE(t, options, jobName) + require.NoError(t, err) + return job +} + +// GetJobE returns a Kubernetes job resource in the provided namespace with the given name. +func GetJobE(t testing.TestingT, options *KubectlOptions, jobName string) (*batchv1.Job, error) { + clientset, err := GetKubernetesClientFromOptionsE(t, options) + if err != nil { + return nil, err + } + return clientset.BatchV1().Jobs(options.Namespace).Get(context.Background(), jobName, metav1.GetOptions{}) +} + +// WaitUntilJobSucceed waits until job suceeded is superior to 0, retrying the check for the specified amount of times, sleeping +// for the provided duration between each try. This will fail the test if there is an error or if the check times out. +func WaitUntilJobSucceed(t testing.TestingT, options *KubectlOptions, jobName string, retries int, sleepBetweenRetries time.Duration) { + require.NoError(t, WaitUntilJobSucceedE(t, options, jobName, retries, sleepBetweenRetries)) +} + +// WaitUntilJobSucceedE waits until job suceeded is superior to 0, retrying the check for the specified amount of times, sleeping +// for the provided duration between each try. +func WaitUntilJobSucceedE(t testing.TestingT, options *KubectlOptions, jobName string, retries int, sleepBetweenRetries time.Duration) error { + statusMsg := fmt.Sprintf("Wait for job %s to be provisioned.", jobName) + message, err := retry.DoWithRetryE( + t, + statusMsg, + retries, + sleepBetweenRetries, + func() (string, error) { + job, err := GetJobE(t, options, jobName) + if err != nil { + return "", err + } + if !IsJobSucceeded(job) { + return "", NewJobNotSucceeded(job) + } + return "Job is now Succeeded", nil + }, + ) + if err != nil { + logger.Logf(t, "Timedout waiting for Job to be provisioned: %s", err) + return err + } + logger.Logf(t, message) + return nil +} + +// IsJobSucceeded returns true if the Status Succeeded of the job is superior to 0 +func IsJobSucceeded(job *batchv1.Job) bool { + return job.Status.Succeeded > 0 +} diff --git a/modules/k8s/job_test.go b/modules/k8s/job_test.go new file mode 100644 index 0000000000..efc7d3e67d --- /dev/null +++ b/modules/k8s/job_test.go @@ -0,0 +1,130 @@ +// +build kubeall kubernetes + +// NOTE: we have build tags to differentiate kubernetes tests from non-kubernetes tests. This is done because minikube +// is heavy and can interfere with docker related tests in terratest. Specifically, many of the tests start to fail with +// `connection refused` errors from `minikube`. To avoid overloading the system, we run the kubernetes tests and helm +// tests separately from the others. This may not be necessary if you have a sufficiently powerful machine. We +// recommend at least 4 cores and 16GB of RAM if you want to run all the tests together. + +package k8s + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/gruntwork-io/terratest/modules/random" +) + +func TestListJobsReturnsJobsInNamespace(t *testing.T) { + t.Parallel() + + uniqueID := strings.ToLower(random.UniqueId()) + options := NewKubectlOptions("", "", uniqueID) + configData := fmt.Sprintf(EXAMPLE_JOB_YAML_TEMPLATE, uniqueID, uniqueID) + defer KubectlDeleteFromString(t, options, configData) + KubectlApplyFromString(t, options, configData) + + jobs := ListJobs(t, options, metav1.ListOptions{}) + require.Equal(t, len(jobs), 1) + job := jobs[0] + require.Equal(t, job.Name, "pi-job") + require.Equal(t, job.Namespace, uniqueID) +} + +func TestGetJobEReturnsErrorForNonExistantJob(t *testing.T) { + t.Parallel() + + options := NewKubectlOptions("", "", "default") + _, err := GetJobE(t, options, "pi-job") + require.Error(t, err) +} +func TestGetJobEReturnsCorrectJobInCorrectNamespace(t *testing.T) { + t.Parallel() + + uniqueID := strings.ToLower(random.UniqueId()) + options := NewKubectlOptions("", "", uniqueID) + configData := fmt.Sprintf(EXAMPLE_JOB_YAML_TEMPLATE, uniqueID, uniqueID) + defer KubectlDeleteFromString(t, options, configData) + KubectlApplyFromString(t, options, configData) + + job := GetJob(t, options, "pi-job") + require.Equal(t, job.Name, "pi-job") + require.Equal(t, job.Namespace, uniqueID) +} + +func TestWaitUntilJobSucceedReturnsSuccessfully(t *testing.T) { + t.Parallel() + + uniqueID := strings.ToLower(random.UniqueId()) + options := NewKubectlOptions("", "", uniqueID) + configData := fmt.Sprintf(EXAMPLE_JOB_YAML_TEMPLATE, uniqueID, uniqueID) + defer KubectlDeleteFromString(t, options, configData) + KubectlApplyFromString(t, options, configData) + + WaitUntilJobSucceed(t, options, "pi-job", 60, 1*time.Second) +} + +func TestIsJobSucceeded(t *testing.T) { + t.Parallel() + + cases := []struct { + title string + job *batchv1.Job + expectedResult bool + }{ + { + title: "TestIsJobSucceeded", + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Succeeded: 1, + }, + }, + expectedResult: true, + }, + { + title: "TestIsJobFailed", + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Failed: 0, + }, + }, + expectedResult: false, + }, + } + for _, tc := range cases { + tc := tc + t.Run(tc.title, func(t *testing.T) { + t.Parallel() + actualResult := IsJobSucceeded(tc.job) + require.Equal(t, tc.expectedResult, actualResult) + }) + } +} + +const EXAMPLE_JOB_YAML_TEMPLATE = `--- +apiVersion: v1 +kind: Namespace +metadata: + name: %s +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: pi-job + namespace: %s +spec: + template: + spec: + containers: + - name: pi + image: perl + command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"] + restartPolicy: Never + backoffLimit: 4 +`