From 963ae48a3fb2d98092e017d56b44f0658ce31ff3 Mon Sep 17 00:00:00 2001 From: Nikola Jokic Date: Tue, 16 Apr 2024 12:55:25 +0200 Subject: [PATCH] Include self correction on empty batch and avoid removing pending runners when cluster is busy (#3426) --- cmd/ghalistener/listener/listener.go | 5 + cmd/ghalistener/worker/worker.go | 8 +- .../autoscalingrunnerset_controller.go | 1 + .../ephemeralrunnerset_controller.go | 14 +- .../ephemeralrunnerset_controller_test.go | 672 +++++++++++++----- 5 files changed, 519 insertions(+), 181 deletions(-) diff --git a/cmd/ghalistener/listener/listener.go b/cmd/ghalistener/listener/listener.go index c9fc680190..56da0a8f8e 100644 --- a/cmd/ghalistener/listener/listener.go +++ b/cmd/ghalistener/listener/listener.go @@ -164,6 +164,11 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error { } if msg == nil { + _, err := handler.HandleDesiredRunnerCount(ctx, 0, 0) + if err != nil { + return fmt.Errorf("handling nil message failed: %w", err) + } + continue } diff --git a/cmd/ghalistener/worker/worker.go b/cmd/ghalistener/worker/worker.go index 3622753524..25fb90e183 100644 --- a/cmd/ghalistener/worker/worker.go +++ b/cmd/ghalistener/worker/worker.go @@ -177,12 +177,12 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo "jobsCompleted", jobsCompleted, } - if w.lastPatch == targetRunnerCount && jobsCompleted == 0 { - w.logger.Info("Skipping patch", logValues...) 
- return targetRunnerCount, nil + if count == 0 && jobsCompleted == 0 { + w.lastPatchID = 0 + } else { + w.lastPatchID++ } - w.lastPatchID++ w.lastPatch = targetRunnerCount original, err := json.Marshal( diff --git a/controllers/actions.github.com/autoscalingrunnerset_controller.go b/controllers/actions.github.com/autoscalingrunnerset_controller.go index 1d476e3705..2ed654e819 100644 --- a/controllers/actions.github.com/autoscalingrunnerset_controller.go +++ b/controllers/actions.github.com/autoscalingrunnerset_controller.go @@ -277,6 +277,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl // need to scale down to 0 err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) { obj.Spec.Replicas = 0 + obj.Spec.PatchID = 0 }) if err != nil { log.Error(err, "Failed to patch runner set to set desired count to 0") diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller.go b/controllers/actions.github.com/ephemeralrunnerset_controller.go index 12a22f1dc3..91319de8ac 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller.go @@ -197,7 +197,6 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R log.Error(err, "failed to cleanup finished ephemeral runners") } }() - log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) switch { case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up @@ -208,8 +207,16 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R return ctrl.Result{}, err } - case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. + case ephemeralRunnerSet.Spec.PatchID > 0 && total >= ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. 
+ // If ephemeral runner did not yet update the phase to succeeded, but the scale down + // request is issued, we should ignore the scale down request. + // Eventually, the ephemeral runner will be cleaned up on the next patch request, which happens + // on the next batch + case ephemeralRunnerSet.Spec.PatchID == 0 && total > ephemeralRunnerSet.Spec.Replicas: count := total - ephemeralRunnerSet.Spec.Replicas + if count <= 0 { + break + } log.Info("Deleting ephemeral runners (scale down)", "count", count) if err := r.deleteIdleEphemeralRunners( ctx, @@ -428,6 +435,9 @@ func (r *EphemeralRunnerSetReconciler) createProxySecret(ctx context.Context, ep // When this happens, the next reconcile loop will try to delete the remaining ephemeral runners // after we get notified by any of the `v1alpha1.EphemeralRunner.Status` updates. func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners []*v1alpha1.EphemeralRunner, count int, log logr.Logger) error { + if count <= 0 { + return nil + } runners := newEphemeralRunnerStepper(pendingEphemeralRunners, runningEphemeralRunners) if runners.len() == 0 { log.Info("No pending or running ephemeral runners running at this time for scale down") diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller_test.go b/controllers/actions.github.com/ephemeralrunnerset_controller_test.go index 271a352879..79ad2d6e4e 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller_test.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller_test.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "encoding/base64" - "errors" "fmt" "net/http" "net/http/httptest" @@ -275,21 +274,18 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { }) Context("When a new EphemeralRunnerSet scale up and down", func() { - It("Should scale only on patch ID change", func() { - created := 
new(actionsv1alpha1.EphemeralRunnerSet) - err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, created) + It("Should scale up with patch ID 0", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") - patchID := 1 - - // Scale up the EphemeralRunnerSet - updated := created.DeepCopy() + updated := ers.DeepCopy() updated.Spec.Replicas = 5 - updated.Spec.PatchID = patchID - err = k8sClient.Update(ctx, updated) + updated.Spec.PatchID = 0 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") - // Wait for the EphemeralRunnerSet to be scaled up runnerList := new(actionsv1alpha1.EphemeralRunnerList) Eventually( func() (int, error) { @@ -298,77 +294,128 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return -1, err } - // Set status to simulate a configured EphemeralRunner - refetch := false - for i, runner := range runnerList.Items { - if runner.Status.RunnerId == 0 { - updatedRunner := runner.DeepCopy() - updatedRunner.Status.Phase = corev1.PodRunning - updatedRunner.Status.RunnerId = i + 100 - err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - refetch = true - } + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") + }) + + It("Should scale up when patch ID changes", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get 
EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 1 + updated.Spec.PatchID = 0 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList := new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err } - if refetch { - err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) - if err != nil { - return -1, err - } + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(1), "1 EphemeralRunner should be created") + + ers = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated = ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err } return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, - ).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + }) - // Mark one of the EphemeralRunner as finished - finishedRunner := runnerList.Items[4].DeepCopy() - finishedRunner.Status.Phase = corev1.PodSucceeded - err = k8sClient.Status().Patch(ctx, finishedRunner, client.MergeFrom(&runnerList.Items[4])) - 
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + It("Should clean up finished ephemeral runner when scaling down", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") - // Wait for the finished EphemeralRunner to be set to succeeded + runnerList := new(actionsv1alpha1.EphemeralRunnerList) Eventually( - func() error { - runnerList := new(actionsv1alpha1.EphemeralRunnerList) + func() (int, error) { err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) if err != nil { - return err + return -1, err } - for _, runner := range runnerList.Items { - if runner.Name != finishedRunner.Name { - continue - } + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") - if runner.Status.Phase != corev1.PodSucceeded { - return fmt.Errorf("EphemeralRunner is not finished") - } - // found pod succeeded - return nil + updatedRunner := runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodSucceeded + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.Phase = corev1.PodRunning + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + // Keep the ephemeral runner until the next patch + 
runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err } - return errors.New("Finished ephemeral runner is not found") + return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, - ).Should(Succeed(), "Finished EphemeralRunner should be deleted") + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be up") - // After one ephemeral runner is finished, simulate job done patch - patchID++ - original := new(actionsv1alpha1.EphemeralRunnerSet) - err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + // The listener was slower to patch the completed, but we should still have 1 running + ers = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") - updated = original.DeepCopy() - updated.Spec.PatchID = patchID - updated.Spec.Replicas = 4 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) + + updated = ers.DeepCopy() + updated.Spec.Replicas = 1 + updated.Spec.PatchID = 2 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") - // Only finished ephemeral runner should be deleted runnerList = new(actionsv1alpha1.EphemeralRunnerList) Eventually( func() (int, error) { @@ -377,31 +424,26 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return -1, err } - for _, runner := range runnerList.Items { - if runner.Status.Phase == corev1.PodSucceeded { - return -1, fmt.Errorf("Finished EphemeralRunner should be deleted") - } - } - return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, 
ephemeralRunnerSetTestInterval, - ).Should(BeEquivalentTo(4), "4 EphemeralRunner should be created") + ).Should(BeEquivalentTo(1), "1 Ephemeral runner should be up") + }) - // Scaling down the EphemeralRunnerSet - patchID++ - original = new(actionsv1alpha1.EphemeralRunnerSet) - err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + It("Should keep finished ephemeral runners until patch id changes", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") - updated = original.DeepCopy() - updated.Spec.PatchID = patchID - updated.Spec.Replicas = 3 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") - // Wait for the EphemeralRunnerSet to be scaled down - runnerList = new(actionsv1alpha1.EphemeralRunnerList) + runnerList := new(actionsv1alpha1.EphemeralRunnerList) Eventually( func() (int, error) { err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) @@ -409,96 +451,218 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return -1, err } - // Set status to simulate a configured EphemeralRunner - refetch := false - for i, runner := range runnerList.Items { - if runner.Status.RunnerId == 0 { - updatedRunner := runner.DeepCopy() - updatedRunner.Status.Phase = corev1.PodRunning - updatedRunner.Status.RunnerId = i + 100 - err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - refetch = true - } + return len(runnerList.Items), nil + }, + 
ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + + updatedRunner := runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodSucceeded + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.Phase = corev1.PodPending + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + // confirm they are not deleted + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Consistently( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err } - if refetch { - err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) - if err != nil { - return -1, err - } + return len(runnerList.Items), nil + }, + 5*time.Second, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + }) + + It("Should handle double scale up", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList := new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err } return 
len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, - ).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created") + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") - // We will not scale down runner that is running jobs - runningRunner := runnerList.Items[0].DeepCopy() - runningRunner.Status.JobRequestId = 1000 - err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[0])) + updatedRunner := runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodSucceeded + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - runningRunner = runnerList.Items[1].DeepCopy() - runningRunner.Status.JobRequestId = 1001 - err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[1])) + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.Phase = corev1.PodRunning + + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - // Scale down to 1 while 2 are running - patchID++ - original = new(actionsv1alpha1.EphemeralRunnerSet) - err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + ers = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") - updated = original.DeepCopy() - updated.Spec.PatchID = patchID - updated.Spec.Replicas = 1 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) + + updated = ers.DeepCopy() + updated.Spec.Replicas = 3 + updated.Spec.PatchID = 2 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) Expect(err).NotTo(HaveOccurred(), 
"failed to update EphemeralRunnerSet") - // Wait for the EphemeralRunnerSet to be scaled down to 2 since we still have 2 runner running jobs runnerList = new(actionsv1alpha1.EphemeralRunnerList) + // We should have 3 runners, and have no Succeeded ones Eventually( - func() (int, error) { + func() error { err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) if err != nil { - return -1, err + return err } - // Set status to simulate a configured EphemeralRunner - refetch := false - for i, runner := range runnerList.Items { - if runner.Status.RunnerId == 0 { - updatedRunner := runner.DeepCopy() - updatedRunner.Status.Phase = corev1.PodRunning - updatedRunner.Status.RunnerId = i + 100 - err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - refetch = true - } + if len(runnerList.Items) != 3 { + return fmt.Errorf("Expected 3 runners, got %d", len(runnerList.Items)) } - if refetch { - err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) - if err != nil { - return -1, err + for _, runner := range runnerList.Items { + if runner.Status.Phase == corev1.PodSucceeded { + return fmt.Errorf("Runner %s is in Succeeded phase", runner.Name) } } + return nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeNil(), "3 EphemeralRunner should be created and none should be in Succeeded phase") + }) + + It("Should handle scale down without removing pending runners", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + 
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList := new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err + } + return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") - // We will not scale down failed runner - failedRunner := runnerList.Items[0].DeepCopy() - failedRunner.Status.Phase = corev1.PodFailed - err = k8sClient.Status().Patch(ctx, failedRunner, client.MergeFrom(&runnerList.Items[0])) + updatedRunner := runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodSucceeded + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.Phase = corev1.PodPending + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + // Wait for these statuses to actually be updated + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() error { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return err + } + pending := 0 + succeeded := 0 + for _, runner := range runnerList.Items { + switch runner.Status.Phase { + case corev1.PodSucceeded: + succeeded++ + case corev1.PodPending: + pending++ + } + } + + if pending != 1 && succeeded != 1 { + return fmt.Errorf("Expected 1 runner in Pending and 1 in Succeeded, got %d in Pending and %d in Succeeded", pending, succeeded) + } + + return nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeNil(), "1 EphemeralRunner should be 
in Pending and 1 in Succeeded phase") + + // Scale down to 0, while 1 is still pending. This simulates the difference between the desired and actual state + ers = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated = ers.DeepCopy() + updated.Spec.Replicas = 0 + updated.Spec.PatchID = 2 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + // We should have 1 runner up and pending + Eventually( + func() error { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return err + } + + if len(runnerList.Items) != 1 { + return fmt.Errorf("Expected 1 runner, got %d", len(runnerList.Items)) + } + + if runnerList.Items[0].Status.Phase != corev1.PodPending { + return fmt.Errorf("Expected runner to be in Pending, got %s", runnerList.Items[0].Status.Phase) + } + + return nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeNil(), "1 EphemeralRunner should be created and in Pending phase") + + // Now, the ephemeral runner finally is done and we can scale down to 0 + updatedRunner = runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodSucceeded + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + Eventually( func() (int, error) { err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) @@ -506,24 +670,31 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return -1, err } - // Set status to simulate a configured EphemeralRunner - refetch := false - for i, runner := range 
runnerList.Items { - if runner.Status.RunnerId == 0 { - updatedRunner := runner.DeepCopy() - updatedRunner.Status.Phase = corev1.PodRunning - updatedRunner.Status.RunnerId = i + 100 - err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - refetch = true - } - } + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "0 EphemeralRunner should exist") + }) - if refetch { - err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) - if err != nil { - return -1, err - } + It("Should kill pending and running runners if they are up for some reason and the batch contains no jobs", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList := new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err } return len(runnerList.Items), nil @@ -532,27 +703,89 @@ ephemeralRunnerSetTestInterval, ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") - // We will scale down to 0 when the running job is completed and the failed runner is deleted - runningRunner = runnerList.Items[1].DeepCopy() - runningRunner.Status.Phase = corev1.PodSucceeded - err = k8sClient.Status().Patch(ctx, runningRunner, 
client.MergeFrom(&runnerList.Items[1])) + // Put one runner in Pending and one in Running + updatedRunner := runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodPending + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - err = k8sClient.Delete(ctx, &runnerList.Items[0]) - Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunner") + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.Phase = corev1.PodRunning + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - // Scale down to 0 while 1 ephemeral runner is failed - patchID++ - original = new(actionsv1alpha1.EphemeralRunnerSet) - err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + // Wait for these statuses to actually be updated + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() error { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return err + } + + pending := 0 + running := 0 + + for _, runner := range runnerList.Items { + switch runner.Status.Phase { + case corev1.PodPending: + pending++ + case corev1.PodRunning: + running++ + + } + } + + if pending != 1 && running != 1 { + return fmt.Errorf("Expected 1 runner in Pending and 1 in Running, got %d in Pending and %d in Running", pending, running) + } + + return nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeNil(), "1 EphemeralRunner should be in Pending and 1 in Running phase") + + // Scale down to 0 with patch ID 0. 
This forces the scale down to self correct on empty batch + + ers = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") - updated = original.DeepCopy() - updated.Spec.PatchID = patchID + + updated = ers.DeepCopy() updated.Spec.Replicas = 0 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) + updated.Spec.PatchID = 0 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") - // Wait for the EphemeralRunnerSet to be scaled down to 0 + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Consistently( + func() (int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err + } + + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be up since they don't have an ID yet") + + // Now, let's say ephemeral runner controller patched these ephemeral runners with the registration. 
+ + updatedRunner = runnerList.Items[0].DeepCopy() + updatedRunner.Status.RunnerId = 1 + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.RunnerId = 2 + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + // Now, eventually, they should be deleted runnerList = new(actionsv1alpha1.EphemeralRunnerList) Eventually( func() (int, error) { @@ -561,31 +794,120 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return -1, err } - // Set status to simulate a configured EphemeralRunner - refetch := false - for i, runner := range runnerList.Items { - if runner.Status.RunnerId == 0 { - updatedRunner := runner.DeepCopy() - updatedRunner.Status.Phase = corev1.PodRunning - updatedRunner.Status.RunnerId = i + 100 - err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - refetch = true + return len(runnerList.Items), nil + + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "0 EphemeralRunner should exist") + }) + + It("Should replace finished ephemeral runners with new ones", func() { + ers := new(actionsv1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList := new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() 
(int, error) { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return -1, err + } + + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + + // Put one runner in Succeeded and one in Running + updatedRunner := runnerList.Items[0].DeepCopy() + updatedRunner.Status.Phase = corev1.PodSucceeded + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + updatedRunner = runnerList.Items[1].DeepCopy() + updatedRunner.Status.Phase = corev1.PodRunning + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + + // Wait for these statuses to actually be updated + + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() error { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return err + } + + succeeded := 0 + running := 0 + + for _, runner := range runnerList.Items { + switch runner.Status.Phase { + case corev1.PodSucceeded: + succeeded++ + case corev1.PodRunning: + running++ } } - if refetch { - err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) - if err != nil { - return -1, err + if succeeded != 1 && running != 1 { + return fmt.Errorf("Expected 1 runner in Succeeded and 1 in Running, got %d in Succeeded and %d in Running", succeeded, running) + } + + return nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeNil(), "1 EphemeralRunner should be in Succeeded and 1 in Running phase") + + // Now, let's simulate replacement. The desired count is still 2. + // This simulates that we got 1 job assigned, and 1 job completed. 
+ + ers = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated = ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 2 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList = new(actionsv1alpha1.EphemeralRunnerList) + Eventually( + func() error { + err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) + if err != nil { + return err + } + + if len(runnerList.Items) != 2 { + return fmt.Errorf("Expected 2 runners, got %d", len(runnerList.Items)) + } + + for _, runner := range runnerList.Items { + if runner.Status.Phase == corev1.PodSucceeded { + return fmt.Errorf("Expected no runners in Succeeded phase, got one") } } - return len(runnerList.Items), nil + return nil }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, - ).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created") + ).Should(BeNil(), "2 EphemeralRunner should be created and none should be in Succeeded phase") }) It("Should update status on Ephemeral Runner state changes", func() {