Skip to content

Commit

Permalink
refactoring - pipelinerunstate
Browse files Browse the repository at this point in the history
Creating a new file with all attributes of pipelinerunstate along with the
test file.

Refactored few functions as an attributes of pipelunerunstate
  • Loading branch information
pritidesai committed Sep 15, 2020
1 parent bac2f58 commit 26211bf
Show file tree
Hide file tree
Showing 6 changed files with 1,358 additions and 1,300 deletions.
65 changes: 3 additions & 62 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return err
}

after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d, dfinally)
after := pipelineState.GetPipelineConditionStatus(pr, logger, d, dfinally)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
Expand All @@ -480,8 +480,8 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
pr.Status.SkippedTasks = getSkippedTasks(pr, pipelineState, d)
pr.Status.TaskRuns = pipelineState.GetTaskRunsStatus(pr)
pr.Status.SkippedTasks = pipelineState.GetSkippedTasks(pr, d)
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}
Expand Down Expand Up @@ -566,65 +566,6 @@ func getPipelineRunResults(pipelineSpec *v1beta1.PipelineSpec, resolvedResultRef
return results
}

func getTaskRunsStatus(pr *v1beta1.PipelineRun, state []*resources.ResolvedPipelineRunTask) map[string]*v1beta1.PipelineRunTaskRunStatus {
status := make(map[string]*v1beta1.PipelineRunTaskRunStatus)
for _, rprt := range state {
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
continue
}

var prtrs *v1beta1.PipelineRunTaskRunStatus
if rprt.TaskRun != nil {
prtrs = pr.Status.TaskRuns[rprt.TaskRun.Name]
}
if prtrs == nil {
prtrs = &v1beta1.PipelineRunTaskRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
}
}

if rprt.TaskRun != nil {
prtrs.Status = &rprt.TaskRun.Status
}

if len(rprt.ResolvedConditionChecks) > 0 {
cStatus := make(map[string]*v1beta1.PipelineRunConditionCheckStatus)
for _, c := range rprt.ResolvedConditionChecks {
cStatus[c.ConditionCheckName] = &v1beta1.PipelineRunConditionCheckStatus{
ConditionName: c.ConditionRegisterName,
}
if c.ConditionCheck != nil {
cStatus[c.ConditionCheckName].Status = c.NewConditionCheckStatus()
}
}
prtrs.ConditionChecks = cStatus
if rprt.ResolvedConditionChecks.IsDone() && !rprt.ResolvedConditionChecks.IsSuccess() {
if prtrs.Status == nil {
prtrs.Status = &v1beta1.TaskRunStatus{}
}
prtrs.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: resources.ReasonConditionCheckFailed,
Message: fmt.Sprintf("ConditionChecks failed for Task %s in PipelineRun %s", rprt.TaskRunName, pr.Name),
})
}
}
status[rprt.TaskRunName] = prtrs
}
return status
}

func getSkippedTasks(pr *v1beta1.PipelineRun, state []*resources.ResolvedPipelineRunTask, d *dag.Graph) []v1beta1.SkippedTask {
skipped := []v1beta1.SkippedTask{}
for _, rprt := range state {
if rprt.Skip(state, d) {
skipped = append(skipped, v1beta1.SkippedTask{Name: rprt.PipelineTask.Name})
}
}
return skipped
}

