Skip to content

Commit

Permalink
Add support for more cloud events
Browse files Browse the repository at this point in the history
Change the event type string for taskruns from dev.tekton.event.task.*
to dev.tekton.event.taskrun.*. Since cloud events are only sent by
pipeline resources - which are alpha - we don't need to comply with
the beta deprecation policy for this.

Extend the cloudevents module to include more event types for TaskRuns
and event types for PipelineRuns, with unit test coverage.
This is achieved by introducing the RunsToCompletion and
RunsToCompletionStatus interface which are met by TaskRun, PipelineRun
and their respective status types.

At this stage there is no event producer that generates these new
events. This is preparing the stage for the next changes where
we will start to emit cloud events in addition to the k8s events
we emit today.

Partially addresses #2082
Partially addresses #2684
  • Loading branch information
afrittoli committed May 25, 2020
1 parent fc24674 commit 252c038
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 23 deletions.
20 changes: 20 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ func (pr *PipelineRun) GetTaskRunRef() corev1.ObjectReference {
}
}

// GetTypeMeta returns the task run type meta
func (pr *PipelineRun) GetTypeMeta() *metav1.TypeMeta {
return &pr.TypeMeta
}

// GetObjectMeta returns the task run type meta
func (pr *PipelineRun) GetObjectMeta() *metav1.ObjectMeta {
return &pr.ObjectMeta
}

// GetStatus returns the task run status as a RunsToCompletionStatus
func (pr *PipelineRun) GetStatus() RunsToCompletionStatus {
return &pr.Status
}

