Skip to content

Commit

Permalink
TEP-0121: Reconciler Implementation
Browse files Browse the repository at this point in the history
Prior to this commit, the `retries` logic for a `TaskRun` was handled by
the Tekton `PipelineRun` reconciler. This commit delegates the retries
implementation for `TaskRun` to the `TaskRun` reconciler.

The major change is that we stopped relying on `len(retriesStatus)` to
decide whether a target `TaskRun` has failed. Instead, we use the
`ConditionSucceeded` status to gate the completion of a `TaskRun`. Even if a
`TaskRun` fails on one execution, as long as it has remaining
retries, the `TaskRun` won't be stored in etcd with its status set
to `Failed`. Instead, the status will be:

```yaml
Type: Succeeded
Status: Unknown
Reason: ToBeRetried
```
  • Loading branch information
XinruZhang committed Dec 8, 2022
1 parent 4874def commit ac6d523
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 538 deletions.
24 changes: 17 additions & 7 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
ReasonFailedResolution = "TaskRunResolutionFailed"

// ReasonFailedValidation indicated that the reason for failure status is
// that taskrun failed runtime validation
// that TaskRun failed runtime validation
ReasonFailedValidation = "TaskRunValidationFailed"

// ReasonExceededResourceQuota indicates that the TaskRun failed to create a pod due to
Expand Down Expand Up @@ -162,12 +162,12 @@ func setTaskRunStatusBasedOnStepStatus(logger *zap.SugaredLogger, stepStatuses [
} else {
time, err := extractStartedAtTimeFromResults(results)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %v", s.Name, tr.Name, err)
logger.Errorf("error setting the start time of step %q in TaskRun %q: %v", s.Name, tr.Name, err)
merr = multierror.Append(merr, err)
}
exitCode, err := extractExitCodeFromResults(results)
if err != nil {
logger.Errorf("error extracting the exit code of step %q in taskrun %q: %v", s.Name, tr.Name, err)
logger.Errorf("error extracting the exit code of step %q in TaskRun %q: %v", s.Name, tr.Name, err)
merr = multierror.Append(merr, err)
}
taskResults, pipelineResourceResults, filteredResults := filterResultsAndResources(results)
Expand Down Expand Up @@ -335,7 +335,7 @@ func updateIncompleteTaskRunStatus(trs *v1beta1.TaskRunStatus, pod *corev1.Pod)
}
}

