Skip to content

Commit

Permalink
Introducing InternalTektonResultType as a ResultType
Browse files Browse the repository at this point in the history
In light of #3087 the need for a ResultType that is not exposed
as a TaskRunResult or PipelineResourceResult arises.
In #3087, a Step can emit a result indicating a Step timeout
has occurred. This is a result that should not be exposed hence
the need for a new ResultType called InternalTektonResultType.
This commit ensures results of this type are filtered out.

Introducing an InternalTektonResultType ensures a future proof
solution to internal results that should not be exposed.
Aside from the example in #3087, a present candidate is the
result written out by a Step containing a "StartedAt" key.
Currently this result is filtered out with a specific function.
Marking it as an InternalTektonResultType now allows for
this result to automatically be filtered out.

Additionally this commit brings about refactoring (and sometimes
renaming) of functions related to converting pod statuses to
taskrun statuses from pkg/reconciler/taskrun/taskrun.go to
pkg/pod/status/status.go. This is accompanied with moving unit
test cases from taskrun_test.go to status_test.go.
  • Loading branch information
Peaorl authored and tekton-robot committed Sep 22, 2020
1 parent d59d594 commit a9e7b54
Show file tree
Hide file tree
Showing 13 changed files with 540 additions and 423 deletions.
4 changes: 2 additions & 2 deletions cmd/git-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func main() {
{
Key: "commit",
Value: commit,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: resourceName,
},
ResourceName: resourceName,
},
{
Key: "url",
Value: fetchSpec.URL,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: resourceName,
},
ResourceName: resourceName,
Expand Down
4 changes: 2 additions & 2 deletions cmd/imagedigestexporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func main() {
Key: "digest",
Value: digest.String(),
ResourceName: imageResource.Name,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: imageResource.Name,
},
})
output = append(output, v1beta1.PipelineResourceResult{
Key: "url",
Value: imageResource.URL,
ResourceName: imageResource.Name,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: imageResource.Name,
},
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/pipeline/v1beta1/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ type PipelineResourceResult struct {
Key string `json:"key"`
Value string `json:"value"`
ResourceName string `json:"resourceName,omitempty"`
// This field should be deprecated and removed in the next API version.
// The field ResourceRef should be deprecated and removed in the next API version.
// See https://github.com/tektoncd/pipeline/issues/2694 for more information.
ResourceRef PipelineResourceRef `json:"resourceRef,omitempty"`
ResultType ResultType `json:"type,omitempty"`
ResourceRef *PipelineResourceRef `json:"resourceRef,omitempty"`
ResultType ResultType `json:"type,omitempty"`
}

// ResultType used to find out whether a PipelineResourceResult is from a task result or not
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
TaskRunResultType ResultType = "TaskRunResult"
// PipelineResourceResultType default pipeline result value
PipelineResourceResultType ResultType = "PipelineResourceResult"
// InternalTektonResultType default internal tekton result value
InternalTektonResultType ResultType = "InternalTektonResult"
// UnknownResultType default unknown result type value
UnknownResultType ResultType = ""
)
Expand Down
10 changes: 8 additions & 2 deletions pkg/apis/pipeline/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (e Entrypointer) Go() error {
// *but* we write postfile to make next steps bail too.
e.WritePostFile(e.PostFile, err)
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

return err
Expand All @@ -114,8 +115,9 @@ func (e Entrypointer) Go() error {
e.Args = append([]string{e.Entrypoint}, e.Args...)
}
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

err := e.Runner.Run(e.Args...)
Expand Down
185 changes: 137 additions & 48 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/names"
"github.com/tektoncd/pipeline/pkg/termination"
Expand Down Expand Up @@ -97,89 +98,177 @@ func SidecarsReady(podStatus corev1.PodStatus) bool {
}

// MakeTaskRunStatus returns a TaskRunStatus based on the Pod's status.
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) v1beta1.TaskRunStatus {
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) (v1beta1.TaskRunStatus, error) {
trs := &tr.Status
if trs.GetCondition(apis.ConditionSucceeded) == nil || trs.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionUnknown {
// If the taskRunStatus doesn't exist yet, it's because we just started running
MarkStatusRunning(trs, v1beta1.TaskRunReasonRunning.String(), "Not all Steps in the Task have finished executing")
}

complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

trs.PodName = pod.Name
trs.Steps = []v1beta1.StepState{}
trs.Sidecars = []v1beta1.SidecarState{}

var stepStatuses []corev1.ContainerStatus
var sidecarStatuses []corev1.ContainerStatus
for _, s := range pod.Status.ContainerStatuses {
if IsContainerStep(s.Name) {
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
message, time, err := removeStartInfoFromTerminationMessage(s)
stepStatuses = append(stepStatuses, s)
} else if isContainerSidecar(s.Name) {
sidecarStatuses = append(sidecarStatuses, s)
}
}

var merr *multierror.Error
if err := setTaskRunStatusBasedOnStepStatus(logger, stepStatuses, &tr); err != nil {
merr = multierror.Append(merr, err)
}

setTaskRunStatusBasedOnSidecarStatus(sidecarStatuses, trs)

trs.TaskRunResults = removeDuplicateResults(trs.TaskRunResults)

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)

return *trs, merr.ErrorOrNil()
}