func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1beta1.PipelineRun) error {
for taskRunName := range pr.Status.TaskRuns {
// TODO(dibyom): Add conditionCheck statuses here
Expand Down
8 changes: 4 additions & 4 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func TestUpdateTaskRunsState(t *testing.T) {
},
}

state := []*resources.ResolvedPipelineRunTask{{
state := resources.PipelineRunState{{
PipelineTask: &pipelineTask,
TaskRunName: "test-pipeline-run-success-unit-test-1",
TaskRun: taskrun,
Expand All @@ -806,7 +806,7 @@ func TestUpdateTaskRunsState(t *testing.T) {
},
}}
pr.Status.InitializeConditions()
status := getTaskRunsStatus(pr, state)
status := state.GetTaskRunsStatus(pr)
if d := cmp.Diff(status, expectedPipelineRunStatus.TaskRuns); d != "" {
t.Fatalf("Expected PipelineRun status to match TaskRun(s) status, but got a mismatch: %s", diff.PrintWantGot(d))
}
Expand Down Expand Up @@ -939,13 +939,13 @@ func TestUpdateTaskRunStateWithConditionChecks(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
pr := tb.PipelineRun("test-pipeline-run", tb.PipelineRunNamespace("foo"), tb.PipelineRunSpec("test-pipeline"))

state := []*resources.ResolvedPipelineRunTask{{
state := resources.PipelineRunState{{
PipelineTask: &pipelineTask,
TaskRunName: taskrunName,
ResolvedConditionChecks: tc.rcc,
}}
pr.Status.InitializeConditions()
status := getTaskRunsStatus(pr, state)
status := state.GetTaskRunsStatus(pr)
expected := map[string]*v1beta1.PipelineRunTaskRunStatus{
taskrunName: &tc.expectedStatus,
}
Expand Down
234 changes: 0 additions & 234 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package resources
import (
"context"
"fmt"
"reflect"
"strconv"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/apis"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
Expand Down Expand Up @@ -74,10 +71,6 @@ type ResolvedPipelineRunTask struct {
ResolvedConditionChecks TaskConditionCheckState // Could also be a TaskRun or maybe just a Pod?
}

// PipelineRunState is a slice of ResolvedPipelineRunTasks the represents the current execution
// state of the PipelineRun.
type PipelineRunState []*ResolvedPipelineRunTask

func (t ResolvedPipelineRunTask) IsDone() bool {
if t.TaskRun == nil || t.PipelineTask == nil {
return false
Expand Down Expand Up @@ -206,139 +199,6 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
return false
}

// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
m := make(map[string]*ResolvedPipelineRunTask)
for _, rprt := range state {
m[rprt.PipelineTask.Name] = rprt
}
return m
}

// IsDone returns true when all pipeline tasks have respective taskRun created and
// that taskRun has either succeeded or failed after all possible retry attempts
func (state PipelineRunState) IsDone() bool {
for _, t := range state {
if !t.IsDone() {
return false
}
}
return true
}

// IsBeforeFirstTaskRun returns true if the PipelineRun has not yet started its first TaskRun
func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
for _, t := range state {
if t.TaskRun != nil {
return false
}
}
return true
}

// IsStopping returns true if the PipelineRun won't be scheduling any new Task because
// at least one task already failed or was cancelled in the specified dag
func (state PipelineRunState) IsStopping(d *dag.Graph) bool {
for _, t := range state {
if isTaskInGraph(t.PipelineTask.Name, d) {
if t.IsCancelled() {
return true
}
if t.IsFailure() {
return true
}
}
}
return false
}

// GetNextTasks returns a list of tasks which should be executed next i.e.
// a list of tasks from candidateTasks which aren't yet indicated in state to be running and
// a list of cancelled/failed tasks from candidateTasks which haven't exhausted their retries
func (state PipelineRunState) GetNextTasks(candidateTasks sets.String) []*ResolvedPipelineRunTask {
tasks := []*ResolvedPipelineRunTask{}
for _, t := range state {
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun == nil {
tasks = append(tasks, t)
}
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun != nil {
status := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
if status != nil && status.IsFalse() {
if !(t.TaskRun.IsCancelled() || status.Reason == v1beta1.TaskRunReasonCancelled.String() || status.Reason == ReasonConditionCheckFailed) {
if len(t.TaskRun.Status.RetriesStatus) < t.PipelineTask.Retries {
tasks = append(tasks, t)
}
}
}
}
}
return tasks
}

// SuccessfulOrSkippedDAGTasks returns a list of the names of all of the PipelineTasks in state
// which have successfully completed or skipped
func (state PipelineRunState) SuccessfulOrSkippedDAGTasks(d *dag.Graph) []string {
tasks := []string{}
for _, t := range state {
if isTaskInGraph(t.PipelineTask.Name, d) {
if t.IsSuccessful() || t.Skip(state, d) {
tasks = append(tasks, t.PipelineTask.Name)
}
}
}
return tasks
}

// checkTasksDone returns true if all tasks from the specified graph are finished executing
// a task is considered done if it has failed/succeeded/skipped
func (state PipelineRunState) checkTasksDone(d *dag.Graph) bool {
for _, t := range state {
if isTaskInGraph(t.PipelineTask.Name, d) {
if t.TaskRun == nil {
// this task might have skipped if taskRun is nil
// continue and ignore if this task was skipped
// skipped task is considered part of done
if t.Skip(state, d) {
continue
}
return false
}
if !t.IsDone() {
return false
}
}
}
return true
}

// GetFinalTasks returns a list of final tasks without any taskRun associated with it
// GetFinalTasks returns final tasks only when all DAG tasks have finished executing successfully or skipped or
// any one DAG task resulted in failure
func (state PipelineRunState) GetFinalTasks(d *dag.Graph, dfinally *dag.Graph) []*ResolvedPipelineRunTask {
tasks := []*ResolvedPipelineRunTask{}
finalCandidates := sets.NewString()
// check either pipeline has finished executing all DAG pipelineTasks
// or any one of the DAG pipelineTask has failed
if state.checkTasksDone(d) {
// return list of tasks with all final tasks
for _, t := range state {
if isTaskInGraph(t.PipelineTask.Name, dfinally) && !t.IsSuccessful() {
finalCandidates.Insert(t.PipelineTask.Name)
}
}
tasks = state.GetNextTasks(finalCandidates)
}
return tasks
}

// Check if a PipelineTask belongs to the specified Graph
func isTaskInGraph(pipelineTaskName string, d *dag.Graph) bool {
if _, ok := d.Nodes[pipelineTaskName]; ok {
return true
}
return false
}

// GetTaskRun is a function that will retrieve the TaskRun name.
type GetTaskRun func(name string) (*v1beta1.TaskRun, error)

Expand Down Expand Up @@ -541,100 +401,6 @@ func GetTaskRunName(taskRunsStatus map[string]*v1beta1.PipelineRunTaskRunStatus,
return names.SimpleNameGenerator.RestrictLengthWithRandomSuffix(fmt.Sprintf("%s-%s", prName, ptName))
}

// GetPipelineConditionStatus will return the Condition that the PipelineRun prName should be
// updated with, based on the status of the TaskRuns in state.
func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState, logger *zap.SugaredLogger, dag *dag.Graph, dfinally *dag.Graph) *apis.Condition {
// We have 4 different states here:
// 1. Timed out -> Failed
// 2. All tasks are done and at least one has failed or has been cancelled -> Failed
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
// 4. A Task or Condition is running right now or there are things left to run -> Running
if pr.IsTimedOut() {
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonTimedOut.String(),
Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.Spec.Timeout.Duration.String()),
}
}

allTasks := []string{}
withStatusTasks := []string{}
skipTasks := int(0)
failedTasks := int(0)
cancelledTasks := int(0)
reason := v1beta1.PipelineRunReasonSuccessful.String()

// Check to see if all tasks are success or skipped
//
// The completion reason is also calculated here, but it will only be used
// if all tasks are completed.
//
// The pipeline run completion reason is set from the taskrun completion reason
// according to the following logic:
//
// - All successful: ReasonSucceeded
// - Some successful, some skipped: ReasonCompleted
// - Some cancelled, none failed: ReasonCancelled
// - At least one failed: ReasonFailed
for _, rprt := range state {
allTasks = append(allTasks, rprt.PipelineTask.Name)
switch {
case rprt.IsSuccessful():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case rprt.Skip(state, dag):
skipTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
// At least one is skipped and no failure yet, mark as completed
if reason == v1beta1.PipelineRunReasonSuccessful.String() {
reason = v1beta1.PipelineRunReasonCompleted.String()
}
case rprt.IsCancelled():
cancelledTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
if reason != v1beta1.PipelineRunReasonFailed.String() {
reason = v1beta1.PipelineRunReasonCancelled.String()
}
case rprt.IsFailure():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
failedTasks++
reason = v1beta1.PipelineRunReasonFailed.String()
}
}

if reflect.DeepEqual(allTasks, withStatusTasks) {
status := corev1.ConditionTrue
if failedTasks > 0 || cancelledTasks > 0 {
status = corev1.ConditionFalse
}
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: status,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Skipped: %d",
len(allTasks)-skipTasks, failedTasks, cancelledTasks, skipTasks),
}
}

// Hasn't timed out; not all tasks have finished.... Must keep running then....
// transition pipeline into stopping state when one of the tasks(dag/final) cancelled or one of the dag tasks failed
// for a pipeline with final tasks, single dag task failure does not transition to interim stopping state
// pipeline stays in running state until all final tasks are done before transitioning to failed state
if cancelledTasks > 0 || (failedTasks > 0 && state.checkTasksDone(dfinally)) {
reason = v1beta1.PipelineRunReasonStopping.String()
} else {
reason = v1beta1.PipelineRunReasonRunning.String()
}
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Incomplete: %d, Skipped: %d",
len(withStatusTasks)-skipTasks, failedTasks, cancelledTasks, len(allTasks)-len(withStatusTasks), skipTasks),
}
}

func resolveConditionChecks(pt *v1beta1.PipelineTask, taskRunStatus map[string]*v1beta1.PipelineRunTaskRunStatus, taskRunName string, getTaskRun resources.GetTaskRun, getCondition GetCondition, providedResources map[string]*resourcev1alpha1.PipelineResource) ([]*ResolvedConditionCheck, error) {
rccs := []*ResolvedConditionCheck{}
for i := range pt.Conditions {
Expand Down
Loading

0 comments on commit 26211bf

Please sign in to comment.