Skip to content

Commit

Permalink
Change ordering of ResolveResultRefs/ApplyTaskResults and verify a ta…
Browse files Browse the repository at this point in the history
…sk actually produces results

This refactor changes the ordering of when we resolve references to results and apply task results within the reconciler. Prior to this change, we were waiting until `RunNextScheduableTask()` to resolve and apply task results. This becomes an issue when trying to consume whole array results in a Matrix since the TaskRuns are created prior to `RunNextScheduableTask()` and the number of TaskRuns before resolving references is nondeterministic [*]. Consuming whole array results is a feature required in order to promote Matrix to beta and will be implemented in a subsequent PR.

In this commit, we move this logic before `RunNextScheduableTask()` into `ResolvePipelineTask()` and resolve and apply the task results upfront so that by the time we go to create the TaskRuns, all of the replacements have been applied and we know exactly how many TaskRuns to create. Changing the ordering of the reconciler logic means we also have to change some of the error handling with ResolveResultRefs because we currently fail a PipelineRun if any of the result values are missing, however this could be a valid PR if it's a skipped Task due to `MissingResultsSkip`. This commit removes some of the initial errors that would cause the PipelineRun to fail the first time we ResolveResultRefs so that we can mark all of the task that are supposed to be skipped as skipped.

Lastly, we add validation `verifyResultsProduced()` to ensure that any results defined within a PipelineSpec are actually produced. An example of this is in `TestMissingResultWhenStepErrorIsIgnored()`when Task 1 produces Result A successfully and fails to produce Result B, but has onError continue. Task 2 consumes the results from Task 1, which results in the Failed PipelineRun, and one TaskRun created.
  • Loading branch information
EmmaMunley committed Jun 12, 2023
1 parent 98d9ac7 commit d2ecb93
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 73 deletions.
50 changes: 38 additions & 12 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ const (
// ReasonParameterMissing indicates that the reason for the failure status is that the
// associated PipelineRun didn't provide all the required parameters
ReasonParameterMissing = "ParameterMissing"
// ReasonResultMissing indicates that the reason for the failure status is that the
// associated PipelineRun didn't produce the results that was declared in the PipelineSpec
ReasonResultMissing = "ReasonResultMissing"
// ReasonFailedValidation indicates that the reason for failure status is
// that pipelinerun failed runtime validation
ReasonFailedValidation = "PipelineValidationFailed"
Expand Down Expand Up @@ -349,6 +352,7 @@ func (c *Reconciler) resolvePipelineState(
},
getCustomRunFunc,
task,
pst,
)
if err != nil {
if tresources.IsGetTaskErrTransient(err) {
Expand Down Expand Up @@ -717,18 +721,6 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
return controller.NewPermanentError(err)
}

resolvedResultRefs, _, err := resources.ResolveResultRefs(pipelineRunFacts.State, nextRpts)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonInvalidTaskResultReference, err.Error())
return controller.NewPermanentError(err)
}

resources.ApplyTaskResults(nextRpts, resolvedResultRefs)
// After we apply Task Results, we may be able to evaluate more
// when expressions, so reset the skipped cache
pipelineRunFacts.ResetSkippedCache()

// GetFinalTasks only returns final tasks when a DAG is complete
fNextRpts := pipelineRunFacts.GetFinalTasks()
if len(fNextRpts) != 0 {
Expand All @@ -752,6 +744,15 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
if rpt.IsFinalTask(pipelineRunFacts) {
c.setFinallyStartedTimeIfNeeded(pr, pipelineRunFacts)
}

// Check for Missing Result References
err := rpt.MissingResultReferences(pipelineRunFacts)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonInvalidTaskResultReference, err.Error())
return controller.NewPermanentError(err)
}

if rpt == nil || rpt.Skip(pipelineRunFacts).IsSkipped || rpt.IsFinallySkipped(pipelineRunFacts).IsSkipped {
continue
}
Expand Down Expand Up @@ -1409,6 +1410,31 @@ func updatePipelineRunStatusFromChildRefs(logger *zap.SugaredLogger, pr *v1beta1
pr.Status.ChildReferences = newChildRefs
}