func setTaskRunStatusBasedOnStepStatus(logger *zap.SugaredLogger, stepStatuses []corev1.ContainerStatus, tr *v1beta1.TaskRun) *multierror.Error {
trs := &tr.Status
var merr *multierror.Error

for _, s := range stepStatuses {
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
msg := s.State.Terminated.Message

results, err := termination.ParseMessage(logger, msg)
if err != nil {
logger.Errorf("termination message could not be parsed as JSON: %v", err)
merr = multierror.Append(merr, err)
} else {
time, err := extractStartedAtTimeFromResults(results)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %w", s.Name, tr.Name, err)
logger.Errorf("error setting the start time of step %q in taskrun %q: %v", s.Name, tr.Name, err)
merr = multierror.Append(merr, err)
}
taskResults, pipelineResourceResults, filteredResults := filterResultsAndResources(results)
if tr.IsSuccessful() {
trs.TaskRunResults = append(trs.TaskRunResults, taskResults...)
trs.ResourcesResult = append(trs.ResourcesResult, pipelineResourceResults...)
}
msg, err = createMessageFromResults(filteredResults)
if err != nil {
logger.Errorf("%v", err)
err = multierror.Append(merr, err)
} else {
s.State.Terminated.Message = msg
}
if time != nil {
s.State.Terminated.StartedAt = *time
s.State.Terminated.Message = message
}
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
ContainerState: *s.State.DeepCopy(),
Name: trimStepPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
} else if isContainerSidecar(s.Name) {
trs.Sidecars = append(trs.Sidecars, v1beta1.SidecarState{
ContainerState: *s.State.DeepCopy(),
Name: TrimSidecarPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
ContainerState: *s.State.DeepCopy(),
Name: trimStepPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
return merr

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

func setTaskRunStatusBasedOnSidecarStatus(sidecarStatuses []corev1.ContainerStatus, trs *v1beta1.TaskRunStatus) {
for _, s := range sidecarStatuses {
trs.Sidecars = append(trs.Sidecars, v1beta1.SidecarState{
ContainerState: *s.State.DeepCopy(),
Name: TrimSidecarPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
}
}

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)
func createMessageFromResults(results []v1beta1.PipelineResourceResult) (string, error) {
if len(results) == 0 {
return "", nil
}
bytes, err := json.Marshal(results)
if err != nil {
return "", fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
}
return string(bytes), nil
}

return *trs
func filterResultsAndResources(results []v1beta1.PipelineResourceResult) ([]v1beta1.TaskRunResult, []v1beta1.PipelineResourceResult, []v1beta1.PipelineResourceResult) {
var taskResults []v1beta1.TaskRunResult
var pipelineResourceResults []v1beta1.PipelineResourceResult
var filteredResults []v1beta1.PipelineResourceResult
for _, r := range results {
switch r.ResultType {
case v1beta1.TaskRunResultType:
taskRunResult := v1beta1.TaskRunResult{
Name: r.Key,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
filteredResults = append(filteredResults, r)
case v1beta1.InternalTektonResultType:
// Internal messages are ignored because they're not used as external result
continue
case v1beta1.PipelineResourceResultType:
fallthrough
default:
pipelineResourceResults = append(pipelineResourceResults, r)
filteredResults = append(filteredResults, r)
}
}

return taskResults, pipelineResourceResults, filteredResults
}

// removeStartInfoFromTerminationMessage searches for a result called "StartedAt" in the JSON-formatted
// termination message of a step and returns the values to use for sets State.Terminated if it's
// found. The "StartedAt" result is also removed from the list of results in the container status.
func removeStartInfoFromTerminationMessage(s corev1.ContainerStatus) (string, *metav1.Time, error) {
r, err := termination.ParseMessage(s.State.Terminated.Message)
if err != nil {
return "", nil, fmt.Errorf("termination message could not be parsed as JSON: %w", err)
func removeDuplicateResults(taskRunResult []v1beta1.TaskRunResult) []v1beta1.TaskRunResult {
if len(taskRunResult) == 0 {
return nil
}

uniq := make([]v1beta1.TaskRunResult, 0)
latest := make(map[string]v1beta1.TaskRunResult, 0)
for _, res := range taskRunResult {
if _, seen := latest[res.Name]; !seen {
uniq = append(uniq, res)
}
latest[res.Name] = res
}
for index, result := range r {
for i, res := range uniq {
uniq[i] = latest[res.Name]
}
return uniq
}

func extractStartedAtTimeFromResults(results []v1beta1.PipelineResourceResult) (*metav1.Time, error) {
for _, result := range results {
if result.Key == "StartedAt" {
t, err := time.Parse(timeFormat, result.Value)
if err != nil {
return "", nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
return nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
}
message := ""
startedAt := metav1.NewTime(t)
// remove the entry for the starting time
r = append(r[:index], r[index+1:]...)
if len(r) == 0 {
message = ""
} else if bytes, err := json.Marshal(r); err != nil {
return "", nil, fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
} else {
message = string(bytes)
}
return message, &startedAt, nil
return &startedAt, nil
}
}
return "", nil, nil
return nil, nil
}

func updateCompletedTaskRun(trs *v1beta1.TaskRunStatus, pod *corev1.Pod) {
Expand Down
Loading

0 comments on commit a9e7b54

Please sign in to comment.