// GetOwnerReference gets the pipeline run as owner reference for any related objects
func (pr *PipelineRun) GetOwnerReference() metav1.OwnerReference {
return *metav1.NewControllerRef(pr, groupVersionKind)
Expand Down Expand Up @@ -100,6 +115,11 @@ func (pr *PipelineRun) GetRunKey() string {

// IsTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
func (pr *PipelineRun) IsTimedOut() bool {
return pr.HasTimedOut()
}

// HasTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
func (pr *PipelineRun) HasTimedOut() bool {
pipelineTimeout := pr.Spec.Timeout
startTime := pr.Status.StartTime

Expand Down
42 changes: 42 additions & 0 deletions pkg/apis/pipeline/v1beta1/run_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2020 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
)

// RunsToCompletionStatus is implemented by TaskRun.Status and PipelineRun.Status
type RunsToCompletionStatus interface {
GetCondition(t apis.ConditionType) *apis.Condition
InitializeConditions()
SetCondition(newCond *apis.Condition)
}

// RunsToCompletion is implemented by TaskRun and PipelineRun
type RunsToCompletion interface {
GetTypeMeta() *metav1.TypeMeta
GetObjectMeta() *metav1.ObjectMeta
GetOwnerReference() metav1.OwnerReference
GetStatus() RunsToCompletionStatus
IsDone() bool
HasStarted() bool
IsCancelled() bool
HasTimedOut() bool
GetRunKey() string
}
15 changes: 15 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,26 @@ type TaskRunResult struct {
Value string `json:"value"`
}

// GetTypeMeta returns the task run type meta
func (tr *TaskRun) GetTypeMeta() *metav1.TypeMeta {
return &tr.TypeMeta
}

// GetObjectMeta returns the task run type meta
func (tr *TaskRun) GetObjectMeta() *metav1.ObjectMeta {
return &tr.ObjectMeta
}

// GetOwnerReference gets the task run as owner reference for any related objects
func (tr *TaskRun) GetOwnerReference() metav1.OwnerReference {
return *metav1.NewControllerRef(tr, taskRunGroupVersionKind)
}

// GetStatus returns the task run status as a RunsToCompletionStatus
func (tr *TaskRun) GetStatus() RunsToCompletionStatus {
return &tr.Status
}

// GetCondition returns the Condition matching the given type.
func (trs *TaskRunStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return taskRunCondSet.Manage(trs).GetCondition(t)
Expand Down
113 changes: 91 additions & 22 deletions pkg/reconciler/events/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,32 @@ import (
type TektonEventType string

const (
// TektonTaskRunStartedV1 is sent for TaskRuns with "ConditionSucceeded" "Unknown"
// the first time they are picked up by the reconciler
TektonTaskRunStartedV1 TektonEventType = "dev.tekton.event.taskrun.started.v1"
// TektonTaskRunRunningV1 is sent for TaskRuns with "ConditionSucceeded" "Unknown"
// once the TaskRun is validated and Pod created
TektonTaskRunRunningV1 TektonEventType = "dev.tekton.event.taskrun.running.v1"
// TektonTaskRunUnknownV1 is sent for TaskRuns with "ConditionSucceeded" "Unknown"
TektonTaskRunUnknownV1 TektonEventType = "dev.tekton.event.task.unknown.v1"
// It can be used as a confirmation that the TaskRun is still running.
TektonTaskRunUnknownV1 TektonEventType = "dev.tekton.event.taskrun.unknown.v1"
// TektonTaskRunSuccessfulV1 is sent for TaskRuns with "ConditionSucceeded" "True"
TektonTaskRunSuccessfulV1 TektonEventType = "dev.tekton.event.task.successful.v1"
TektonTaskRunSuccessfulV1 TektonEventType = "dev.tekton.event.taskrun.successful.v1"
// TektonTaskRunFailedV1 is sent for TaskRuns with "ConditionSucceeded" "False"
TektonTaskRunFailedV1 TektonEventType = "dev.tekton.event.task.failed.v1"
TektonTaskRunFailedV1 TektonEventType = "dev.tekton.event.taskrun.failed.v1"
// TektonPipelineRunStartedV1 is sent for PipelineRuns with "ConditionSucceeded" "Unknown"
// the first time they are picked up by the reconciler
TektonPipelineRunStartedV1 TektonEventType = "dev.tekton.event.pipelinerun.started.v1"
// TektonPipelineRunRunningV1 is sent for PipelineRuns with "ConditionSucceeded" "Unknown"
// once the PipelineRun is validated and Pod created
TektonPipelineRunRunningV1 TektonEventType = "dev.tekton.event.pipelinerun.running.v1"
// TektonPipelineRunUnknownV1 is sent for PipelineRuns with "ConditionSucceeded" "Unknown"
// It can be used as a confirmation that the PipelineRun is still running.
TektonPipelineRunUnknownV1 TektonEventType = "dev.tekton.event.pipelinerun.unknown.v1"
// TektonPipelineRunSuccessfulV1 is sent for PipelineRuns with "ConditionSucceeded" "True"
TektonPipelineRunSuccessfulV1 TektonEventType = "dev.tekton.event.pipelinerun.successful.v1"
// TektonPipelineRunFailedV1 is sent for PipelineRuns with "ConditionSucceeded" "False"
TektonPipelineRunFailedV1 TektonEventType = "dev.tekton.event.pipelinerun.failed.v1"
)

func (t TektonEventType) String() string {
Expand All @@ -51,17 +71,41 @@ func (t TektonEventType) String() string {
type CEClient cloudevents.Client

// TektonCloudEventData type is used to marshal and unmarshal the payload of
// a Tekton cloud event. It only includes a TaskRun for now. Using a type opens
// the possibility for the future to add more data to the payload
// a Tekton cloud event. It can include a PipelineRun or a PipelineRun
type TektonCloudEventData struct {
TaskRun *v1beta1.TaskRun `json:"taskRun"`
TaskRun *v1beta1.TaskRun `json:"taskRun,omitempty"`
PipelineRun *v1beta1.PipelineRun `json:"pipelineRun,omitempty"`
}

// NewTektonCloudEventData returns a new instance of NewTektonCloudEventData
func NewTektonCloudEventData(taskRun *v1beta1.TaskRun) TektonCloudEventData {
return TektonCloudEventData{
TaskRun: taskRun,
func NewTektonCloudEventData(runObject v1beta1.RunsToCompletion) TektonCloudEventData {
tektonCloudEventData := TektonCloudEventData{}
switch runObject.(type) {
case *v1beta1.TaskRun:
tektonCloudEventData.TaskRun = runObject.(*v1beta1.TaskRun)
case *v1beta1.PipelineRun:
tektonCloudEventData.PipelineRun = runObject.(*v1beta1.PipelineRun)
}
return tektonCloudEventData
}

// EventForRunsToCompletion creates a new event based for a RunsToCompletion,
// or return an error if not possible.
func EventForRunsToCompletion(runObject v1beta1.RunsToCompletion) (*cloudevents.Event, error) {
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(runObject.GetObjectMeta().Name)
event.SetSource(runObject.GetObjectMeta().SelfLink) // TODO: SelfLink is deprecated https://github.com/tektoncd/pipeline/issues/2676
eventType, err := getEventType(runObject)
if err != nil {
return nil, err
}
event.SetType(eventType.String())

if err := event.SetData(cloudevents.ApplicationJSON, NewTektonCloudEventData(runObject)); err != nil {
return nil, err
}
return &event, nil
}

// EventForTaskRun will create a new event based on a TaskRun,
Expand All @@ -71,27 +115,52 @@ func EventForTaskRun(taskRun *v1beta1.TaskRun) (*cloudevents.Event, error) {
if taskRun == nil {
return nil, errors.New("Cannot send an event for an empty TaskRun")
}
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(taskRun.ObjectMeta.Name)
event.SetSource(taskRun.ObjectMeta.SelfLink) // TODO: SelfLink is deprecated
return EventForRunsToCompletion(taskRun)
}

c := taskRun.Status.GetCondition(apis.ConditionSucceeded)
// EventForPipelineRun will create a new event based on a TaskRun,
// or return an error if not possible.
func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event, error) {
// Check if the TaskRun is defined
if pipelineRun == nil {
return nil, errors.New("Cannot send an event for an empty PipelineRun")
}
return EventForRunsToCompletion(pipelineRun)
}

func getEventType(runObject v1beta1.RunsToCompletion) (*TektonEventType, error) {
c := runObject.GetStatus().GetCondition(apis.ConditionSucceeded)
t := runObject.GetTypeMeta()
var eventType TektonEventType
switch {
case c.IsUnknown():
event.SetType(TektonTaskRunUnknownV1.String())
// TBD We should have different event types here, e.g. started, running
// That requires having either knowledge about the previous condition or
// TaskRun and PipelineRun using dedicated "Reasons" or "Conditions"
switch t.Kind {
case "TaskRun":
eventType = TektonTaskRunUnknownV1
case "PipelineRun":
eventType = TektonPipelineRunUnknownV1
}
case c.IsFalse():
event.SetType(TektonTaskRunFailedV1.String())
switch t.Kind {
case "TaskRun":
eventType = TektonTaskRunFailedV1
case "PipelineRun":
eventType = TektonPipelineRunFailedV1
}
case c.IsTrue():
event.SetType(TektonTaskRunSuccessfulV1.String())
switch t.Kind {
case "TaskRun":
eventType = TektonTaskRunSuccessfulV1
case "PipelineRun":
eventType = TektonPipelineRunSuccessfulV1
}
default:
return nil, fmt.Errorf("unknown condition for in TaskRun.Status %s", c.Status)
}

if err := event.SetData(cloudevents.ApplicationJSON, NewTektonCloudEventData(taskRun)); err != nil {
return nil, err
}
return &event, nil
return &eventType, nil
}

// GetCloudEventDeliveryCompareOptions returns compare options to sort
Expand Down
79 changes: 78 additions & 1 deletion pkg/reconciler/events/cloudevent/cloudevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ import (
)

const (
defaultEventSourceURI = "/taskrun/1234"
defaultEventSourceURI = "/runtocompletion/1234"
taskRunName = "faketaskrunname"
pipelineRunName = "fakepipelinerunname"
)

func getTaskRunByCondition(status corev1.ConditionStatus) *v1beta1.TaskRun {
return &v1beta1.TaskRun{
TypeMeta: metav1.TypeMeta{
Kind: "TaskRun",
APIVersion: "v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: taskRunName,
Namespace: "marshmallow",
Expand All @@ -53,6 +58,29 @@ func getTaskRunByCondition(status corev1.ConditionStatus) *v1beta1.TaskRun {
}
}

func getPipelineRunByCondition(status corev1.ConditionStatus) *v1beta1.PipelineRun {
return &v1beta1.PipelineRun{
TypeMeta: metav1.TypeMeta{
Kind: "PipelineRun",
APIVersion: "v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: pipelineRunName,
Namespace: "marshmallow",
SelfLink: defaultEventSourceURI,
},
Spec: v1beta1.PipelineRunSpec{},
Status: v1beta1.PipelineRunStatus{
Status: duckv1beta1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: status,
}},
},
},
}
}

func TestEventForTaskRun(t *testing.T) {
for _, c := range []struct {
desc string
Expand Down Expand Up @@ -101,3 +129,52 @@ func TestEventForTaskRun(t *testing.T) {
})
}
}

func TestEventForPipelineRun(t *testing.T) {
for _, c := range []struct {
desc string
pipelineRun *v1beta1.PipelineRun
wantEventType TektonEventType
}{{
desc: "send a cloud event with unknown status taskrun",
pipelineRun: getPipelineRunByCondition(corev1.ConditionUnknown),
wantEventType: TektonPipelineRunUnknownV1,
}, {
desc: "send a cloud event with successful status taskrun",
pipelineRun: getPipelineRunByCondition(corev1.ConditionTrue),
wantEventType: TektonPipelineRunSuccessfulV1,
}, {
desc: "send a cloud event with unknown status taskrun",
pipelineRun: getPipelineRunByCondition(corev1.ConditionFalse),
wantEventType: TektonPipelineRunFailedV1,
}} {
t.Run(c.desc, func(t *testing.T) {
names.TestingSeed()

got, err := EventForPipelineRun(c.pipelineRun)
if err != nil {
t.Fatalf("I did not expect an error but I got %s", err)
} else {
wantSubject := pipelineRunName
if d := cmp.Diff(wantSubject, got.Subject()); d != "" {
t.Errorf("Wrong Event ID %s", diff.PrintWantGot(d))
}
if d := cmp.Diff(string(c.wantEventType), got.Type()); d != "" {
t.Errorf("Wrong Event Type %s", diff.PrintWantGot(d))
}
wantData := NewTektonCloudEventData(c.pipelineRun)
gotData := TektonCloudEventData{}
if err := got.DataAs(&gotData); err != nil {
t.Errorf("Unexpected error from DataAsl; %s", err)
}
if d := cmp.Diff(wantData, gotData); d != "" {
t.Errorf("Wrong Event data %s", diff.PrintWantGot(d))
}

if err := got.Validate(); err != nil {
t.Errorf("Expected event to be valid; %s", err)
}
}
})
}
}

0 comments on commit 252c038

Please sign in to comment.