Skip to content

Commit

Permalink
Move away from Jobs to Pods
Browse files Browse the repository at this point in the history
The test-operator is using Jobs to spawn test pods even though it does
not use any features of this k8s object. Plus usage of the Jobs requires
creation of a ServiceAccount in the target namespaces. In order to be able
to create a new SA, the test-operator has to have the rights to create new
roles and rolebindings, which in our case makes the attack surface
larger.

This patch drops the usage of Jobs and moves to Pods.

Depends-On: openstack-k8s-operators/ci-framework#2604
  • Loading branch information
lpiwowar committed Dec 13, 2024
1 parent b2b1b46 commit 1a21c19
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 290 deletions.
25 changes: 7 additions & 18 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"strconv"
"time"

"reflect"

Expand All @@ -30,12 +29,10 @@ import (
"github.com/openstack-k8s-operators/lib-common/modules/common/condition"
"github.com/openstack-k8s-operators/lib-common/modules/common/env"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
"github.com/openstack-k8s-operators/lib-common/modules/common/job"
common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac"
"github.com/openstack-k8s-operators/test-operator/api/v1beta1"
testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
v1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"github.com/openstack-k8s-operators/test-operator/pkg/ansibletest"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -161,7 +158,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
case CreateFirstPod:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
Expand All @@ -170,7 +167,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
case CreateNextPod:
// Confirm that we still hold the lock. This is useful to check if for
// example somebody / something deleted the lock and it got claimed by
// another instance. This is considered to be an error state.
Expand Down Expand Up @@ -213,7 +210,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, nextWorkflowStep)
jobName := r.GetPodName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
Expand Down Expand Up @@ -245,8 +242,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return rbacResult, nil
}
// Service account, role, binding - end

