Skip to content

Commit

Permalink
Validate dependencies between resolved resources in a PipelineRun
Browse files Browse the repository at this point in the history
When a PipelineRun is started there are several error conditions that can be
hit due to invalid Pipeline configuration. These errors have previously
only surfaced halfway through execution of the PipelineRun because validating
them requires a number of different resources to be resolved.

This commit performs more extensive runtime validation of a PipelineRun before
it is allowed to start:
- All result variables used in the Pipeline are checked to be pointing at valid
Tasks and TaskResults.
-  Workspaces marked optional by the Pipeline are confirmed to also be Optional
in the Tasks they're passed to.

This validation required searching for result variables in a PipelineTask in a very
similar way we do in several other places in our codebase. I've refactored all of these
to use a common func `PipelineTaskResultRefs()`.
  • Loading branch information
Scott authored and tekton-robot committed Jan 28, 2021
1 parent 1eca890 commit 08177fa
Show file tree
Hide file tree
Showing 10 changed files with 804 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ spec:
taskSpec:
workspaces:
- name: message-of-the-day
optional: true
steps:
- image: alpine
script: |
Expand Down
32 changes: 4 additions & 28 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,37 +200,13 @@ func (pt PipelineTask) resourceDeps() []string {
for _, rd := range cond.Resources {
resourceDeps = append(resourceDeps, rd.From...)
}
for _, param := range cond.Params {
expressions, ok := GetVarSubstitutionExpressionsForParam(param)
if ok {
resultRefs := NewResultRefs(expressions)
for _, resultRef := range resultRefs {
resourceDeps = append(resourceDeps, resultRef.PipelineTask)
}
}
}
}

// Add any dependents from task results
for _, param := range pt.Params {
expressions, ok := GetVarSubstitutionExpressionsForParam(param)
if ok {
resultRefs := NewResultRefs(expressions)
for _, resultRef := range resultRefs {
resourceDeps = append(resourceDeps, resultRef.PipelineTask)
}
}
}
// Add any dependents from when expressions
for _, whenExpression := range pt.WhenExpressions {
expressions, ok := whenExpression.GetVarSubstitutionExpressions()
if ok {
resultRefs := NewResultRefs(expressions)
for _, resultRef := range resultRefs {
resourceDeps = append(resourceDeps, resultRef.PipelineTask)
}
}
// Add any dependents from result references.
for _, ref := range PipelineTaskResultRefs(&pt) {
resourceDeps = append(resourceDeps, ref.PipelineTask)
}

return resourceDeps
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/apis/pipeline/v1beta1/resultref.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,27 @@ func parseExpression(substitutionExpression string) (string, string, error) {
}
return subExpressions[1], subExpressions[3], nil
}

// PipelineTaskResultRefs walks all the places a result reference can be used
// in a PipelineTask and returns a list of any references that are found.
func PipelineTaskResultRefs(pt *PipelineTask) []*ResultRef {
refs := []*ResultRef{}
for _, condition := range pt.Conditions {
for _, p := range condition.Params {
expressions, _ := GetVarSubstitutionExpressionsForParam(p)
refs = append(refs, NewResultRefs(expressions)...)
}
}

for _, p := range pt.Params {
expressions, _ := GetVarSubstitutionExpressionsForParam(p)
refs = append(refs, NewResultRefs(expressions)...)
}

for _, whenExpression := range pt.WhenExpressions {
expressions, _ := whenExpression.GetVarSubstitutionExpressions()
refs = append(refs, NewResultRefs(expressions)...)
}

return refs
}
41 changes: 41 additions & 0 deletions pkg/apis/pipeline/v1beta1/resultref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,3 +553,44 @@ func TestLooksLikeResultRefWhenExpressionFalse(t *testing.T) {
})
}
}

// TestPipelineTaskResultReferences tests that PipelineTaskResultRefs()
// parses all the result variables used in a PipelineTask correctly and
// returns them all in the expected order.
func TestPipelineTaskResultRefs(t *testing.T) {
pt := v1beta1.PipelineTask{
Conditions: []v1beta1.PipelineTaskCondition{{
Params: []v1beta1.Param{{
Name: "foo",
Value: *v1beta1.NewArrayOrString("$(tasks.pt1.results.r1)"),
}},
}},
Params: []v1beta1.Param{{
Value: *v1beta1.NewArrayOrString("$(tasks.pt2.results.r2)"),
}},
WhenExpressions: []v1beta1.WhenExpression{{
Input: "$(tasks.pt3.results.r3)",
Operator: selection.In,
Values: []string{
"$(tasks.pt4.results.r4)",
},
}},
}
refs := v1beta1.PipelineTaskResultRefs(&pt)
expectedRefs := []*v1beta1.ResultRef{{
PipelineTask: "pt1",
Result: "r1",
}, {
PipelineTask: "pt2",
Result: "r2",
}, {
PipelineTask: "pt3",
Result: "r3",
}, {
PipelineTask: "pt4",
Result: "r4",
}}
if d := cmp.Diff(refs, expectedRefs); d != "" {
t.Errorf("%v", d)
}
}
21 changes: 21 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ const (
// ReasonInvalidTaskResultReference indicates a task result was declared
// but was not initialized by that task
ReasonInvalidTaskResultReference = "InvalidTaskResultReference"
// ReasonRequiredWorkspaceMarkedOptional indicates an optional workspace
// has been passed to a Task that is expecting a non-optional workspace
ReasonRequiredWorkspaceMarkedOptional = "RequiredWorkspaceMarkedOptional"
)

