Skip to content

Commit

Permalink
Implement fix for PipelineRuns getting stuck in the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
RafaeLeal committed Apr 17, 2023
1 parent 6f68e80 commit 34e8778
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 11 deletions.
30 changes: 30 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1beta1

import (
"context"
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
Expand Down Expand Up @@ -155,6 +156,22 @@ func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
}

// IsTimeoutConditionSet returns true when the pipelinerun has the pipelinerun timed out reason
func (pr *PipelineRun) IsTimeoutConditionSet() bool {
condition := pr.Status.GetCondition(apis.ConditionSucceeded)
return condition.IsFalse() && condition.Reason == PipelineRunReasonTimedOut.String()
}

// SetTimeoutCondition sets the status of the PipelineRun to timed out.
func (pr *PipelineRun) SetTimeoutCondition(ctx context.Context) {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: PipelineRunReasonTimedOut.String(),
Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()),
})
}

// HasTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
func (pr *PipelineRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bool {
timeout := pr.PipelineTimeout(ctx)
Expand All @@ -172,6 +189,19 @@ func (pr *PipelineRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bo
return false
}

// HasTimedOutForALongTime returns true if a pipelinerun has exceeed its spec.Timeout based its status.StartTime
// by a large margin
func (pr *PipelineRun) HasTimedOutForALongTime(ctx context.Context, c clock.PassiveClock) bool {
if !pr.HasTimedOut(ctx, c) {
return false
}
timeout := pr.PipelineTimeout(ctx)
startTime := pr.Status.StartTime
runtime := c.Since(startTime.Time)
// We are arbitrarily defining large margin as doubling the spec.timeout
return runtime >= 2*timeout
}

// HaveTasksTimedOut returns true if a pipelinerun has exceeded its spec.Timeouts.Tasks
func (pr *PipelineRun) HaveTasksTimedOut(ctx context.Context, c clock.PassiveClock) bool {
timeout := pr.TasksTimeout()
Expand Down
176 changes: 176 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clock "k8s.io/utils/clock/testing"
"knative.dev/pkg/apis"
v1 "knative.dev/pkg/apis/duck/v1"
)

var now = time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -216,6 +219,179 @@ func TestPipelineRunHasStarted(t *testing.T) {
}
}

func TestPipelineRunIsTimeoutConditionSet(t *testing.T) {
tcs := []struct {
name string
condition apis.Condition
want bool
}{{
name: "should return true when reason is timeout",
condition: apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonTimedOut.String(),
},
want: true,
}, {
name: "should return false if status is not false",
condition: apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: v1beta1.PipelineRunReasonTimedOut.String(),
},
want: false,
}, {
name: "should return false if the reason is not timeout",
condition: apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonFailed.String(),
},
want: false,
}}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"},
Status: v1beta1.PipelineRunStatus{
Status: v1.Status{
Conditions: v1.Conditions{tc.condition},
},
},
}
if got := pr.IsTimeoutConditionSet(); got != tc.want {
t.Errorf("pr.IsTimeoutConditionSet() (-want, +got):\n- %t\n+ %t", tc.want, got)
}
})
}
}

func TestPipelineRunSetTimeoutCondition(t *testing.T) {
ctx := config.ToContext(context.Background(), &config.Config{
Defaults: &config.Defaults{
DefaultTimeoutMinutes: 120,
},
})

tcs := []struct {
name string
pipelineRun *v1beta1.PipelineRun
want *apis.Condition
}{{
name: "set condition to default timeout",
pipelineRun: &v1beta1.PipelineRun{ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"}},
want: &apis.Condition{
Type: "Succeeded",
Status: "False",
Reason: "PipelineRunTimeout",
Message: `PipelineRun "test-pipeline-run" failed to finish within "2h0m0s"`,
},
}, {
name: "set condition to spec.timeout value",
pipelineRun: &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"},
Spec: v1beta1.PipelineRunSpec{
Timeout: &metav1.Duration{Duration: time.Hour},
},
},
want: &apis.Condition{
Type: "Succeeded",
Status: "False",
Reason: "PipelineRunTimeout",
Message: `PipelineRun "test-pipeline-run" failed to finish within "1h0m0s"`,
},
}, {
name: "set condition to spec.timeouts.pipeline value",
pipelineRun: &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run"},
Spec: v1beta1.PipelineRunSpec{
Timeouts: &v1beta1.TimeoutFields{
Pipeline: &metav1.Duration{Duration: time.Hour},
},
},
},
want: &apis.Condition{
Type: "Succeeded",
Status: "False",
Reason: "PipelineRunTimeout",
Message: `PipelineRun "test-pipeline-run" failed to finish within "1h0m0s"`,
},
}}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
tc.pipelineRun.SetTimeoutCondition(ctx)