// verifyResultsProduced checks that any results defined within a PipelineSpec are actually
// produced. If the TaskRun that produces a results ends up failing, the result will never
// be produced so this ensures that all of the results defined are actually emitted.
func verifyResultsProduced(pipelineRunState resources.PipelineRunState, taskRunResults map[string][]v1beta1.TaskRunResult) error {
if len(taskRunResults) > 0 {
for _, rpt := range pipelineRunState {
name := rpt.PipelineTask.Name
if rpt.PipelineTask.TaskSpec != nil && len(rpt.PipelineTask.TaskSpec.TaskSpec.Results) > 0 {
for _, result := range rpt.PipelineTask.TaskSpec.TaskSpec.Results {
if taskResults, ok := taskRunResults[name]; ok {
for _, taskResult := range taskResults {
if taskResult.Name != result.Name {
err := fmt.Errorf("Could not find result with name %s for task %s", result.Name, rpt.PipelineTask.Name)
return err
}
}
}
}
}
}
}

return nil
}

// conditionFromVerificationResult returns the ConditionTrustedResourcesVerified condition based on the VerificationResult, err is returned when the VerificationResult type is VerificationError
func conditionFromVerificationResult(verificationResult *trustedresources.VerificationResult, pr *v1beta1.PipelineRun, resourceName string) (*apis.Condition, error) {
var condition *apis.Condition
Expand Down
173 changes: 173 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,179 @@ spec:
}
}

func TestMissingResultWhenStepErrorIsIgnored(t *testing.T) {
prs := []*v1beta1.PipelineRun{parse.MustParseV1beta1PipelineRun(t, `
metadata:
name: test-pipeline-missing-results
namespace: foo
spec:
serviceAccountName: test-sa-0
pipelineSpec:
tasks:
- name: task1
taskSpec:
results:
- name: result1
type: string
- name: result2
type: string
steps:
- name: failing-step
onError: continue
image: busybox
script: 'echo -n 123 | tee $(results.result1.path); exit 1; echo -n 456 | tee $(results.result2.path)'
- name: task2
runAfter: [ task1 ]
params:
- name: param1
value: $(tasks.task1.results.result1)
- name: param2
value: $(tasks.task1.results.result2)
taskSpec:
params:
- name: param1
type: string
- name: param2
type: string
steps:
- name: foo
image: busybox
script: 'exit 0'
`)}

trs := []*v1beta1.TaskRun{mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta("test-pipeline-missing-results-task1", "foo",
"test-pipeline-missing-results", "test-pipeline", "task1", true),
`
spec:
serviceAccountName: test-sa
timeout: 1h0m0s
status:
conditions:
- status: "True"
type: Succeeded
taskResults:
- name: result1
value: 123
`)}