// Reconciler implements controller.Reconciler for Configuration resources.
Expand Down Expand Up @@ -502,6 +505,24 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
}

if pipelineRunFacts.State.IsBeforeFirstTaskRun() {
if err := resources.ValidatePipelineTaskResults(pipelineRunFacts.State); err != nil {
logger.Errorf("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonInvalidTaskResultReference, err.Error())
return controller.NewPermanentError(err)
}

if err := resources.ValidatePipelineResults(pipelineSpec, pipelineRunFacts.State); err != nil {
logger.Errorf("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonInvalidTaskResultReference, err.Error())
return controller.NewPermanentError(err)
}

if err := resources.ValidateOptionalWorkspaces(pipelineSpec.Workspaces, pipelineRunFacts.State); err != nil {
logger.Errorf("Optional workspace not supported by task: %v", err)
pr.Status.MarkFailed(ReasonRequiredWorkspaceMarkedOptional, err.Error())
return controller.NewPermanentError(err)
}

if pr.HasVolumeClaimTemplate() {
// create workspace PVC from template
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(ctx, pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil {
Expand Down
147 changes: 147 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5153,6 +5153,153 @@ func TestReconcile_OptionalWorkspacesOmitted(t *testing.T) {
}
}

func TestReconcile_DependencyValidationsImmediatelyFailPipelineRun(t *testing.T) {
names.TestingSeed()

ctx := context.Background()
cfg := config.NewStore(logtesting.TestLogger(t))
ctx = cfg.ToContext(ctx)

prs := []*v1beta1.PipelineRun{{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun-param-invalid-result-variable",
Namespace: "foo",
},
Spec: v1beta1.PipelineRunSpec{
ServiceAccountName: "test-sa",
PipelineSpec: &v1beta1.PipelineSpec{
Tasks: []v1beta1.PipelineTask{{
Name: "pt0",
TaskSpec: &v1beta1.EmbeddedTask{TaskSpec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{
Container: corev1.Container{
Image: "foo:latest",
},
}},
}},
}, {
Name: "pt1",
Params: []v1beta1.Param{{
Name: "p",
Value: *v1beta1.NewArrayOrString("$(tasks.pt0.results.r1)"),
}},
TaskSpec: &v1beta1.EmbeddedTask{TaskSpec: v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "p",
}},
Steps: []v1beta1.Step{{
Container: corev1.Container{
Image: "foo:latest",
},
}},
}},
}},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun-pipeline-result-invalid-result-variable",
Namespace: "foo",
},
Spec: v1beta1.PipelineRunSpec{
ServiceAccountName: "test-sa",
PipelineSpec: &v1beta1.PipelineSpec{
Tasks: []v1beta1.PipelineTask{{
Name: "pt0",
TaskSpec: &v1beta1.EmbeddedTask{TaskSpec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{
Container: corev1.Container{
Image: "foo:latest",
},
}},
}},
}, {
Name: "pt1",
TaskSpec: &v1beta1.EmbeddedTask{TaskSpec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{
Container: corev1.Container{
Image: "foo:latest",
},
}},
}},
}},
Results: []v1beta1.PipelineResult{{
Name: "pr",
Value: "$(tasks.pt0.results.r)",
}},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun-with-optional-workspace-validation",
Namespace: "foo",
},
Spec: v1beta1.PipelineRunSpec{
ServiceAccountName: "test-sa",
PipelineSpec: &v1beta1.PipelineSpec{
Workspaces: []v1beta1.PipelineWorkspaceDeclaration{{
Name: "optional-workspace",
Optional: true,
}},
Tasks: []v1beta1.PipelineTask{{
Name: "unit-test-1",
Workspaces: []v1beta1.WorkspacePipelineTaskBinding{{
Name: "ws",
Workspace: "optional-workspace",
}},
TaskSpec: &v1beta1.EmbeddedTask{TaskSpec: v1beta1.TaskSpec{
Workspaces: []v1beta1.WorkspaceDeclaration{{
Name: "ws",
Optional: false,
}},
Steps: []v1beta1.Step{{
Container: corev1.Container{
Image: "foo:latest",
},
}},
}},
}},
},
},
}}

d := test.Data{
PipelineRuns: prs,
ServiceAccounts: []*corev1.ServiceAccount{{
ObjectMeta: metav1.ObjectMeta{Name: prs[0].Spec.ServiceAccountName, Namespace: "foo"},
}},
}

prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