got := tc.pipelineRun.Status.GetCondition(apis.ConditionSucceeded)
if d := cmp.Diff(tc.want, got, cmpopts.IgnoreFields(apis.Condition{}, "LastTransitionTime")); d != "" {
t.Errorf("Unexpected PipelineRun condition: %v", diff.PrintWantGot(d))
}
})
}
}

func TestPipelineRunHasTimedOutForALongTime(t *testing.T) {
tcs := []struct {
name string
timeout time.Duration
starttime time.Time
expected bool
}{{
name: "has timed out for a long time",
timeout: 1 * time.Hour,
starttime: now.Add(-2 * time.Hour),
expected: true,
}, {
name: "has timed out for not a long time",
timeout: 1 * time.Hour,
starttime: now.Add(-90 * time.Minute),
expected: false,
}, {
name: "has not timed out",
timeout: 1 * time.Hour,
starttime: now.Add(-30 * time.Minute),
expected: false,
}, {
name: "has no timeout specified",
timeout: 0 * time.Second,
starttime: now.Add(-24 * time.Hour),
expected: false,
}}

for _, tc := range tcs {
t.Run("pipeline.timeout "+tc.name, func(t *testing.T) {
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: v1beta1.PipelineRunSpec{
Timeout: &metav1.Duration{Duration: tc.timeout},
},
Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &metav1.Time{Time: tc.starttime},
}},
}
if pr.HasTimedOutForALongTime(context.Background(), testClock) != tc.expected {
t.Errorf("Expected HasTimedOut to be %t when using pipeline.timeout", tc.expected)
}
})
t.Run("pipeline.timeouts.pipeline "+tc.name, func(t *testing.T) {
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: v1beta1.PipelineRunSpec{
Timeouts: &v1beta1.TimeoutFields{Pipeline: &metav1.Duration{Duration: tc.timeout}},
},
Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &metav1.Time{Time: tc.starttime},
}},
}

if pr.HasTimedOutForALongTime(context.Background(), testClock) != tc.expected {
t.Errorf("Expected HasTimedOut to be %t when using pipeline.timeouts.pipeline", tc.expected)
}
})
}
}

func TestPipelineRunHasTimedOut(t *testing.T) {
tcs := []struct {
name string
Expand Down
14 changes: 14 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// Read the initial condition
before := pr.Status.GetCondition(apis.ConditionSucceeded)

// Check if we are failing to mark this as timed out for a while. If we are, mark immediately and finish the
// reconcile. We are assuming here that if the PipelineRun has timed out for a long time, it had time to run
// before and it kept failing. One reason that can happen is exceeding etcd request size limit. Finishing it early
// makes sure the request size is manageable
if pr.HasTimedOutForALongTime(ctx, c.Clock) && !pr.IsTimeoutConditionSet() {
if err := timeoutPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil {
return err
}
if err := c.finishReconcileUpdateEmitEvents(ctx, pr, before, nil); err != nil {
return err
}
return controller.NewPermanentError(errors.New("PipelineRun has timed out for a long time"))
}

if !pr.HasStarted() && !pr.IsPending() {
pr.Status.InitializeConditions(c.Clock)
// In case node time was not synchronized, when controller has been scheduled to other nodes.
Expand Down
Loading

0 comments on commit 34e8778

Please sign in to comment.