expectedPipelineRun :=
parse.MustParseV1beta1PipelineRun(t, `
metadata:
name: test-pipeline-missing-results
namespace: foo
annotations: {}
labels:
tekton.dev/pipeline: test-pipeline-missing-results
spec:
serviceAccountName: test-sa-0
pipelineSpec:
tasks:
- name: task1
taskSpec:
results:
- name: result1
type: string
- name: result2
type: string
steps:
- name: failing-step
onError: continue
image: busybox
script: 'echo -n 123 | tee $(results.result1.path); exit 1; echo -n 456 | tee $(results.result2.path)'
- name: task2
runAfter: [ task1 ]
params:
- name: param1
value: $(tasks.task1.results.result1)
- name: param2
value: $(tasks.task1.results.result2)
taskSpec:
params:
- name: param1
type: string
- name: param2
type: string
steps:
- name: foo
image: busybox
script: 'exit 0'
status:
pipelineSpec:
tasks:
- name: task1
taskSpec:
results:
- name: result1
type: string
- name: result2
type: string
steps:
- name: failing-step
onError: continue
image: busybox
script: 'echo -n 123 | tee $(results.result1.path); exit 1; echo -n 456 | tee $(results.result2.path)'
- name: task2
runAfter: [ task1 ]
params:
- name: param1
value: $(tasks.task1.results.result1)
- name: param2
value: $(tasks.task1.results.result2)
taskSpec:
params:
- name: param1
type: string
- name: param2
type: string
steps:
- name: foo
image: busybox
script: 'exit 0'
conditions:
- message: "Invalid task result reference: Could not find result with name result2 for task task1"
reason: InvalidTaskResultReference
status: "False"
type: Succeeded
childReferences:
- apiVersion: tekton.dev/v1beta1
kind: TaskRun
name: test-pipeline-missing-results-task1
pipelineTaskName: task1
provenance:
featureFlags:
RunningInEnvWithInjectedSidecars: true
EnableAPIFields: "beta"
AwaitSidecarReadiness: true
VerificationNoMatchPolicy: "ignore"
EnableProvenanceInStatus: true
ResultExtractionMethod: "termination-message"
MaxResultSize: 4096
`)
d := test.Data{
PipelineRuns: prs,
TaskRuns: trs,
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-missing-results", []string{}, true)
if reconciledRun.Status.CompletionTime == nil {
t.Errorf("Expected a CompletionTime on invalid PipelineRun but was nil")
}

// The PipelineRun should be marked as failed due to InvalidTaskResultReference.
if d := cmp.Diff(reconciledRun, expectedPipelineRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreStartTime, ignoreCompletionTime); d != "" {
t.Errorf("Expected to see PipelineRun run marked as failed with the reason: InvalidTaskResultReference. Diff %s", diff.PrintWantGot(d))
}

// Check that the expected TaskRun was created
taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, clients, "foo", "test-pipeline-missing-results")

// We expect only 1 TaskRun to be created, since the PipelineRun should fail before creating the 2nd TaskRun due to the ReasonResultMissing
validateTaskRunsCount(t, taskRuns, 1)
}