run1, _ := prt.reconcileRun("foo", "pipelinerun-param-invalid-result-variable", nil, true)
run2, _ := prt.reconcileRun("foo", "pipelinerun-pipeline-result-invalid-result-variable", nil, true)
run3, _ := prt.reconcileRun("foo", "pipelinerun-with-optional-workspace-validation", nil, true)

cond1 := run1.Status.GetCondition(apis.ConditionSucceeded)
cond2 := run2.Status.GetCondition(apis.ConditionSucceeded)
cond3 := run3.Status.GetCondition(apis.ConditionSucceeded)

for _, c := range []*apis.Condition{cond1, cond2, cond3} {
if c.Status != corev1.ConditionFalse {
t.Errorf("expected Succeeded/False condition but saw: %v", c)
}
}

if cond1.Reason != ReasonInvalidTaskResultReference {
t.Errorf("expected invalid task reference condition but saw: %v", cond1)
}

if cond2.Reason != ReasonInvalidTaskResultReference {
t.Errorf("expected invalid task reference condition but saw: %v", cond2)
}

if cond3.Reason != ReasonRequiredWorkspaceMarkedOptional {
t.Errorf("expected optional workspace not supported condition but saw: %v", cond3)
}
}

func getTaskRunWithTaskSpec(tr, pr, p, t string, labels, annotations map[string]string) *v1beta1.TaskRun {
return tb.TaskRun(tr,
tb.TaskRunNamespace("foo"),
Expand Down
56 changes: 7 additions & 49 deletions pkg/reconciler/pipelinerun/resources/resultrefresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,65 +113,23 @@ func removeDup(refs ResolvedResultRefs) ResolvedResultRefs {
return deduped
}

// convertToResultRefs replaces result references for all params and when expressions of the resolved pipeline run task
// convertToResultRefs walks a PipelineTask looking for result references. If any are
// found they are resolved to a value by searching pipelineRunState. The list of resolved
// references are returned. If an error is encountered due to an invalid result reference
// then a nil list and error is returned instead.
func convertToResultRefs(pipelineRunState PipelineRunState, target *ResolvedPipelineRunTask) (ResolvedResultRefs, error) {
var resolvedResultRefs ResolvedResultRefs
for _, condition := range target.PipelineTask.Conditions {
condRefs, err := convertParams(condition.Params, pipelineRunState, condition.ConditionRef)
for _, ref := range v1beta1.PipelineTaskResultRefs(target.PipelineTask) {
resolved, err := resolveResultRef(pipelineRunState, ref)
if err != nil {
return nil, err
}
resolvedResultRefs = append(resolvedResultRefs, condRefs...)
resolvedResultRefs = append(resolvedResultRefs, resolved)
}

taskParamsRefs, err := convertParams(target.PipelineTask.Params, pipelineRunState, target.PipelineTask.Name)
if err != nil {
return nil, err
}
resolvedResultRefs = append(resolvedResultRefs, taskParamsRefs...)

taskWhenExpressionsRefs, err := convertWhenExpressions(target.PipelineTask.WhenExpressions, pipelineRunState, target.PipelineTask.Name)
if err != nil {
return nil, err
}
resolvedResultRefs = append(resolvedResultRefs, taskWhenExpressionsRefs...)

return resolvedResultRefs, nil
}

func convertParams(params []v1beta1.Param, pipelineRunState PipelineRunState, name string) (ResolvedResultRefs, error) {
var resolvedParams ResolvedResultRefs
for _, param := range params {
resolvedResultRefs, err := extractResultRefsForParam(pipelineRunState, param)
if err != nil {
return nil, fmt.Errorf("unable to find result referenced by param %q in %q: %w", param.Name, name, err)
}
if resolvedResultRefs != nil {
resolvedParams = append(resolvedParams, resolvedResultRefs...)
}
}
return resolvedParams, nil
}

func convertWhenExpressions(whenExpressions []v1beta1.WhenExpression, pipelineRunState PipelineRunState, name string) (ResolvedResultRefs, error) {
var resolvedWhenExpressions ResolvedResultRefs
for _, whenExpression := range whenExpressions {
expressions, ok := whenExpression.GetVarSubstitutionExpressions()
if ok {
resolvedResultRefs, err := extractResultRefs(expressions, pipelineRunState)
if err != nil {
return nil, fmt.Errorf("unable to find result referenced by when expression with input %q in task %q: %w", whenExpression.GetInput(), name, err)
}
if resolvedResultRefs != nil {
resolvedWhenExpressions = append(resolvedWhenExpressions, resolvedResultRefs...)
}
}
}
return resolvedWhenExpressions, nil
}

func resolveResultRef(pipelineState PipelineRunState, resultRef *v1beta1.ResultRef) (*ResolvedResultRef, error) {

referencedPipelineTask := pipelineState.ToMap()[resultRef.PipelineTask]
if referencedPipelineTask == nil {
return nil, fmt.Errorf("could not find task %q referenced by result", resultRef.PipelineTask)
Expand Down
Loading

0 comments on commit 08177fa

Please sign in to comment.