From 34e87786871eb33ddb14212adaf16586fdc4415e Mon Sep 17 00:00:00 2001 From: Rafael Leal Date: Thu, 16 Feb 2023 19:33:18 -0300 Subject: [PATCH] Implement fix for PipelineRuns getting stuck in the cluster --- .../pipeline/v1beta1/pipelinerun_types.go | 30 +++ .../v1beta1/pipelinerun_types_test.go | 176 ++++++++++++++++++ pkg/reconciler/pipelinerun/pipelinerun.go | 14 ++ .../pipelinerun/pipelinerun_test.go | 152 ++++++++++++++- pkg/reconciler/pipelinerun/timeout.go | 9 +- 5 files changed, 370 insertions(+), 11 deletions(-) diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go index 244d58130e5..83bf2c22e1a 100644 --- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go @@ -18,6 +18,7 @@ package v1beta1 import ( "context" + "fmt" "time" "github.com/tektoncd/pipeline/pkg/apis/config" @@ -155,6 +156,22 @@ func (pr *PipelineRun) GetNamespacedName() types.NamespacedName { return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name} } +// IsTimeoutConditionSet returns true when the pipelinerun has the pipelinerun timed out reason +func (pr *PipelineRun) IsTimeoutConditionSet() bool { + condition := pr.Status.GetCondition(apis.ConditionSucceeded) + return condition.IsFalse() && condition.Reason == PipelineRunReasonTimedOut.String() +} + +// SetTimeoutCondition sets the status of the PipelineRun to timed out. 
+func (pr *PipelineRun) SetTimeoutCondition(ctx context.Context) {
+	pr.Status.SetCondition(&apis.Condition{
+		Type:    apis.ConditionSucceeded,
+		Status:  corev1.ConditionFalse,
+		Reason:  PipelineRunReasonTimedOut.String(),
+		Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()),
+	})
+}
+
 // HasTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
 func (pr *PipelineRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bool {
 	timeout := pr.PipelineTimeout(ctx)
@@ -172,6 +189,19 @@ func (pr *PipelineRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bo
 	return false
 }
 
+// HasTimedOutForALongTime returns true if a pipelinerun has exceeded its spec.Timeout based on its status.StartTime
+// by a large margin
+func (pr *PipelineRun) HasTimedOutForALongTime(ctx context.Context, c clock.PassiveClock) bool {
+	if !pr.HasTimedOut(ctx, c) {
+		return false
+	}
+	timeout := pr.PipelineTimeout(ctx)
+	startTime := pr.Status.StartTime
+	runtime := c.Since(startTime.Time)
+	// We are arbitrarily defining large margin as doubling the spec.timeout
+	return runtime >= 2*timeout
+}
+
 // HaveTasksTimedOut returns true if a pipelinerun has exceeded its spec.Timeouts.Tasks
 func (pr *PipelineRun) HaveTasksTimedOut(ctx context.Context, c clock.PassiveClock) bool {
 	timeout := pr.TasksTimeout()
diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
index 8d76ec0bf2c..543861b3575 100644
--- a/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
+++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
@@ -22,6 +22,8 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/tektoncd/pipeline/pkg/apis/config"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff" @@ -29,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clock "k8s.io/utils/clock/testing" "knative.dev/pkg/apis" + v1 "knative.dev/pkg/apis/duck/v1" ) var now = time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC) @@ -216,6 +219,179 @@ func TestPipelineRunHasStarted(t *testing.T) { } } +func TestPipelineRunIsTimeoutConditionSet(t *testing.T) { + tcs := []struct { + name string + condition apis.Condition + want bool + }{{ + name: "should return true when reason is timeout", + condition: apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.PipelineRunReasonTimedOut.String(), + }, + want: true, + }, { + name: "should return false if status is not false", + condition: apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.PipelineRunReasonTimedOut.String(), + }, + want: false, + }, { + name: "should return false if the reason is not timeout", + condition: apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.PipelineRunReasonFailed.String(), + }, + want: false, + }} + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + pr := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"}, + Status: v1beta1.PipelineRunStatus{ + Status: v1.Status{ + Conditions: v1.Conditions{tc.condition}, + }, + }, + } + if got := pr.IsTimeoutConditionSet(); got != tc.want { + t.Errorf("pr.IsTimeoutConditionSet() (-want, +got):\n- %t\n+ %t", tc.want, got) + } + }) + } +} + +func TestPipelineRunSetTimeoutCondition(t *testing.T) { + ctx := config.ToContext(context.Background(), &config.Config{ + Defaults: &config.Defaults{ + DefaultTimeoutMinutes: 120, + }, + }) + + tcs := []struct { + name string + pipelineRun *v1beta1.PipelineRun + want *apis.Condition + }{{ + name: "set condition to default timeout", + pipelineRun: &v1beta1.PipelineRun{ObjectMeta: 
metav1.ObjectMeta{Name: "test-pipeline-run"}}, + want: &apis.Condition{ + Type: "Succeeded", + Status: "False", + Reason: "PipelineRunTimeout", + Message: `PipelineRun "test-pipeline-run" failed to finish within "2h0m0s"`, + }, + }, { + name: "set condition to spec.timeout value", + pipelineRun: &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"}, + Spec: v1beta1.PipelineRunSpec{ + Timeout: &metav1.Duration{Duration: time.Hour}, + }, + }, + want: &apis.Condition{ + Type: "Succeeded", + Status: "False", + Reason: "PipelineRunTimeout", + Message: `PipelineRun "test-pipeline-run" failed to finish within "1h0m0s"`, + }, + }, { + name: "set condition to spec.timeouts.pipeline value", + pipelineRun: &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"}, + Spec: v1beta1.PipelineRunSpec{ + Timeouts: &v1beta1.TimeoutFields{ + Pipeline: &metav1.Duration{Duration: time.Hour}, + }, + }, + }, + want: &apis.Condition{ + Type: "Succeeded", + Status: "False", + Reason: "PipelineRunTimeout", + Message: `PipelineRun "test-pipeline-run" failed to finish within "1h0m0s"`, + }, + }} + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.pipelineRun.SetTimeoutCondition(ctx) + + got := tc.pipelineRun.Status.GetCondition(apis.ConditionSucceeded) + if d := cmp.Diff(tc.want, got, cmpopts.IgnoreFields(apis.Condition{}, "LastTransitionTime")); d != "" { + t.Errorf("Unexpected PipelineRun condition: %v", diff.PrintWantGot(d)) + } + }) + } +} + +func TestPipelineRunHasTimedOutForALongTime(t *testing.T) { + tcs := []struct { + name string + timeout time.Duration + starttime time.Time + expected bool + }{{ + name: "has timed out for a long time", + timeout: 1 * time.Hour, + starttime: now.Add(-2 * time.Hour), + expected: true, + }, { + name: "has timed out for not a long time", + timeout: 1 * time.Hour, + starttime: now.Add(-90 * time.Minute), + expected: false, + }, { + name: "has not timed out", + timeout: 1 * time.Hour, + 
starttime: now.Add(-30 * time.Minute), + expected: false, + }, { + name: "has no timeout specified", + timeout: 0 * time.Second, + starttime: now.Add(-24 * time.Hour), + expected: false, + }} + + for _, tc := range tcs { + t.Run("pipeline.timeout "+tc.name, func(t *testing.T) { + pr := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: v1beta1.PipelineRunSpec{ + Timeout: &metav1.Duration{Duration: tc.timeout}, + }, + Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{ + StartTime: &metav1.Time{Time: tc.starttime}, + }}, + } + if pr.HasTimedOutForALongTime(context.Background(), testClock) != tc.expected { + t.Errorf("Expected HasTimedOut to be %t when using pipeline.timeout", tc.expected) + } + }) + t.Run("pipeline.timeouts.pipeline "+tc.name, func(t *testing.T) { + pr := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: v1beta1.PipelineRunSpec{ + Timeouts: &v1beta1.TimeoutFields{Pipeline: &metav1.Duration{Duration: tc.timeout}}, + }, + Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{ + StartTime: &metav1.Time{Time: tc.starttime}, + }}, + } + + if pr.HasTimedOutForALongTime(context.Background(), testClock) != tc.expected { + t.Errorf("Expected HasTimedOut to be %t when using pipeline.timeouts.pipeline", tc.expected) + } + }) + } +} + func TestPipelineRunHasTimedOut(t *testing.T) { tcs := []struct { name string diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 4cc2f12ea68..57497a1f975 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -183,6 +183,20 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) // Read the initial condition before := pr.Status.GetCondition(apis.ConditionSucceeded) + // Check if we are failing to mark this as timed out for a while. 
If we are, mark immediately and finish the + // reconcile. We are assuming here that if the PipelineRun has timed out for a long time, it had time to run + // before and it kept failing. One reason that can happen is exceeding etcd request size limit. Finishing it early + // makes sure the request size is manageable + if pr.HasTimedOutForALongTime(ctx, c.Clock) && !pr.IsTimeoutConditionSet() { + if err := timeoutPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil { + return err + } + if err := c.finishReconcileUpdateEmitEvents(ctx, pr, before, nil); err != nil { + return err + } + return controller.NewPermanentError(errors.New("PipelineRun has timed out for a long time")) + } + if !pr.HasStarted() && !pr.IsPending() { pr.Status.InitializeConditions(c.Clock) // In case node time was not synchronized, when controller has been scheduled to other nodes. diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index a0c91bd0f1f..b7db14fcd00 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -28,6 +28,8 @@ import ( "testing" "time" + "sigs.k8s.io/yaml" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-containerregistry/pkg/registry" @@ -1385,7 +1387,7 @@ status: reason: Running status: Unknown type: Succeeded - startTime: "2021-12-31T00:00:00Z" + startTime: "2021-12-31T11:00:00Z" childReferences: - name: test-pipeline-run-custom-task-hello-world-1 pipelineTaskName: hello-world-1 @@ -2039,7 +2041,7 @@ spec: serviceAccountName: test-sa timeout: 12h0m0s status: - startTime: "2021-12-31T00:00:00Z" + startTime: "2021-12-31T11:00:00Z" `)} ts := []*v1beta1.Task{simpleHelloWorldTask} @@ -2101,7 +2103,7 @@ spec: timeouts: pipeline: 12h0m0s status: - startTime: "2021-12-31T00:00:00Z" + startTime: "2021-12-31T11:00:00Z" childReferences: - name: test-pipeline-run-with-timeout-hello-world-1 
pipelineTaskName: hello-world-1 @@ -2161,6 +2163,148 @@ spec: } } +func TestReconcileWithTimeoutForALongTimeAndEtcdLimit_Pipeline(t *testing.T) { + timeout := 12 * time.Hour + testCases := []struct { + name string + startTime time.Time + wantError error + }{ + { + name: "pipelinerun has timed out for way too much time", + startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-3 * timeout), + wantError: errors.New("PipelineRun has timed out for a long time"), + }, + { + name: "pipelinerun has timed out for a long time", + startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-2 * timeout), + wantError: errors.New("PipelineRun has timed out for a long time"), + }, + { + name: "pipelinerun has timed out for a while", + startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-(3 / 2) * timeout), + wantError: errors.New("etcdserver: request too large"), + }, + { + name: "pipelinerun has just timed out", + startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-timeout), + wantError: errors.New("etcdserver: request too large"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ps := []*v1beta1.Pipeline{parse.MustParseV1beta1Pipeline(t, ` +metadata: + name: test-pipeline + namespace: foo +spec: + tasks: + - name: hello-world-1 + taskRef: + name: hello-world + - name: hello-world-2 + taskRef: + name: hello-world +`)} + prs := []*v1beta1.PipelineRun{parse.MustParseV1beta1PipelineRun(t, ` +metadata: + name: test-pipeline-run-with-timeout + namespace: foo +spec: + pipelineRef: + name: test-pipeline + serviceAccountName: test-sa + timeouts: + pipeline: 12h0m0s +status: + startTime: "2021-12-30T00:00:00Z" +`)} + ts := []*v1beta1.Task{simpleHelloWorldTask} + + trs := []*v1beta1.TaskRun{mustParseTaskRunWithObjectMeta(t, taskRunObjectMeta("test-pipeline-run-with-timeout-hello-world-1", "foo", "test-pipeline-run-with-timeout", + "test-pipeline", "hello-world-1", false), ` +spec: + 
resources: {} + serviceAccountName: test-sa + taskRef: + name: hello-world + kind: Task +`)} + start := metav1.NewTime(tc.startTime) + prs[0].Status.StartTime = &start + + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + TaskRuns: trs, + } + prt := newPipelineRunTest(t, d) + defer prt.Cancel() + + wantEvents := []string{ + "Warning Failed PipelineRun \"test-pipeline-run-with-timeout\" failed to finish within \"12h0m0s\"", + } + + // this limit is just enough to set the timeout condition, but not enough for extra metadata. + etcdRequestSizeLimit := 650 + prt.TestAssets.Clients.Pipeline.PrependReactor("update", "pipelineruns", withEtcdRequestSizeLimit(t, etcdRequestSizeLimit)) + + c := prt.TestAssets.Controller + clients := prt.TestAssets.Clients + reconcileError := c.Reconciler.Reconcile(prt.TestAssets.Ctx, "foo/test-pipeline-run-with-timeout") + if tc.wantError != nil { + if reconcileError == nil { + t.Fatalf("expected error %q, but got nil", tc.wantError.Error()) + } + if reconcileError.Error() != tc.wantError.Error() { + t.Fatalf("Expected error: %s Got: %s", tc.wantError, reconcileError) + } + return + } + if reconcileError != nil { + t.Fatalf("Reconcile error: %s", reconcileError) + } + prt.Test.Logf("Getting reconciled run") + reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get(prt.TestAssets.Ctx, "test-pipeline-run-with-timeout", metav1.GetOptions{}) + if err != nil { + prt.Test.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err) + } + prt.Test.Logf("Getting events") + // Check generated events match what's expected + if err := k8sevent.CheckEventsOrdered(prt.Test, prt.TestAssets.Recorder.Events, "test-pipeline-run-with-timeout", wantEvents); err != nil { + prt.Test.Errorf(err.Error()) + } + + // The PipelineRun should be timed out. 
+ if reconciledRun.Status.GetCondition(apis.ConditionSucceeded).Reason != "PipelineRunTimeout" { + t.Errorf("Expected PipelineRun to be timed out, but condition reason is %s", reconciledRun.Status.GetCondition(apis.ConditionSucceeded)) + } + }) + } +} + +// withEtcdRequestSizeLimit calculates the yaml marshal of the payload and gives an `etcdserver: request too large` when +// the limit is reached +func withEtcdRequestSizeLimit(t *testing.T, limitBytes int) ktesting.ReactionFunc { + t.Helper() + return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(ktesting.UpdateAction).GetObject() + bytes, err := yaml.Marshal(obj) + if err != nil { + t.Fatalf("returned a unserializable status: %+v", obj) + } + + if len(bytes) > limitBytes { + t.Logf("request size: %d\nrequest limit: %d\n", len(bytes), limitBytes) + t.Logf("payload:\n%s\n", string(bytes)) + return true, nil, errors.New("etcdserver: request too large") + } + return false, nil, nil + } +} + func TestReconcileWithTimeouts_Tasks(t *testing.T) { // TestReconcileWithTimeouts_Tasks runs "Reconcile" on a PipelineRun with timeouts.tasks configured. 
// It verifies that reconcile is successful, no TaskRun is created, the PipelineTask is marked as skipped, and the @@ -6159,11 +6303,13 @@ func (prt PipelineRunTest) reconcileRun(namespace, pipelineRunName string, wantE } else if reconcileError != nil { prt.Test.Fatalf("Error reconciling: %s", reconcileError) } + prt.Test.Logf("Getting reconciled run") // Check that the PipelineRun was reconciled correctly reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns(namespace).Get(prt.TestAssets.Ctx, pipelineRunName, metav1.GetOptions{}) if err != nil { prt.Test.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err) } + prt.Test.Logf("Getting events") // Check generated events match what's expected if len(wantEvents) > 0 { diff --git a/pkg/reconciler/pipelinerun/timeout.go b/pkg/reconciler/pipelinerun/timeout.go index a9ffd83e203..89a0faa6ab2 100644 --- a/pkg/reconciler/pipelinerun/timeout.go +++ b/pkg/reconciler/pipelinerun/timeout.go @@ -87,14 +87,7 @@ func timeoutPipelineRun(ctx context.Context, logger *zap.SugaredLogger, pr *v1be // If we successfully timed out all the TaskRuns and Runs, we can consider the PipelineRun timed out. if len(errs) == 0 { - reason := v1beta1.PipelineRunReasonTimedOut.String() - - pr.Status.SetCondition(&apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionFalse, - Reason: reason, - Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()), - }) + pr.SetTimeoutCondition(ctx) // update pr completed time pr.Status.CompletionTime = &metav1.Time{Time: time.Now()} } else {