// DidTaskRunFail check the status of pod to decide if related taskrun is failed
// DidTaskRunFail check the status of pod to decide if related TaskRun is failed
func DidTaskRunFail(pod *corev1.Pod) bool {
f := pod.Status.Phase == corev1.PodFailed
for _, s := range pod.Status.ContainerStatuses {
Expand All @@ -348,6 +348,16 @@ func DidTaskRunFail(pod *corev1.Pod) bool {
return f
}

// IsPodArchived reports whether the given pod is already recorded in the
// RetriesStatus of trs, i.e. whether any archived retry entry carries the
// same pod name as pod.
func IsPodArchived(pod *corev1.Pod, trs *v1beta1.TaskRunStatus) bool {
	archived := trs.RetriesStatus
	for i := range archived {
		// The pod name is looked up per entry, so nothing is read from pod
		// when there is no retry history at all.
		if pod.GetName() == archived[i].PodName {
			return true
		}
	}
	return false
}

func areStepsComplete(pod *corev1.Pod) bool {
stepsComplete := len(pod.Status.ContainerStatuses) > 0 && pod.Status.Phase == corev1.PodRunning
for _, s := range pod.Status.ContainerStatuses {
Expand Down Expand Up @@ -478,7 +488,7 @@ func getWaitingMessage(pod *corev1.Pod) string {
return "Pending"
}

// markStatusRunning sets taskrun status to running
// markStatusRunning sets TaskRun status to running
func markStatusRunning(trs *v1beta1.TaskRunStatus, reason, message string) {
trs.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand All @@ -488,7 +498,7 @@ func markStatusRunning(trs *v1beta1.TaskRunStatus, reason, message string) {
})
}

// markStatusFailure sets taskrun status to failure with specified reason
// markStatusFailure sets TaskRun status to failure with specified reason
func markStatusFailure(trs *v1beta1.TaskRunStatus, reason string, message string) {
trs.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand All @@ -498,7 +508,7 @@ func markStatusFailure(trs *v1beta1.TaskRunStatus, reason string, message string
})
}

// markStatusSuccess sets taskrun status to success
// markStatusSuccess sets TaskRun status to success
func markStatusSuccess(trs *v1beta1.TaskRunStatus) {
trs.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand Down
37 changes: 36 additions & 1 deletion pkg/pod/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func TestMakeTaskRunStatus(t *testing.T) {
}{{
desc: "empty",
podStatus: corev1.PodStatus{},

want: v1beta1.TaskRunStatus{
Status: statusRunning(),
TaskRunStatusFields: v1beta1.TaskRunStatusFields{
Expand Down Expand Up @@ -1551,6 +1550,42 @@ func TestMarkStatusSuccess(t *testing.T) {
}
}

// TestIsPodArchived checks that IsPodArchived reports a pod as archived only
// when an entry with the same pod name is present in the retriesStatus.
func TestIsPodArchived(t *testing.T) {
	type testCase struct {
		name           string
		podName        string
		retriesStatus  []v1beta1.TaskRunStatus
		wantIsArchived bool
	}

	cases := []testCase{
		{
			name:           "Pod is not in the retriesStatus",
			podName:        "pod",
			retriesStatus:  []v1beta1.TaskRunStatus{},
			wantIsArchived: false,
		},
		{
			name:    "Pod is in the retriesStatus",
			podName: "pod",
			retriesStatus: []v1beta1.TaskRunStatus{
				{
					TaskRunStatusFields: v1beta1.TaskRunStatusFields{
						PodName: "pod",
					},
				},
			},
			wantIsArchived: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// The current status always names "pod"; only the retry history varies.
			status := &v1beta1.TaskRunStatus{
				TaskRunStatusFields: v1beta1.TaskRunStatusFields{
					PodName:       "pod",
					RetriesStatus: tc.retriesStatus,
				},
			}
			pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: tc.podName}}

			gotArchived := IsPodArchived(pod, status)
			if gotArchived != tc.wantIsArchived {
				t.Errorf("IsPodArchived(): %v, expect %v", gotArchived, tc.wantIsArchived)
			}
		})
	}
}

func statusRunning() duckv1.Status {
var trs v1beta1.TaskRunStatus
markStatusRunning(&trs, v1beta1.TaskRunReasonRunning.String(), "Not all Steps in the Task have finished executing")
Expand Down
31 changes: 2 additions & 29 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,26 +850,10 @@ func (c *Reconciler) createTaskRuns(ctx context.Context, rpt *resources.Resolved
func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, params []v1beta1.Param, rpt *resources.ResolvedPipelineTask, pr *v1beta1.PipelineRun, storageBasePath string) (*v1beta1.TaskRun, error) {
logger := logging.FromContext(ctx)

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(taskRunName)
if tr != nil {
// retry should happen only when the taskrun has failed
if !tr.Status.GetCondition(apis.ConditionSucceeded).IsFalse() {
return tr, nil
}
// Don't modify the lister cache's copy.
tr = tr.DeepCopy()
// is a retry
addRetryHistory(tr)
clearStatus(tr)
tr.Status.MarkResourceOngoing("", "")
logger.Infof("Updating taskrun %s with cleared status and retry history (length: %d).", tr.GetName(), len(tr.Status.RetriesStatus))
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).UpdateStatus(ctx, tr, metav1.UpdateOptions{})
}

rpt.PipelineTask = resources.ApplyPipelineTaskContexts(rpt.PipelineTask)
taskRunSpec := pr.GetTaskRunSpec(rpt.PipelineTask.Name)
params = append(params, rpt.PipelineTask.Params...)
tr = &v1beta1.TaskRun{
tr := &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: taskRunName,
Namespace: pr.Namespace,
Expand All @@ -878,6 +862,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para
Annotations: combineTaskRunAndTaskSpecAnnotations(pr, rpt.PipelineTask),
},
Spec: v1beta1.TaskRunSpec{
Retries: rpt.PipelineTask.Retries,
Params: params,
ServiceAccountName: taskRunSpec.TaskServiceAccountName,
PodTemplate: taskRunSpec.TaskPodTemplate,
Expand Down Expand Up @@ -1092,18 +1077,6 @@ func combinedSubPath(workspaceSubPath string, pipelineTaskSubPath string) string
return filepath.Join(workspaceSubPath, pipelineTaskSubPath)
}

// addRetryHistory appends a deep copy of the TaskRun's current status to its
// RetriesStatus list. The copy has its own RetriesStatus set to nil so that
// archived entries do not nest earlier retry history.
func addRetryHistory(tr *v1beta1.TaskRun) {
	snapshot := tr.Status.DeepCopy()
	snapshot.RetriesStatus = nil
	tr.Status.RetriesStatus = append(tr.Status.RetriesStatus, *snapshot)
}

// clearStatus resets the per-attempt fields of the TaskRun status: the pod
// name is emptied and the start and completion times are set to nil.
func clearStatus(tr *v1beta1.TaskRun) {
	status := &tr.Status
	status.PodName = ""
	status.CompletionTime = nil
	status.StartTime = nil
}

func getTaskrunAnnotations(pr *v1beta1.PipelineRun) map[string]string {
// Propagate annotations from PipelineRun to TaskRun.
annotations := make(map[string]string, len(pr.ObjectMeta.Annotations)+1)
Expand Down
Loading

0 comments on commit ac6d523

Please sign in to comment.