Skip to content

Commit

Permalink
feat: add methods to handle jobs (#940)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyril-corbon authored Jul 16, 2021
1 parent 3c88557 commit 46fba20
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 0 deletions.
16 changes: 16 additions & 0 deletions modules/k8s/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8s
import (
"fmt"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
Expand Down Expand Up @@ -75,6 +76,21 @@ func NewPodNotAvailableError(pod *corev1.Pod) PodNotAvailable {
return PodNotAvailable{pod}
}

// JobNotSucceeded is returned when a Kubernetes job is not Succeeded
type JobNotSucceeded struct {
job *batchv1.Job
}

// Error is a simple function to return a formatted error message as a string
func (err JobNotSucceeded) Error() string {
return fmt.Sprintf("Job %s is not Succeeded", err.job.Name)
}

// NewJobNotSucceeded returnes a JobNotSucceeded when the status of the job is not Succeeded
func NewJobNotSucceeded(job *batchv1.Job) JobNotSucceeded {
return JobNotSucceeded{job}
}

// ServiceNotAvailable is returned when a Kubernetes service is not yet available to accept traffic.
type ServiceNotAvailable struct {
service *corev1.Service
Expand Down
93 changes: 93 additions & 0 deletions modules/k8s/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package k8s

import (
"context"
"fmt"
"time"

"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/gruntwork-io/terratest/modules/logger"
"github.com/gruntwork-io/terratest/modules/retry"
"github.com/gruntwork-io/terratest/modules/testing"
)

// ListJobs will look for Jobs in the given namespace that match the given filters and return them. This will fail the
// test if there is an error.
func ListJobs(t testing.TestingT, options *KubectlOptions, filters metav1.ListOptions) []batchv1.Job {
jobs, err := ListJobsE(t, options, filters)
require.NoError(t, err)
return jobs
}

// ListJobsE will look for jobs in the given namespace that match the given filters and return them.
func ListJobsE(t testing.TestingT, options *KubectlOptions, filters metav1.ListOptions) ([]batchv1.Job, error) {
clientset, err := GetKubernetesClientFromOptionsE(t, options)
if err != nil {
return nil, err
}

resp, err := clientset.BatchV1().Jobs(options.Namespace).List(context.Background(), filters)
if err != nil {
return nil, err
}
return resp.Items, nil
}

// GetJob returns a Kubernetes job resource in the provided namespace with the given name. This will
// fail the test if there is an error.
func GetJob(t testing.TestingT, options *KubectlOptions, jobName string) *batchv1.Job {
job, err := GetJobE(t, options, jobName)
require.NoError(t, err)
return job
}

// GetJobE returns a Kubernetes job resource in the provided namespace with the given name.
func GetJobE(t testing.TestingT, options *KubectlOptions, jobName string) (*batchv1.Job, error) {
clientset, err := GetKubernetesClientFromOptionsE(t, options)
if err != nil {
return nil, err
}
return clientset.BatchV1().Jobs(options.Namespace).Get(context.Background(), jobName, metav1.GetOptions{})
}

// WaitUntilJobSucceed waits until requested job is suceeded, retrying the check for the specified amount of times, sleeping
// for the provided duration between each try. This will fail the test if there is an error or if the check times out.
func WaitUntilJobSucceed(t testing.TestingT, options *KubectlOptions, jobName string, retries int, sleepBetweenRetries time.Duration) {
require.NoError(t, WaitUntilJobSucceedE(t, options, jobName, retries, sleepBetweenRetries))
}

// WaitUntilJobSucceedE waits until requested job is succeeded, retrying the check for the specified amount of times, sleeping
// for the provided duration between each try.
func WaitUntilJobSucceedE(t testing.TestingT, options *KubectlOptions, jobName string, retries int, sleepBetweenRetries time.Duration) error {
statusMsg := fmt.Sprintf("Wait for job %s to be provisioned.", jobName)
message, err := retry.DoWithRetryE(
t,
statusMsg,
retries,
sleepBetweenRetries,
func() (string, error) {
job, err := GetJobE(t, options, jobName)
if err != nil {
return "", err
}
if !IsJobSucceeded(job) {
return "", NewJobNotSucceeded(job)
}
return "Job is now Succeeded", nil
},
)
if err != nil {
logger.Logf(t, "Timed out waiting for Job to be provisioned: %s", err)
return err
}
logger.Logf(t, message)
return nil
}

// IsJobSucceeded returns true if all containers in the job are completed & succeeded
func IsJobSucceeded(job *batchv1.Job) bool {
return job.Status.Active == 0 && job.Status.Failed == 0
}
135 changes: 135 additions & 0 deletions modules/k8s/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// +build kubeall kubernetes

// NOTE: we have build tags to differentiate kubernetes tests from non-kubernetes tests. This is done because minikube
// is heavy and can interfere with docker related tests in terratest. Specifically, many of the tests start to fail with
// `connection refused` errors from `minikube`. To avoid overloading the system, we run the kubernetes tests and helm
// tests separately from the others. This may not be necessary if you have a sufficiently powerful machine. We
// recommend at least 4 cores and 16GB of RAM if you want to run all the tests together.

package k8s

import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/gruntwork-io/terratest/modules/random"
)

func TestListJobsReturnsJobsInNamespace(t *testing.T) {
t.Parallel()

uniqueID := strings.ToLower(random.UniqueId())
options := NewKubectlOptions("", "", uniqueID)
configData := fmt.Sprintf(EXAMPLE_JOB_YAML_TEMPLATE, uniqueID, uniqueID)
defer KubectlDeleteFromString(t, options, configData)
KubectlApplyFromString(t, options, configData)

jobs := ListJobs(t, options, metav1.ListOptions{})
require.Equal(t, len(jobs), 1)
job := jobs[0]
require.Equal(t, job.Name, "pi-job")
require.Equal(t, job.Namespace, uniqueID)
}

func TestGetJobEReturnsErrorForNonExistantJob(t *testing.T) {
t.Parallel()

options := NewKubectlOptions("", "", "default")
_, err := GetJobE(t, options, "pi-job")
require.Error(t, err)
}

func TestGetJobEReturnsCorrectJobInCorrectNamespace(t *testing.T) {
t.Parallel()

uniqueID := strings.ToLower(random.UniqueId())
options := NewKubectlOptions("", "", uniqueID)
configData := fmt.Sprintf(EXAMPLE_JOB_YAML_TEMPLATE, uniqueID, uniqueID)
defer KubectlDeleteFromString(t, options, configData)
KubectlApplyFromString(t, options, configData)

job := GetJob(t, options, "pi-job")
require.Equal(t, job.Name, "pi-job")
require.Equal(t, job.Namespace, uniqueID)
}

func TestWaitUntilJobSucceedReturnsSuccessfully(t *testing.T) {
t.Parallel()

uniqueID := strings.ToLower(random.UniqueId())
options := NewKubectlOptions("", "", uniqueID)
configData := fmt.Sprintf(EXAMPLE_JOB_YAML_TEMPLATE, uniqueID, uniqueID)
defer KubectlDeleteFromString(t, options, configData)
KubectlApplyFromString(t, options, configData)

WaitUntilJobSucceed(t, options, "pi-job", 60, 1*time.Second)
}

func TestIsJobSucceeded(t *testing.T) {
t.Parallel()

cases := []struct {
title string
job *batchv1.Job
expectedResult bool
}{
{
title: "TestIsJobSucceeded",
job: &batchv1.Job{
Status: batchv1.JobStatus{
Succeeded: 1,
Failed: 0,
Active: 0,
},
},
expectedResult: true,
},
{
title: "TestIsJobFailed",
job: &batchv1.Job{
Status: batchv1.JobStatus{
Failed: 1,
Active: 0,
Succeeded: 1,
},
},
expectedResult: false,
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.title, func(t *testing.T) {
t.Parallel()
actualResult := IsJobSucceeded(tc.job)
require.Equal(t, tc.expectedResult, actualResult)
})
}
}

const EXAMPLE_JOB_YAML_TEMPLATE = `---
apiVersion: v1
kind: Namespace
metadata:
name: %s
---
apiVersion: batch/v1
kind: Job
metadata:
name: pi-job
namespace: %s
spec:
template:
spec:
containers:
- name: pi
image: perl
command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
restartPolicy: Never
backoffLimit: 4
`

0 comments on commit 46fba20

Please sign in to comment.