diff --git a/Gopkg.lock b/Gopkg.lock
index b264b39da12..edfea824257 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1346,6 +1346,9 @@
     "github.com/mitchellh/go-homedir",
     "github.com/mohae/deepcopy",
     "github.com/tektoncd/plumbing/scripts",
+    "go.opencensus.io/stats",
+    "go.opencensus.io/stats/view",
+    "go.opencensus.io/tag",
     "go.opencensus.io/trace",
     "go.uber.org/zap",
     "go.uber.org/zap/zaptest",
@@ -1408,6 +1411,7 @@
     "knative.dev/pkg/logging",
     "knative.dev/pkg/logging/logkey",
     "knative.dev/pkg/logging/testing",
+    "knative.dev/pkg/metrics",
     "knative.dev/pkg/reconciler/testing",
     "knative.dev/pkg/signals",
     "knative.dev/pkg/test",
diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 034ffd0408e..35dbff6e2a2 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -28,7 +28,7 @@ import (
 
 const (
     // ControllerLogKey is the name of the logger for the controller cmd
-    ControllerLogKey = "controller"
+    ControllerLogKey = "tekton"
 )
 
 var (
diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types.go b/pkg/apis/pipeline/v1alpha1/taskrun_types.go
index 4b7ea38dda3..8a3fe0c8145 100644
--- a/pkg/apis/pipeline/v1alpha1/taskrun_types.go
+++ b/pkg/apis/pipeline/v1alpha1/taskrun_types.go
@@ -20,6 +20,7 @@ import (
     "fmt"
     "time"
 
+    "github.com/tektoncd/pipeline/pkg/apis/pipeline"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "knative.dev/pkg/apis"
@@ -290,4 +291,19 @@ func (tr *TaskRun) GetServiceAccountName() string {
         name = tr.Spec.DeprecatedServiceAccount
     }
     return name
+
 }
+
+// IsPartOfPipeline returns true if the TaskRun is part of a Pipeline.
+// It also returns the names of the Pipeline and the PipelineRun.
+func (tr *TaskRun) IsPartOfPipeline() (bool, string, string) {
+    if tr == nil || len(tr.Labels) == 0 {
+        return false, "", ""
+    }
+
+    if pl, ok := tr.Labels[pipeline.GroupName+pipeline.PipelineLabelKey]; ok {
+        return true, pl, tr.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey]
+    }
+
+    return false, "", ""
+}
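The new IsPartOfPipeline helper keys off the pipeline and pipelineRun labels that Tekton attaches to TaskRuns created on behalf of a Pipeline; the concatenated constants resolve to fully-qualified keys of the tekton.dev/pipeline form. A minimal sketch of calling it directly, without the test builder — the names and label values below are placeholders:

package main

import (
    "fmt"

    "github.com/tektoncd/pipeline/pkg/apis/pipeline"
    "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    tr := &v1alpha1.TaskRun{
        ObjectMeta: metav1.ObjectMeta{
            Name: "build-and-push",
            Labels: map[string]string{
                // Same keys the PipelineRun machinery uses on the TaskRuns it creates.
                pipeline.GroupName + pipeline.PipelineLabelKey:    "release-pipeline",
                pipeline.GroupName + pipeline.PipelineRunLabelKey: "release-pipeline-run-1",
            },
        },
    }

    ok, pl, plr := tr.IsPartOfPipeline()
    fmt.Println(ok, pl, plr) // true release-pipeline release-pipeline-run-1
}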
diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
index 0ff772a7800..9e1b8a95a24 100644
--- a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
+++ b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
@@ -22,12 +22,12 @@ import (
     "time"
 
     "github.com/google/go-cmp/cmp"
+    "github.com/tektoncd/pipeline/pkg/apis/pipeline"
     "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
+    tb "github.com/tektoncd/pipeline/test/builder"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "knative.dev/pkg/apis"
-
-    tb "github.com/tektoncd/pipeline/test/builder"
 )
 
 func TestTaskRun_GetBuildPodRef(t *testing.T) {
@@ -179,3 +179,43 @@ func TestTaskRunGetServiceAccountName(t *testing.T) {
         }
     }
 }
+
+func TestTaskRunIsOfPipelinerun(t *testing.T) {
+    tests := []struct {
+        name                string
+        tr                  *v1alpha1.TaskRun
+        expectedValue       bool
+        expectedPipeline    string
+        expectedPipelineRun string
+    }{{
+        name: "yes",
+        tr: tb.TaskRun("taskrunname", "testns",
+            tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline"),
+            tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun"),
+        ),
+        expectedValue:       true,
+        expectedPipeline:    "pipeline",
+        expectedPipelineRun: "pipelinerun",
+    }, {
+        name:          "no",
+        tr:            tb.TaskRun("taskrunname", "testns"),
+        expectedValue: false,
+    }}
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            value, pipeline, pipelineRun := test.tr.IsPartOfPipeline()
+            if value != test.expectedValue {
+                t.Fatalf("Expecting %v got %v", test.expectedValue, value)
+            }
+
+            if pipeline != test.expectedPipeline {
+                t.Fatalf("Mismatch in pipeline: got %s expected %s", pipeline, test.expectedPipeline)
+            }
+
+            if pipelineRun != test.expectedPipelineRun {
+                t.Fatalf("Mismatch in pipelinerun: got %s expected %s", pipelineRun, test.expectedPipelineRun)
+            }
+        })
+    }
+}
diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go
index c4211d5c163..a39f556deb0 100644
--- a/pkg/reconciler/pipelinerun/controller.go
+++ b/pkg/reconciler/pipelinerun/controller.go
@@ -56,6 +56,10 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
     resourceInformer := resourceinformer.Get(ctx)
     conditionInformer := conditioninformer.Get(ctx)
     timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)
+    metrics, err := NewRecorder()
+    if err != nil {
+        logger.Errorf("Failed to create pipelinerun metrics recorder %v", err)
+    }
 
     opt := reconciler.Options{
         KubeClientSet: kubeclientset,
@@ -75,6 +79,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
         resourceLister:  resourceInformer.Lister(),
         conditionLister: conditionInformer.Lister(),
         timeoutHandler:  timeoutHandler,
+        metrics:         metrics,
     }
     impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)
diff --git a/pkg/reconciler/pipelinerun/metrics.go b/pkg/reconciler/pipelinerun/metrics.go
new file mode 100644
index 00000000000..6c9ea52c233
--- /dev/null
+++ b/pkg/reconciler/pipelinerun/metrics.go
@@ -0,0 +1,181 @@
+/*
+Copyright 2019 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package pipelinerun + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/metrics" +) + +var ( + prDuration = stats.Float64( + "pipelinerun_duration_seconds", + "The pipelinerun execution time in seconds", + stats.UnitDimensionless) + prDistributions = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400) + + prCount = stats.Float64("pipelinerun_count", + "number of pipelineruns", + stats.UnitDimensionless) + + runningPRsCount = stats.Float64("running_pipelineruns_count", + "Number of pipelineruns executing currently", + stats.UnitDimensionless) +) + +type Recorder struct { + initialized bool + + pipeline tag.Key + pipelineRun tag.Key + namespace tag.Key + status tag.Key +} + +// NewRecorder creates a new metrics recorder instance +// to log the PipelineRun related metrics +func NewRecorder() (*Recorder, error) { + r := &Recorder{ + initialized: true, + } + + pipeline, err := tag.NewKey("pipeline") + if err != nil { + return nil, err + } + r.pipeline = pipeline + + pipelineRun, err := tag.NewKey("pipelinerun") + if err != nil { + return nil, err + } + r.pipelineRun = pipelineRun + + namespace, err := tag.NewKey("namespace") + if err != nil { + return nil, err + } + r.namespace = namespace + + status, err := tag.NewKey("status") + if err != nil { + return nil, err + } + r.status = status + + err = view.Register( + &view.View{ + Description: prDuration.Description(), + Measure: prDuration, + Aggregation: prDistributions, + TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status}, + }, + &view.View{ + Description: prCount.Description(), + Measure: prCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.status}, + }, + &view.View{ + Description: runningPRsCount.Description(), + Measure: runningPRsCount, + Aggregation: view.LastValue(), + }, + ) + + if err != nil { + r.initialized = false + return r, err + } + + return r, nil +} + +// DurationAndCount logs the duration of PipelineRun execution and +// count for number of PipelineRuns succeed or failed +// returns an error if its failed to log the metrics +func (r *Recorder) DurationAndCount(pr *v1alpha1.PipelineRun) error { + if !r.initialized { + return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", pr.Name) + } + + duration := time.Since(pr.Status.StartTime.Time) + if pr.Status.CompletionTime != nil { + duration = pr.Status.CompletionTime.Sub(pr.Status.StartTime.Time) + } + + status := "success" + if pr.Status.Conditions[0].Status == corev1.ConditionFalse { + status = "failed" + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.pipeline, pr.Spec.PipelineRef.Name), + tag.Insert(r.pipelineRun, pr.Name), + tag.Insert(r.namespace, pr.Namespace), + tag.Insert(r.status, status), + ) + + if err != nil { + return err + } + + metrics.Record(ctx, prDuration.M(float64(duration/time.Second))) + metrics.Record(ctx, prCount.M(1)) + + return nil +} + +// RunningPipelineRuns logs the number of PipelineRuns running right now +// returns an error if its failed to log the metrics +func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error { + if !r.initialized { + return errors.New("ignoring the metrics 
recording, failed to initialize the metrics recorder") + } + + prs, err := lister.List(labels.Everything()) + if err != nil { + return fmt.Errorf("failed to list pipelineruns while generating metrics : %v", err) + } + + var runningPRs int + for _, pr := range prs { + if !pr.IsDone() { + runningPRs++ + } + } + + ctx, err := tag.New(context.Background()) + if err != nil { + return err + } + metrics.Record(ctx, runningPRsCount.M(float64(runningPRs))) + + return nil +} diff --git a/pkg/reconciler/pipelinerun/metrics_test.go b/pkg/reconciler/pipelinerun/metrics_test.go new file mode 100644 index 00000000000..73338c061ef --- /dev/null +++ b/pkg/reconciler/pipelinerun/metrics_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipelinerun + +import ( + "testing" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + alpha1 "github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" + fakepipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelinerun/fake" + tb "github.com/tektoncd/pipeline/test/builder" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + "knative.dev/pkg/metrics/metricstest" + rtesting "knative.dev/pkg/reconciler/testing" +) + +func TestUninitializedMetrics(t *testing.T) { + metrics := Recorder{} + + durationCountError := metrics.DurationAndCount(&v1alpha1.PipelineRun{}) + prCountError := metrics.RunningPipelineRuns(nil) + + assertErrNotNil(durationCountError, "DurationAndCount recording expected to return error but got nil", t) + assertErrNotNil(prCountError, "Current PR count recording expected to return error but got nil", t) +} + +func TestRecordPipelineRunDurationCount(t *testing.T) { + startTime := time.Now() + + testData := []struct { + name string + taskRun *v1alpha1.PipelineRun + expectedTags map[string]string + expectedDuration float64 + expectedCount int64 + }{{ + name: "for_succeeded_pipeline", + taskRun: tb.PipelineRun("pipelinerun-1", "ns", + tb.PipelineRunSpec("pipeline-1"), + tb.PipelineRunStatus( + tb.PipelineRunStartTime(startTime), + tb.PipelineRunCompletionTime(startTime.Add(1*time.Minute)), + tb.PipelineRunStatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "namespace": "ns", + "status": "success", + }, + expectedDuration: 60, + expectedCount: 1, + }, { + name: "for_failed_pipeline", + taskRun: tb.PipelineRun("pipelinerun-1", "ns", + tb.PipelineRunSpec("pipeline-1"), + tb.PipelineRunStatus( + tb.PipelineRunStartTime(startTime), + tb.PipelineRunCompletionTime(startTime.Add(1*time.Minute)), + tb.PipelineRunStatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "namespace": "ns", 
+ "status": "failed", + }, + expectedDuration: 60, + expectedCount: 1, + }} + + for _, test := range testData { + t.Run(test.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.DurationAndCount(test.taskRun) + assertErrIsNil(err, "DurationAndCount recording recording got an error", t) + metricstest.CheckDistributionData(t, "pipelinerun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration) + metricstest.CheckCountData(t, "pipelinerun_count", test.expectedTags, test.expectedCount) + }) + } +} + +func TestRecordRunningPipelineRunsCount(t *testing.T) { + defer unregisterMetrics() + + ctx, _ := rtesting.SetupFakeContext(t) + informer := fakepipelineruninformer.Get(ctx) + addPipelineRun(informer, "pipelinerun-1", "pipeline-1", "ns", corev1.ConditionTrue, t) + addPipelineRun(informer, "pipelinerun-2", "pipeline-2", "ns", corev1.ConditionFalse, t) + addPipelineRun(informer, "pipelinerun-3", "pipeline-3", "ns", corev1.ConditionUnknown, t) + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.RunningPipelineRuns(informer.Lister()) + assertErrIsNil(err, "RunningPrsCount recording expected to return nil but got error", t) + metricstest.CheckLastValueData(t, "running_pipelineruns_count", map[string]string{}, 1) +} + +func addPipelineRun(informer alpha1.PipelineRunInformer, run, pipeline, ns string, status corev1.ConditionStatus, t *testing.T) { + t.Helper() + + err := informer.Informer().GetIndexer().Add(tb.PipelineRun(run, ns, + tb.PipelineRunSpec(pipeline), + tb.PipelineRunStatus( + tb.PipelineRunStatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: status, + }), + ))) + + if err != nil { + t.Errorf("Failed to add the pipelinerun") + } +} + +func assertErrNotNil(err error, message string, t *testing.T) { + t.Helper() + if err == nil { + t.Errorf(message) + } +} + +func assertErrIsNil(err error, message string, t *testing.T) { + t.Helper() + if err != nil { + t.Errorf(message) + } +} + +func unregisterMetrics() { + metricstest.Unregister("pipelinerun_duration_seconds", "pipelinerun_count", "running_pipelineruns_count") +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 59113dcb43f..509d0d84bbc 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -100,6 +100,7 @@ type Reconciler struct { tracker tracker.Interface configStore configStore timeoutHandler *reconciler.TimeoutSet + metrics *Recorder } // Check that our Reconciler implements controller.Reconciler @@ -109,7 +110,6 @@ var _ controller.Reconciler = (*Reconciler)(nil) // converge the two. It then updates the Status block of the Pipeline Run // resource with the current status of the resource. 
func (c *Reconciler) Reconcile(ctx context.Context, key string) error { - c.Logger.Infof("Reconciling %v", time.Now()) // Convert the namespace/name string into a distinct namespace and name @@ -156,6 +156,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err) return err } + go func(metrics *Recorder) { + err := metrics.DurationAndCount(pr) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) } else { if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil { c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err) @@ -171,16 +177,16 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } } - if equality.Semantic.DeepEqual(original.Status, pr.Status) { - // If we didn't change anything then don't call updateStatus. - // This is important because the copy we loaded from the informer's - // cache may be stale and we don't want to overwrite a prior update - // to status with this stale state. - } else if _, err := c.updateStatus(pr); err != nil { - c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) - c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") - return err + var updated bool + if !equality.Semantic.DeepEqual(original.Status, pr.Status) { + if _, err := c.updateStatus(pr); err != nil { + c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) + c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") + return err + } + updated = true } + // Since we are using the status subresource, it is not possible to update // the status and labels/annotations simultaneously. if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) { @@ -189,6 +195,16 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations") return err } + updated = true + } + + if updated { + go func(metrics *Recorder) { + err := metrics.RunningPipelineRuns(c.pipelineRunLister) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) } return err diff --git a/pkg/reconciler/stats.go b/pkg/reconciler/stats.go deleted file mode 100644 index f158989aada..00000000000 --- a/pkg/reconciler/stats.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright 2019 The Tekton Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package reconciler - -import ( - "go.uber.org/zap" - "knative.dev/pkg/controller" -) - -// MustNewStatsReporter creates a new instance of StatsReporter. Panics if creation fails. 
-func MustNewStatsReporter(reconciler string, logger *zap.SugaredLogger) controller.StatsReporter { - stats, err := controller.NewStatsReporter(reconciler) - if err != nil { - logger.Fatal("Failed to initialize the stats reporter.", zap.Error(err)) - } - return stats -} diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index e8f37e8e30a..0bd9308709d 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -54,6 +54,10 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch podInformer := podinformer.Get(ctx) resourceInformer := resourceinformer.Get(ctx) timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger) + metrics, err := NewRecorder() + if err != nil { + logger.Errorf("Failed to create taskrun metrics recorder %v", err) + } opt := reconciler.Options{ KubeClientSet: kubeclientset, @@ -71,6 +75,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch resourceLister: resourceInformer.Lister(), timeoutHandler: timeoutHandler, cloudEventClient: cloudeventclient.Get(ctx), + metrics: metrics, } impl := controller.NewImpl(c, c.Logger, taskRunControllerName) diff --git a/pkg/reconciler/taskrun/metrics.go b/pkg/reconciler/taskrun/metrics.go new file mode 100644 index 00000000000..0ec5f3df626 --- /dev/null +++ b/pkg/reconciler/taskrun/metrics.go @@ -0,0 +1,295 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package taskrun + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/metrics" +) + +var ( + trDuration = stats.Float64( + "taskrun_duration_seconds", + "The taskrun's execution time in seconds", + stats.UnitDimensionless) + trDistribution = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400) + + prTRDuration = stats.Float64( + "pipelinerun_taskrun_duration_seconds", + "The pipelinerun's taskrun execution time in seconds", + stats.UnitDimensionless) + prTRLatencyDistribution = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400) + + trCount = stats.Float64("taskrun_count", + "number of taskruns", + stats.UnitDimensionless) + + runningTRsCount = stats.Float64("running_taskruns_count", + "Number of taskruns executing currently", + stats.UnitDimensionless) + + podLatency = stats.Float64("taskruns_pod_latency", + "scheduling latency for the taskruns pods", + stats.UnitMilliseconds) +) + +type Recorder struct { + initialized bool + + task tag.Key + taskRun tag.Key + namespace tag.Key + status tag.Key + pipeline tag.Key + pipelineRun tag.Key + pod tag.Key +} + +// NewRecorder creates a new metrics recorder instance +// to log the TaskRun related metrics +func NewRecorder() (*Recorder, error) { + r := &Recorder{ + initialized: true, + } + + task, err := tag.NewKey("task") + if err != nil { + return nil, err + } + r.task = task + + taskRun, err := tag.NewKey("taskrun") + if err != nil { + return nil, err + } + r.taskRun = taskRun + + namespace, err := tag.NewKey("namespace") + if err != nil { + return nil, err + } + r.namespace = namespace + + status, err := tag.NewKey("status") + if err != nil { + return nil, err + } + r.status = status + + pipeline, err := tag.NewKey("pipeline") + if err != nil { + return nil, err + } + r.pipeline = pipeline + + pipelineRun, err := tag.NewKey("pipelinerun") + if err != nil { + return nil, err + } + r.pipelineRun = pipelineRun + + pod, err := tag.NewKey("pod") + if err != nil { + return nil, err + } + r.pod = pod + + err = view.Register( + &view.View{ + Description: trDuration.Description(), + Measure: trDuration, + Aggregation: trDistribution, + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status}, + }, + &view.View{ + Description: prTRDuration.Description(), + Measure: prTRDuration, + Aggregation: prTRLatencyDistribution, + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun}, + }, + &view.View{ + Description: trCount.Description(), + Measure: trCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.status}, + }, + &view.View{ + Description: runningTRsCount.Description(), + Measure: runningTRsCount, + Aggregation: view.LastValue(), + }, + &view.View{ + Description: podLatency.Description(), + Measure: podLatency, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod}, + }, + ) + + if err != nil { + r.initialized = false + return r, err + } + + return r, nil +} + +// DurationAndCount logs the duration of TaskRun execution and +// count for number of TaskRuns succeed or failed +// returns an error if its failed to log the 
metrics +func (r *Recorder) DurationAndCount(tr *v1alpha1.TaskRun) error { + if !r.initialized { + return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", tr.Name) + } + + duration := time.Since(tr.Status.StartTime.Time) + if tr.Status.CompletionTime != nil { + duration = tr.Status.CompletionTime.Sub(tr.Status.StartTime.Time) + } + + taskName := "anonymous" + if tr.Spec.TaskRef != nil { + taskName = tr.Spec.TaskRef.Name + } + + status := "success" + if tr.Status.Conditions[0].Status == corev1.ConditionFalse { + status = "failed" + } + + if ok, pipeline, pipelinerun := tr.IsPartOfPipeline(); ok { + ctx, err := tag.New( + context.Background(), + tag.Insert(r.task, taskName), + tag.Insert(r.taskRun, tr.Name), + tag.Insert(r.namespace, tr.Namespace), + tag.Insert(r.status, status), + tag.Insert(r.pipeline, pipeline), + tag.Insert(r.pipelineRun, pipelinerun), + ) + + if err != nil { + return err + } + + stats.Record(ctx, prTRDuration.M(float64(duration/time.Second))) + metrics.Record(ctx, trCount.M(1)) + return nil + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.task, taskName), + tag.Insert(r.taskRun, tr.Name), + tag.Insert(r.namespace, tr.Namespace), + tag.Insert(r.status, status), + ) + if err != nil { + return err + } + + metrics.Record(ctx, trDuration.M(float64(duration/time.Second))) + metrics.Record(ctx, trCount.M(1)) + + return nil +} + +// RunningTaskRuns logs the number of TaskRuns running right now +// returns an error if its failed to log the metrics +func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error { + if !r.initialized { + return errors.New("ignoring the metrics recording, failed to initialize the metrics recorder") + } + + trs, err := lister.List(labels.Everything()) + if err != nil { + return err + } + + var runningTrs int + for _, pr := range trs { + if !pr.IsDone() { + runningTrs++ + } + } + + ctx, err := tag.New( + context.Background(), + ) + if err != nil { + return err + } + metrics.Record(ctx, runningTRsCount.M(float64(runningTrs))) + + return nil +} + +// RecordPodLatency logs the duration required to schedule the pod for TaskRun +// returns an error if its failed to log the metrics +func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1alpha1.TaskRun) error { + if !r.initialized { + return errors.New("ignoring the metrics recording for pod , failed to initialize the metrics recorder") + } + + scheduledTime := getScheduledTime(pod) + if scheduledTime.IsZero() { + return errors.New("pod has never got scheduled") + } + + latency := scheduledTime.Sub(pod.CreationTimestamp.Time) + taskName := "anonymous" + if tr.Spec.TaskRef != nil { + taskName = tr.Spec.TaskRef.Name + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.task, taskName), + tag.Insert(r.taskRun, tr.Name), + tag.Insert(r.namespace, tr.Namespace), + tag.Insert(r.pod, pod.Name), + ) + if err != nil { + return err + } + + metrics.Record(ctx, podLatency.M(float64(latency))) + + return nil +} + +func getScheduledTime(pod *corev1.Pod) metav1.Time { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodScheduled { + return c.LastTransitionTime + } + } + + return metav1.Time{} +} diff --git a/pkg/reconciler/taskrun/metrics_test.go b/pkg/reconciler/taskrun/metrics_test.go new file mode 100644 index 00000000000..de4e9e53777 --- /dev/null +++ b/pkg/reconciler/taskrun/metrics_test.go @@ -0,0 +1,308 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the 
"License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskrun + +import ( + "testing" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + alpha1 "github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" + faketaskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun/fake" + tb "github.com/tektoncd/pipeline/test/builder" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/apis" + "knative.dev/pkg/metrics/metricstest" + rtesting "knative.dev/pkg/reconciler/testing" +) + +func TestUninitializedMetrics(t *testing.T) { + metrics := Recorder{} + + durationCountError := metrics.DurationAndCount(&v1alpha1.TaskRun{}) + taskrunsCountError := metrics.RunningTaskRuns(nil) + podLatencyError := metrics.RecordPodLatency(nil, nil) + + assertErrNotNil(durationCountError, "DurationCount recording expected to return error but got nil", t) + assertErrNotNil(taskrunsCountError, "Current TaskrunsCount recording expected to return error but got nil", t) + assertErrNotNil(podLatencyError, "Pod Latency recording expected to return error but got nil", t) +} + +func TestRecordTaskrunDurationCount(t *testing.T) { + startTime := time.Now() + + testData := []struct { + name string + taskRun *v1alpha1.TaskRun + expectedTags map[string]string + expectedDuration float64 + expectedCount int64 + }{{ + name: "for_succeeded_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + )), + expectedTags: map[string]string{ + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "success", + }, + expectedDuration: 60, + expectedCount: 1, + }, { + name: "for_failed_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + )), + expectedTags: map[string]string{ + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "failed", + }, + expectedDuration: 60, + expectedCount: 1, + }} + + for _, test := range testData { + t.Run(test.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.DurationAndCount(test.taskRun) + assertErrIsNil(err, "DurationAndCount recording got an error", t) + metricstest.CheckDistributionData(t, "taskrun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration) + metricstest.CheckCountData(t, "taskrun_count", test.expectedTags, test.expectedCount) + 
}) + } +} + +func TestRecordPipelinerunTaskrunDurationCount(t *testing.T) { + startTime := time.Now() + + testData := []struct { + name string + taskRun *v1alpha1.TaskRun + expectedTags map[string]string + expectedDuration float64 + expectedCount int64 + }{{ + name: "for_succeeded_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline-1"), + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun-1"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "success", + }, + expectedDuration: 60, + expectedCount: 1, + }, { + name: "for_failed_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline-1"), + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun-1"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "failed", + }, + expectedDuration: 60, + expectedCount: 1, + }} + + for _, test := range testData { + t.Run(test.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.DurationAndCount(test.taskRun) + assertErrIsNil(err, "DurationAndCount recording got an error", t) + metricstest.CheckDistributionData(t, "pipelinerun_taskrun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration) + metricstest.CheckCountData(t, "taskrun_count", test.expectedTags, test.expectedCount) + }) + } +} + +func TestRecordRunningTaskrunsCount(t *testing.T) { + defer unregisterMetrics() + + ctx, _ := rtesting.SetupFakeContext(t) + informer := faketaskruninformer.Get(ctx) + addTaskruns(informer, "taskrun-1", "task-1", "ns", corev1.ConditionTrue, t) + addTaskruns(informer, "taskrun-2", "task-3", "ns", corev1.ConditionUnknown, t) + addTaskruns(informer, "taskrun-3", "task-3", "ns", corev1.ConditionFalse, t) + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.RunningTaskRuns(informer.Lister()) + assertErrIsNil(err, "RunningTaskRuns recording expected to return nil but got error", t) + metricstest.CheckLastValueData(t, "running_taskruns_count", map[string]string{}, 1) +} + +func TestRecordPodLatency(t *testing.T) { + creationTime := time.Now() + testData := []struct { + name string + pod *corev1.Pod + taskRun *v1alpha1.TaskRun + expectedTags map[string]string + expectedValue float64 + expectingError bool + }{{ + name: "for_scheduled_pod", + pod: tb.Pod("test-taskrun-pod-123456", "foo", + tb.PodCreationTimestamp(creationTime), + tb.PodStatus( + tb.PodStatusConditions(corev1.PodCondition{ + Type: corev1.PodScheduled, + LastTransitionTime: 
metav1.Time{Time: creationTime.Add(4 * time.Second)}, + }), + )), + taskRun: tb.TaskRun("test-taskrun", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + ), + expectedTags: map[string]string{ + "pod": "test-taskrun-pod-123456", + "task": "task-1", + "taskrun": "test-taskrun", + "namespace": "foo", + }, + expectedValue: 4e+09, + }, { + name: "for_non_scheduled_pod", + pod: tb.Pod("test-taskrun-pod-123456", "foo", + tb.PodCreationTimestamp(creationTime), + ), + taskRun: tb.TaskRun("test-taskrun", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + ), + expectingError: true, + }} + + for _, td := range testData { + t.Run(td.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.RecordPodLatency(td.pod, td.taskRun) + if td.expectingError { + assertErrNotNil(err, "Pod Latency recording expected to return error but got nil", t) + return + } + assertErrIsNil(err, "RecordPodLatency recording expected to return nil but got error", t) + metricstest.CheckLastValueData(t, "taskruns_pod_latency", td.expectedTags, td.expectedValue) + }) + } + +} + +func addTaskruns(informer alpha1.TaskRunInformer, taskrun, task, ns string, status corev1.ConditionStatus, t *testing.T) { + err := informer.Informer().GetIndexer().Add(tb.TaskRun(taskrun, ns, + tb.TaskRunSpec( + tb.TaskRunTaskRef(task), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: status, + }), + ))) + + if err != nil { + t.Error("Failed to add the taskrun") + } +} + +func assertErrIsNil(err error, message string, t *testing.T) { + t.Helper() + if err != nil { + t.Errorf(message) + } +} + +func assertErrNotNil(err error, message string, t *testing.T) { + t.Helper() + if err == nil { + t.Errorf(message) + } +} + +func unregisterMetrics() { + metricstest.Unregister("taskrun_duration_seconds", "pipelinerun_taskrun_duration_seconds", "taskrun_count", "running_taskruns_count", "taskruns_pod_latency") +} diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 2d83d041364..9b0337b788b 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -71,6 +71,7 @@ type Reconciler struct { tracker tracker.Interface cache *entrypoint.Cache timeoutHandler *reconciler.TimeoutSet + metrics *Recorder } // Check that our Reconciler implements controller.Reconciler @@ -94,7 +95,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Infof("task run %q in work queue no longer exists", key) return nil } else if err != nil { - c.Logger.Errorf("Error retreiving TaskRun %q: %s", name, err) + c.Logger.Errorf("Error retrieving TaskRun %q: %s", name, err) return err } @@ -111,6 +112,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } if tr.IsDone() { + c.Logger.Infof("taskrun done : %s \n", tr.Name) var merr *multierror.Error // Try to send cloud events first cloudEventErr := cloudevent.SendCloudEvents(tr, c.cloudEventClient, c.Logger) @@ -134,6 +136,18 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err) merr = multierror.Append(merr, err) } + + go func(metrics *Recorder) { + err := metrics.DurationAndCount(tr) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + err = metrics.RecordPodLatency(pod, tr) + if err != nil { + c.Logger.Warnf("Failed to log the 
metrics : %v", err) + } + }(c.metrics) + return merr.ErrorOrNil() } // Reconcile this copy of the task run and then write back any status @@ -146,25 +160,40 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error { - var err error - if equality.Semantic.DeepEqual(original.Status, tr.Status) { + var updated bool + + if !equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. - } else if _, err := c.updateStatus(tr); err != nil { - c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) - return err + if _, err := c.updateStatus(tr); err != nil { + c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) + return err + } + updated = true } + // Since we are using the status subresource, it is not possible to update // the status and labels/annotations simultaneously. - if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) { + if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) { if _, err := c.updateLabelsAndAnnotations(tr); err != nil { c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err)) return err } + updated = true + } + + if updated { + go func(metrics *Recorder) { + err := metrics.RunningTaskRuns(c.taskRunLister) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) } - return err + + return nil } func (c *Reconciler) getTaskFunc(tr *v1alpha1.TaskRun) (resources.GetTask, v1alpha1.TaskKind) { diff --git a/test/builder/pod.go b/test/builder/pod.go index 8c56c96aeb5..7123ad80d7e 100644 --- a/test/builder/pod.go +++ b/test/builder/pod.go @@ -17,6 +17,8 @@ limitations under the License. package builder import ( + "time" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -27,6 +29,9 @@ type PodOp func(*corev1.Pod) // PodSpecOp is an operation which modifies a PodSpec struct. type PodSpecOp func(*corev1.PodSpec) +// PodStatusOp is an operation which modifies a PodStatus struct. +type PodStatusOp func(status *corev1.PodStatus) + // Pod creates a Pod with default values. // Any number of Pod modifiers can be passed to transform it. func Pod(name, namespace string, ops ...PodOp) *corev1.Pod { @@ -142,3 +147,28 @@ func PodVolumes(volumes ...corev1.Volume) PodSpecOp { spec.Volumes = volumes } } + +// PodCreationTimestamp sets the creation time of the pod +func PodCreationTimestamp(t time.Time) PodOp { + return func(p *corev1.Pod) { + p.CreationTimestamp = metav1.Time{Time: t} + } +} + +// PodStatus creates a PodStatus with default values. +// Any number of PodStatus modifiers can be passed to transform it. 
+func PodStatus(ops ...PodStatusOp) PodOp {
+    return func(pod *corev1.Pod) {
+        podStatus := &pod.Status
+        for _, op := range ops {
+            op(podStatus)
+        }
+        pod.Status = *podStatus
+    }
+}
+
+func PodStatusConditions(cond corev1.PodCondition) PodStatusOp {
+    return func(status *corev1.PodStatus) {
+        status.Conditions = append(status.Conditions, cond)
+    }
+}
diff --git a/test/builder/task.go b/test/builder/task.go
index 780690f7052..e133b677fdb 100644
--- a/test/builder/task.go
+++ b/test/builder/task.go
@@ -335,6 +335,13 @@ func TaskRunStartTime(startTime time.Time) TaskRunStatusOp {
     }
 }
 
+// TaskRunCompletionTime sets the completion time on the TaskRunStatus.
+func TaskRunCompletionTime(completionTime time.Time) TaskRunStatusOp {
+    return func(s *v1alpha1.TaskRunStatus) {
+        s.CompletionTime = &metav1.Time{Time: completionTime}
+    }
+}
+
 // TaskRunCloudEvent adds an event to the TaskRunStatus.
 func TaskRunCloudEvent(target, error string, retryCount int32, condition v1alpha1.CloudEventCondition) TaskRunStatusOp {
     return func(s *v1alpha1.TaskRunStatus) {
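Both new recorders follow the same OpenCensus flow: declare measures, create tag keys, register views binding an aggregation (distribution, count, or last value) to each measure, and record tagged measurements. The standalone sketch below walks through the same steps using only go.opencensus.io, which Gopkg.lock now pins; the measure name and bucket boundaries are illustrative. Note that the duration measures above are declared with stats.UnitDimensionless even though the recorded value is seconds — the unit is only an advisory string, so the string "s" could be passed instead.

package main

import (
    "context"
    "log"

    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
)

func main() {
    // A measure analogous to pipelinerun_duration_seconds.
    duration := stats.Float64("example_duration_seconds", "example duration", stats.UnitDimensionless)

    status, err := tag.NewKey("status")
    if err != nil {
        log.Fatal(err)
    }

    // Bind a distribution aggregation to the measure; buckets are in seconds,
    // mirroring the shape of the ones registered by the recorders.
    if err := view.Register(&view.View{
        Description: duration.Description(),
        Measure:     duration,
        Aggregation: view.Distribution(10, 30, 60, 300, 900, 1800, 3600),
        TagKeys:     []tag.Key{status},
    }); err != nil {
        log.Fatal(err)
    }

    // Attach tag values to the context, then record against it.
    ctx, err := tag.New(context.Background(), tag.Insert(status, "success"))
    if err != nil {
        log.Fatal(err)
    }
    stats.Record(ctx, duration.M(42))
}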
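view.Register only aggregates in-process; an exporter has to be attached before the data leaves the controller. In Tekton that wiring is handled by knative.dev/pkg/metrics, driven by the observability ConfigMap, so none of the code below is needed in the controllers themselves — it is just a hand-rolled sketch of what the exporter side amounts to, assuming the OpenCensus contrib Prometheus exporter; the namespace and port are placeholders.

package main

import (
    "log"
    "net/http"

    "contrib.go.opencensus.io/exporter/prometheus"
    "go.opencensus.io/stats/view"
)

func main() {
    // Export every registered view (e.g. pipelinerun_duration_seconds) to Prometheus.
    pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "tekton"})
    if err != nil {
        log.Fatalf("failed to create the Prometheus exporter: %v", err)
    }
    view.RegisterExporter(pe)

    // The exporter doubles as an http.Handler serving the scrape endpoint.
    mux := http.NewServeMux()
    mux.Handle("/metrics", pe)
    log.Fatal(http.ListenAndServe(":9090", mux))
}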
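One detail worth flagging in RecordPodLatency: taskruns_pod_latency is declared with stats.UnitMilliseconds, but the value recorded is float64(latency), i.e. the raw time.Duration in nanoseconds — which is what the test's expected value of 4e+09 for a four-second scheduling delay reflects. If milliseconds are actually intended, the conversion would look roughly like this (a hypothetical helper, not part of the change above):

package main

import (
    "fmt"
    "time"
)

// latencyMillis converts a scheduling delay into the milliseconds a measure
// declared with stats.UnitMilliseconds would be expected to carry.
func latencyMillis(created, scheduled time.Time) float64 {
    return float64(scheduled.Sub(created)) / float64(time.Millisecond)
}

func main() {
    created := time.Now()
    scheduled := created.Add(4 * time.Second)
    fmt.Println(latencyMillis(created, scheduled)) // 4000
}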
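Both DurationAndCount implementations index Status.Conditions[0] and dereference Status.StartTime without a guard. They are only invoked from the IsDone() branches of the reconcilers, so those fields should normally be populated by then, but a defensive variant would look roughly like the sketch below (a hypothetical helper, not part of the change, shown for the PipelineRun case):

package pipelinerun

import (
    "fmt"
    "time"

    "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
    corev1 "k8s.io/api/core/v1"
)

// durationAndStatus sketches the guards a recorder could apply before
// reading status fields that may not be populated yet.
func durationAndStatus(pr *v1alpha1.PipelineRun) (time.Duration, string, error) {
    if pr == nil || pr.Status.StartTime == nil {
        return 0, "", fmt.Errorf("pipelinerun has not started, nothing to record")
    }

    // Prefer the completion time when it is set; fall back to "so far".
    duration := time.Since(pr.Status.StartTime.Time)
    if pr.Status.CompletionTime != nil {
        duration = pr.Status.CompletionTime.Sub(pr.Status.StartTime.Time)
    }

    if len(pr.Status.Conditions) == 0 {
        return duration, "", fmt.Errorf("pipelinerun %s has no conditions yet", pr.Name)
    }
    status := "success"
    if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
        status = "failed"
    }
    return duration, status, nil
}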