jobDef := ansibletest.Job(
podDef := ansibletest.Pod(
instance,
serviceLabels,
jobName,
Expand All @@ -258,15 +254,8 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
containerImage,
privileged,
)
ansibleTestsJob := job.NewJob(
jobDef,
testv1beta1.ConfigHash,
true,
time.Duration(5)*time.Second,
"",
)

ctrlResult, err = ansibleTestsJob.DoJob(ctx, helper)
ctrlResult, err = r.CreatePod(ctx, *helper, podDef)
if err != nil {
// Creation of the ansibleTests job was not successfull.
// Release the lock and allow other controllers to spawn
Expand Down Expand Up @@ -299,7 +288,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *AnsibleTestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&testv1beta1.AnsibleTest{}).
Owns(&batchv1.Job{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Secret{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
Expand Down
118 changes: 80 additions & 38 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/openstack-k8s-operators/lib-common/modules/common/util"
v1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"gopkg.in/yaml.v3"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -27,6 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
Expand Down Expand Up @@ -80,13 +80,13 @@ const (
// to change
Wait = iota

// CreateFirstJob indicates that the Reconcile loop should create the first job
// CreateFirstPod indicates that the Reconcile loop should create the first job
// either specified in the .Spec section or in the .Spec.Workflow section.
CreateFirstJob
CreateFirstPod

// CreateNextJob indicates that the Reconcile loop should create a next job
// CreateNextPod indicates that the Reconcile loop should create a next job
// specified in the .Spec.Workflow section (if .Spec.Workflow is defined)
CreateNextJob
CreateNextPod

// EndTesting indicates that all jobs have already finished. The Reconcile
// loop should end the testing and release resources that are required to
Expand All @@ -97,59 +97,101 @@ const (
Failure
)

// GetPod fetches the pod named podName from the namespace podNamespace.
// The h parameter is unused in this implementation; it is accepted to keep
// the signature uniform with the other pod helpers. On a lookup error the
// (possibly empty) pod object is returned together with the error.
func (r *Reconciler) GetPod(
	ctx context.Context,
	h helper.Helper,
	podName string,
	podNamespace string,
) (*corev1.Pod, error) {
	result := &corev1.Pod{}
	key := client.ObjectKey{
		Namespace: podNamespace,
		Name:      podName,
	}
	err := r.Client.Get(ctx, key, result)
	return result, err
}

// CreatePod creates the pod described by podSpec, unless a pod with the
// same name already exists in the target namespace (in which case it is a
// no-op).
//
// Before creation, the controller reference on the pod is set to the object
// currently being reconciled (h.GetBeforeObject()) so the pod is owned by —
// and garbage-collected with — that object.
//
// Returns an empty ctrl.Result and nil error both when the pod already
// exists and when it was created; any lookup, owner-reference, or create
// error is returned unwrapped to the caller.
func (r *Reconciler) CreatePod(
	ctx context.Context,
	h helper.Helper,
	podSpec *corev1.Pod,
) (ctrl.Result, error) {
	// Nothing to do when the pod is already present.
	_, err := r.GetPod(ctx, h, podSpec.Name, podSpec.Namespace)
	if err == nil {
		return ctrl.Result{}, nil
	}
	// Any error other than NotFound is unexpected and must be reported.
	if !k8s_errors.IsNotFound(err) {
		return ctrl.Result{}, err
	}

	// Tie the pod's lifecycle to the reconciled object.
	err = controllerutil.SetControllerReference(h.GetBeforeObject(), podSpec, r.GetScheme())
	if err != nil {
		return ctrl.Result{}, err
	}

	if err := r.Client.Create(ctx, podSpec); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

// NextAction indicates what action needs to be performed by the Reconcile loop
// based on the current state of the OpenShift cluster.
func (r *Reconciler) NextAction(
ctx context.Context,
instance client.Object,
workflowLength int,
) (NextAction, int, error) {
// Get the latest job. The latest job is job with the highest value stored
// Get the latest pod. The latest pod is pod with the highest value stored
// in workflowStep label
workflowStepIdx := 0
lastJob, err := r.GetLastJob(ctx, instance)
lastPod, err := r.GetLastPod(ctx, instance)
if err != nil {
return Failure, workflowStepIdx, err
}

// If there is a job associated with the current instance.
if lastJob != nil {
workflowStepIdx, err := strconv.Atoi(lastJob.Labels[workflowStepLabel])
// If there is a pod associated with the current instance.
if lastPod != nil {
workflowStepIdx, err := strconv.Atoi(lastPod.Labels[workflowStepLabel])
if err != nil {
return Failure, workflowStepIdx, err
}

// If the last job is not in Failed or Succeded state -> Wait
lastJobFinished := (lastJob.Status.Failed + lastJob.Status.Succeeded) > 0
if !lastJobFinished {
// If the last pod is not in Failed or Succeded state -> Wait
lastPodFinished := lastPod.Status.Phase == corev1.PodFailed || lastPod.Status.Phase == corev1.PodSucceeded
if !lastPodFinished {
return Wait, workflowStepIdx, nil
}

// If the last job is in Failed or Succeeded state and it is NOT the last
// job which was supposed to be created -> CreateNextJob
if lastJobFinished && !isLastJobIndex(workflowStepIdx, workflowLength) {
// If the last pod is in Failed or Succeeded state and it is NOT the last
// pod which was supposed to be created -> CreateNextPod
if lastPodFinished && !isLastPodIndex(workflowStepIdx, workflowLength) {
workflowStepIdx++
return CreateNextJob, workflowStepIdx, nil
return CreateNextPod, workflowStepIdx, nil
}

// Otherwise if the job is in Failed or Succeded stated and it IS the
// last job -> EndTesting
if lastJobFinished && isLastJobIndex(workflowStepIdx, workflowLength) {
// Otherwise if the pod is in Failed or Succeded stated and it IS the
// last pod -> EndTesting
if lastPodFinished && isLastPodIndex(workflowStepIdx, workflowLength) {
return EndTesting, workflowStepIdx, nil
}
}

// If there is not any job associated with the instance -> createFirstJob
if lastJob == nil {
return CreateFirstJob, workflowStepIdx, nil
// If there is not any pod associated with the instance -> createFirstPod
if lastPod == nil {
return CreateFirstPod, workflowStepIdx, nil
}

return Failure, workflowStepIdx, nil
}

// isLastJobIndex returns true when jobIndex is the index of the last job that
// isLastPodIndex returns true when jobIndex is the index of the last job that
// should be executed. Otherwise the return value is false.
func isLastJobIndex(jobIndex int, workflowLength int) bool {
func isLastPodIndex(jobIndex int, workflowLength int) bool {
switch workflowLength {
case 0:
return jobIndex == workflowLength
Expand All @@ -160,26 +202,26 @@ func isLastJobIndex(jobIndex int, workflowLength int) bool {

// GetLastJob returns job associated with an instance which has the highest value
// stored in the workflowStep label
func (r *Reconciler) GetLastJob(
func (r *Reconciler) GetLastPod(
ctx context.Context,
instance client.Object,
) (*batchv1.Job, error) {
) (*corev1.Pod, error) {
labels := map[string]string{instanceNameLabel: instance.GetName()}
namespaceListOpt := client.InNamespace(instance.GetNamespace())
labelsListOpt := client.MatchingLabels(labels)
jobList := &batchv1.JobList{}
err := r.Client.List(ctx, jobList, namespaceListOpt, labelsListOpt)
podList := &corev1.PodList{}
err := r.Client.List(ctx, podList, namespaceListOpt, labelsListOpt)
if err != nil {
return nil, err
}

var maxJob *batchv1.Job
var maxJob *corev1.Pod
maxJobWorkflowStep := 0

for _, job := range jobList.Items {
for _, job := range podList.Items {
workflowStep, err := strconv.Atoi(job.Labels[workflowStepLabel])
if err != nil {
return &batchv1.Job{}, err
return &corev1.Pod{}, err
}

if workflowStep >= maxJobWorkflowStep {
Expand Down Expand Up @@ -307,7 +349,7 @@ func (r *Reconciler) GetContainerImage(
return "", nil
}

func (r *Reconciler) GetJobName(instance interface{}, workflowStepNum int) string {
func (r *Reconciler) GetPodName(instance interface{}, workflowStepNum int) string {
if typedInstance, ok := instance.(*v1beta1.Tobiko); ok {
if len(typedInstance.Spec.Workflow) == 0 || workflowStepNum == workflowStepNumInvalid {
return typedInstance.Name
Expand Down Expand Up @@ -552,11 +594,11 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b
return false, errors.New("failed to delete test-operator-lock")
}

func (r *Reconciler) JobExists(ctx context.Context, instance client.Object, workflowStepNum int) bool {
job := &batchv1.Job{}
jobName := r.GetJobName(instance, workflowStepNum)
objectKey := client.ObjectKey{Namespace: instance.GetNamespace(), Name: jobName}
err := r.Client.Get(ctx, objectKey, job)
func (r *Reconciler) PodExists(ctx context.Context, instance client.Object, workflowStepNum int) bool {
pod := &corev1.Pod{}
podName := r.GetPodName(instance, workflowStepNum)
objectKey := client.ObjectKey{Namespace: instance.GetNamespace(), Name: podName}
err := r.Client.Get(ctx, objectKey, pod)
if err != nil && k8s_errors.IsNotFound(err) {
return false
}
Expand Down
23 changes: 6 additions & 17 deletions controllers/horizontest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/openstack-k8s-operators/lib-common/modules/common"
"github.com/openstack-k8s-operators/lib-common/modules/common/condition"
"github.com/openstack-k8s-operators/lib-common/modules/common/env"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
"github.com/openstack-k8s-operators/lib-common/modules/common/job"
common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac"
testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"github.com/openstack-k8s-operators/test-operator/pkg/horizontest"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -154,7 +151,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
case CreateFirstPod:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
Expand All @@ -163,7 +160,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
case CreateNextPod:
// Confirm that we still hold the lock. This is useful to check if for
// example somebody / something deleted the lock and it got claimed by
// another instance. This is considered to be an error state.
Expand Down Expand Up @@ -224,7 +221,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Prepare HorizonTest env vars
envVars := r.PrepareHorizonTestEnvVars(instance)
jobName := r.GetJobName(instance, 0)
jobName := r.GetPodName(instance, 0)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance)
if err != nil {
Expand All @@ -240,8 +237,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return rbacResult, nil
}
// Service account, role, binding - end

jobDef := horizontest.Job(
podDef := horizontest.Pod(
instance,
serviceLabels,
jobName,
Expand All @@ -252,15 +248,8 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
envVars,
containerImage,
)
horizontestJob := job.NewJob(
jobDef,
testv1beta1.ConfigHash,
true,
time.Duration(5)*time.Second,
"",
)

ctrlResult, err = horizontestJob.DoJob(ctx, helper)
ctrlResult, err = r.CreatePod(ctx, *helper, podDef)
if err != nil {
instance.Status.Conditions.Set(condition.FalseCondition(
condition.DeploymentReadyCondition,
Expand All @@ -286,7 +275,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *HorizonTestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&testv1beta1.HorizonTest{}).
Owns(&batchv1.Job{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Secret{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
Expand Down
Loading

0 comments on commit 1a21c19

Please sign in to comment.