func TestReconcile_InvalidPipelineRunNames(t *testing.T) {
// TestReconcile_InvalidPipelineRunNames runs "Reconcile" on several PipelineRuns that have invalid names.
// It verifies that reconcile fails, how it fails and which events are triggered.
Expand Down
25 changes: 25 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ const (
ReasonConditionCheckFailed = "ConditionCheckFailed"
)

var (
// ErrInvalidTaskResultReference indicates that the reason for the failure status is that there
// is an invalid task result reference
ErrInvalidTaskResultReference = errors.New("Invalid task result reference")
)

// TaskSkipStatus stores whether a task was skipped and why
type TaskSkipStatus struct {
IsSkipped bool
Expand Down Expand Up @@ -345,6 +351,17 @@ func (t *ResolvedPipelineTask) Skip(facts *PipelineRunFacts) TaskSkipStatus {
return facts.SkipCache[t.PipelineTask.Name]
}

// MissingResultReferences returns an error if it is missing result references
func (t *ResolvedPipelineTask) MissingResultReferences(facts *PipelineRunFacts) error {
if t.checkParentsDone(facts) && t.hasResultReferences() {
_, _, err := ResolveResultRefs(facts.State, PipelineRunState{t})
if err != nil {
return err
}
}
return nil
}

// skipBecauseWhenExpressionsEvaluatedToFalse confirms that the when expressions have completed evaluating, and
// it returns true if any of the when expressions evaluate to false
func (t *ResolvedPipelineTask) skipBecauseWhenExpressionsEvaluatedToFalse(facts *PipelineRunFacts) bool {
Expand Down Expand Up @@ -541,12 +558,20 @@ func ResolvePipelineTask(
getTaskRun resources.GetTaskRun,
getRun GetRun,
pipelineTask v1beta1.PipelineTask,
pst PipelineRunState,
) (*ResolvedPipelineTask, error) {
rpt := ResolvedPipelineTask{
PipelineTask: &pipelineTask,
}
rpt.CustomTask = rpt.PipelineTask.TaskRef.IsCustomTask() || rpt.PipelineTask.TaskSpec.IsCustomTask()
numCombinations := 1

// We want to resolve all of the result references and ignore any errors at this point since there could be
// instances where result references are missing here, but will be later skipped or resolved in a subsequent
// TaskRun. The final validation is handled in skipBecauseResultReferencesAreMissing.
resolvedResultRefs, _, _ := ResolveResultRefs(pst, PipelineRunState{&rpt})
ApplyTaskResults(PipelineRunState{&rpt}, resolvedResultRefs)

if rpt.PipelineTask.IsMatrixed() {
numCombinations = pipelineTask.Matrix.CountCombinations()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2512,7 +2512,7 @@ func TestResolvePipelineRun_CustomTask(t *testing.T) {
cfg := config.NewStore(logtesting.TestLogger(t))
ctx = cfg.ToContext(ctx)
for _, task := range pts {
ps, err := ResolvePipelineTask(ctx, pr, nopGetTask, nopGetTaskRun, getRun, task)
ps, err := ResolvePipelineTask(ctx, pr, nopGetTask, nopGetTaskRun, getRun, task, nil)
if err != nil {
t.Fatalf("ResolvePipelineTask: %v", err)
}
Expand Down Expand Up @@ -2563,7 +2563,7 @@ func TestResolvePipelineRun_PipelineTaskHasNoResources(t *testing.T) {
}
pipelineState := PipelineRunState{}
for _, task := range pts {
ps, err := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, task)
ps, err := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, task, nil)
if err != nil {
t.Errorf("Error getting tasks for fake pipeline %s: %s", p.ObjectMeta.Name, err)
}
Expand Down Expand Up @@ -2614,7 +2614,7 @@ func TestResolvePipelineRun_TaskDoesntExist(t *testing.T) {
},
}
for _, pt := range pts {
_, err := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, pt)
_, err := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, pt, nil)
var tnf *TaskNotFoundError
switch {
case err == nil:
Expand Down Expand Up @@ -2654,7 +2654,7 @@ func TestResolvePipelineRun_VerificationFailed(t *testing.T) {
},
}
for _, pt := range pts {
rt, _ := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, pt)
rt, _ := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, pt, nil)
if d := cmp.Diff(verificationResult, rt.ResolvedTask.VerificationResult, cmpopts.EquateErrors()); d != "" {
t.Errorf(diff.PrintWantGot(d))
}
Expand Down Expand Up @@ -2898,7 +2898,7 @@ func TestResolvePipeline_WhenExpressions(t *testing.T) {
}

t.Run("When Expressions exist", func(t *testing.T) {
_, err := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, pt)
_, err := ResolvePipelineTask(context.Background(), pr, getTask, getTaskRun, nopGetCustomRun, pt, nil)
if err != nil {
t.Fatalf("Did not expect error when resolving PipelineRun: %v", err)
}
Expand Down Expand Up @@ -3000,7 +3000,7 @@ func TestIsCustomTask(t *testing.T) {
ctx := context.Background()
cfg := config.NewStore(logtesting.TestLogger(t))
ctx = cfg.ToContext(ctx)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, getRun, tc.pt)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, getRun, tc.pt, nil)
if err != nil {
t.Fatalf("Did not expect error when resolving PipelineRun: %v", err)
}
Expand Down Expand Up @@ -3743,7 +3743,7 @@ func TestIsMatrixed(t *testing.T) {
},
})
ctx = cfg.ToContext(ctx)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, getRun, tc.pt)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, getRun, tc.pt, nil)
if err != nil {
t.Fatalf("Did not expect error when resolving PipelineRun: %v", err)
}
Expand Down Expand Up @@ -3851,7 +3851,7 @@ func TestResolvePipelineRunTask_WithMatrix(t *testing.T) {
},
})
ctx = cfg.ToContext(ctx)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, getRun, tc.pt)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, getRun, tc.pt, nil)
if err != nil {
t.Fatalf("Did not expect error when resolving PipelineRun: %v", err)
}
Expand Down Expand Up @@ -3971,7 +3971,7 @@ func TestResolvePipelineRunTask_WithMatrixedCustomTask(t *testing.T) {
if tc.getRun == nil {
tc.getRun = getRun
}
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, tc.getRun, tc.pt)
rpt, err := ResolvePipelineTask(ctx, pr, getTask, getTaskRun, tc.getRun, tc.pt, nil)
if err != nil {
t.Fatalf("Did not expect error when resolving PipelineRun: %v", err)
}
Expand Down
Loading

0 comments on commit d2ecb93

Please sign in to comment.