Skip to content

Commit

Permalink
Adds pipeline metrics
Browse files Browse the repository at this point in the history
Often, as a developer or administrator(ops) I want some insights
about pipeline behavior in terms of time taken to execute pipleinerun/taskrun,
its success or failure ratio, pod latencies etc.
At present tekton pipelines has very limited ways to surface such information
or it's hard to get those details looking at resources yamls.

This patch exposes above mentioned pipelines metrics on '/metrics'
endpoint using knative `pkg/metrics` package. User can collect such
metrics using prometheus, stackdriver or other supported metrics system.

To some extent its solves
 - #540
 - #164
  • Loading branch information
hrishin committed Oct 17, 2019
1 parent 5160e9f commit 34813dc
Show file tree
Hide file tree
Showing 15 changed files with 1,116 additions and 52 deletions.
4 changes: 4 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const (
// ControllerLogKey is the name of the logger for the controller cmd
ControllerLogKey = "controller"
ControllerLogKey = "tekton"
)

var (
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -290,4 +291,19 @@ func (tr *TaskRun) GetServiceAccountName() string {
name = tr.Spec.DeprecatedServiceAccount
}
return name

}

// IsPartOfPipeline return true if TaskRun is a part of a Pipeline.
// It also return the name of Pipeline and PipelineRun
func (tr *TaskRun) IsPartOfPipeline() (bool, string, string) {
if tr == nil || len(tr.Labels) == 0 {
return false, "", ""
}

if pl, ok := tr.Labels[pipeline.GroupName+pipeline.PipelineLabelKey]; ok {
return true, pl, tr.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey]
}

return false, "", ""
}
44 changes: 42 additions & 2 deletions pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
tb "github.com/tektoncd/pipeline/test/builder"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"

tb "github.com/tektoncd/pipeline/test/builder"
)

func TestTaskRun_GetBuildPodRef(t *testing.T) {
Expand Down Expand Up @@ -179,3 +179,43 @@ func TestTaskRunGetServiceAccountName(t *testing.T) {
}
}
}

func TestTaskRunIsOfPipelinerun(t *testing.T) {
tests := []struct {
name string
tr *v1alpha1.TaskRun
expectedValue bool
expetectedPipeline string
expetectedPipelineRun string
}{{
name: "yes",
tr: tb.TaskRun("taskrunname", "testns",
tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline"),
tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun"),
),
expectedValue: true,
expetectedPipeline: "pipeline",
expetectedPipelineRun: "pipelinerun",
}, {
name: "no",
tr: tb.TaskRun("taskrunname", "testns"),
expectedValue: false,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
value, pipeline, pipelineRun := test.tr.IsPartOfPipeline()
if value != test.expectedValue {
t.Fatalf("Expecting %v got %v", test.expectedValue, value)
}

if pipeline != test.expetectedPipeline {
t.Fatalf("Mismatch in pipeline: got %s expected %s", pipeline, test.expetectedPipeline)
}

if pipelineRun != test.expetectedPipelineRun {
t.Fatalf("Mismatch in pipelinerun: got %s expected %s", pipelineRun, test.expetectedPipelineRun)
}
})
}
}
5 changes: 5 additions & 0 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)
metrics, err := NewRecorder()
if err != nil {
logger.Errorf("Failed to create pipelinerun metrics recorder %v", err)
}

opt := reconciler.Options{
KubeClientSet: kubeclientset,
Expand All @@ -75,6 +79,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
metrics: metrics,
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)

Expand Down
181 changes: 181 additions & 0 deletions pkg/reconciler/pipelinerun/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pipelinerun

import (
"context"
"errors"
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/metrics"
)

var (
prDuration = stats.Float64(
"pipelinerun_duration_seconds",
"The pipelinerun execution time in seconds",
stats.UnitDimensionless)
prDistributions = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400)

prCount = stats.Float64("pipelinerun_count",
"number of pipelineruns",
stats.UnitDimensionless)

runningPRsCount = stats.Float64("running_pipelineruns_count",
"Number of pipelineruns executing currently",
stats.UnitDimensionless)
)

type Recorder struct {
initialized bool

pipeline tag.Key
pipelineRun tag.Key
namespace tag.Key
status tag.Key
}

// NewRecorder creates a new metrics recorder instance
// to log the PipelineRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,
}

pipeline, err := tag.NewKey("pipeline")
if err != nil {
return nil, err
}
r.pipeline = pipeline

pipelineRun, err := tag.NewKey("pipelinerun")
if err != nil {
return nil, err
}
r.pipelineRun = pipelineRun

namespace, err := tag.NewKey("namespace")
if err != nil {
return nil, err
}
r.namespace = namespace

status, err := tag.NewKey("status")
if err != nil {
return nil, err
}
r.status = status

err = view.Register(
&view.View{
Description: prDuration.Description(),
Measure: prDuration,
Aggregation: prDistributions,
TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status},
},
&view.View{
Description: prCount.Description(),
Measure: prCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningPRsCount.Description(),
Measure: runningPRsCount,
Aggregation: view.LastValue(),
},
)

if err != nil {
r.initialized = false
return r, err
}

return r, nil
}

// DurationAndCount logs the duration of PipelineRun execution and
// count for number of PipelineRuns succeed or failed
// returns an error if its failed to log the metrics
func (r *Recorder) DurationAndCount(pr *v1alpha1.PipelineRun) error {
if !r.initialized {
return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", pr.Name)
}

duration := time.Since(pr.Status.StartTime.Time)
if pr.Status.CompletionTime != nil {
duration = pr.Status.CompletionTime.Sub(pr.Status.StartTime.Time)
}

status := "success"
if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
status = "failed"
}

ctx, err := tag.New(
context.Background(),
tag.Insert(r.pipeline, pr.Spec.PipelineRef.Name),
tag.Insert(r.pipelineRun, pr.Name),
tag.Insert(r.namespace, pr.Namespace),
tag.Insert(r.status, status),
)

if err != nil {
return err
}

metrics.Record(ctx, prDuration.M(float64(duration/time.Second)))
metrics.Record(ctx, prCount.M(1))

return nil
}

// RunningPipelineRuns logs the number of PipelineRuns running right now
// returns an error if its failed to log the metrics
func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error {
if !r.initialized {
return errors.New("ignoring the metrics recording, failed to initialize the metrics recorder")
}

prs, err := lister.List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list pipelineruns while generating metrics : %v", err)
}

var runningPRs int
for _, pr := range prs {
if !pr.IsDone() {
runningPRs++
}
}

ctx, err := tag.New(context.Background())
if err != nil {
return err
}
metrics.Record(ctx, runningPRsCount.M(float64(runningPRs)))

return nil
}
Loading

0 comments on commit 34813dc

Please sign in to comment.