Skip to content

Commit

Permalink
feat: new template counting and phase counting metrics
Browse files Browse the repository at this point in the history
From #12589.

New metric `total_count` which is like the old `count` metric and the
new `gauge` metric, but a counter, not a gauge. The gauge shows a
snapshot of what is happening right now in the cluster, the counter
can answer questions like how many `Failed` workflows have there been
in the last 24 hours.

Two further metrics for counting uses of WorkflowTemplates via
workflowTemplateRef only. These store the name of the WorkflowTemplate
or ClusterWorkflowTemplate if the `cluster_scope` label is true, and
the namespace where it is used. `workflowtemplate_triggered_total`
counts the number of uses. `workflowtemplate_runtime` records how long
each phase the workflow running the template spent in seconds.

Note to reviewers: this is part of a stack of reviews for metrics
changes. Please don't merge until the rest of the stack is also ready.

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Aug 16, 2024
1 parent adf98ce commit e12e15c
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 6 deletions.
33 changes: 33 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ A gauge of the number of queue items that have not been processed yet.

See [queue adds count](#queue_adds_count) for details.

#### `total_count`

A counter of workflows that have entered each phase for tracking them through their life-cycle, by namespace.

| attribute | explanation |
|-------------|------------------------------------------------|
| `phase` | The phase that the workflow has entered |
| `namespace` | The namespace in which the workflow is running |

#### `version`

Build metadata for this Controller.
Expand Down Expand Up @@ -365,6 +374,30 @@ This will tell you the number of workflows with running pods.
| `type` | the type of condition, currently only `Running` |
| `status` | `true` or `false` |

#### `workflowtemplate_runtime`

A histogram of the duration of workflows using `workflowTemplateRef` only, as they enter each phase.
Counts both WorkflowTemplate and ClusterWorkflowTemplate usage.
Records time between entering the `Running` phase and completion, so does not include any time in `Pending`.

| attribute | explanation |
|-----------------|--------------------------------------------------------------|
| `cluster_scope` | A boolean set true if this is a ClusterWorkflowTemplate |
| `name` | ⚠️ The name of the WorkflowTemplate/ClusterWorkflowTemplate. |
| `namespace` | The namespace from which the WorkflowTemplate is being used |

#### `workflowtemplate_triggered_total`

A counter of workflows using `workflowTemplateRef` only, as they enter each phase.
Counts both WorkflowTemplate and ClusterWorkflowTemplate usage.

| attribute | explanation |
|-----------------|--------------------------------------------------------------|
| `cluster_scope` | A boolean set true if this is a ClusterWorkflowTemplate |
| `name` | ⚠️ The name of the WorkflowTemplate/ClusterWorkflowTemplate. |
| `namespace` | The namespace from which the WorkflowTemplate is being used |
| `phase` | The phase that the workflow entered |

### Metric types

Please see the [Prometheus docs on metric types](https://prometheus.io/docs/concepts/metric_types/).
Expand Down
3 changes: 3 additions & 0 deletions docs/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ The following are new metrics:
* `queue_longest_running`
* `queue_retries`
* `queue_unfinished_work`
* `total_count`
* `version`
* `workflowtemplate_runtime`
* `workflowtemplate_triggered_total`

and can be disabled with

Expand Down
50 changes: 50 additions & 0 deletions test/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,56 @@ func (s *MetricsSuite) TestFailedMetric() {
})
}

func (s *MetricsSuite) TestTemplateMetrics() {
s.Given().
Workflow(`@testdata/templateref-metrics.yaml`).
WorkflowTemplate(`@testdata/basic-workflowtemplate.yaml`).
When().
CreateWorkflowTemplates().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
s.e(s.T()).GET("").
Expect().
Status(200).
Body().
Contains(`total_count{namespace="argo",phase="Running"}`). // Count for this depends on other tests
Contains(`total_count{namespace="argo",phase="Succeeded"}`).
Contains(`workflowtemplate_triggered_total{cluster_scope="false",name="basic",namespace="argo",phase="New"} 1`).
Contains(`workflowtemplate_triggered_total{cluster_scope="false",name="basic",namespace="argo",phase="Running"} 1`).
Contains(`workflowtemplate_triggered_total{cluster_scope="false",name="basic",namespace="argo",phase="Succeeded"} 1`).
Contains(`workflowtemplate_runtime_count{cluster_scope="false",name="basic",namespace="argo"} 1`).
Contains(`workflowtemplate_runtime_bucket{cluster_scope="false",name="basic",namespace="argo",le="0"} 0`).
Contains(`workflowtemplate_runtime_bucket{cluster_scope="false",name="basic",namespace="argo",le="+Inf"} 1`)
})
}

func (s *MetricsSuite) TestClusterTemplateMetrics() {
s.Given().
Workflow(`@testdata/clustertemplateref-metrics.yaml`).
ClusterWorkflowTemplate(`@testdata/basic-clusterworkflowtemplate.yaml`).
When().
CreateClusterWorkflowTemplates().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
s.e(s.T()).GET("").
Expect().
Status(200).
Body().
Contains(`workflowtemplate_triggered_total{cluster_scope="true",name="basic",namespace="argo",phase="New"} 1`).
Contains(`workflowtemplate_triggered_total{cluster_scope="true",name="basic",namespace="argo",phase="Running"} 1`).
Contains(`workflowtemplate_triggered_total{cluster_scope="true",name="basic",namespace="argo",phase="Succeeded"} 1`).
Contains(`workflowtemplate_runtime_count{cluster_scope="true",name="basic",namespace="argo"} 1`).
Contains(`workflowtemplate_runtime_bucket{cluster_scope="true",name="basic",namespace="argo",le="0"} 0`).
Contains(`workflowtemplate_runtime_bucket{cluster_scope="true",name="basic",namespace="argo",le="+Inf"} 1`)
})
}

func TestMetricsSuite(t *testing.T) {
suite.Run(t, new(MetricsSuite))
}
8 changes: 8 additions & 0 deletions test/e2e/testdata/clustertemplateref-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: templateref-metrics-
spec:
workflowTemplateRef:
name: basic
clusterScope: true
7 changes: 7 additions & 0 deletions test/e2e/testdata/templateref-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: templateref-metrics-
spec:
workflowTemplateRef:
name: basic
30 changes: 24 additions & 6 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,6 +2339,21 @@ func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.
return nil, nil
}

// recordWorkflowPhaseChange stores the metrics associated with the workflow phase changing
func (woc *wfOperationCtx) recordWorkflowPhaseChange(ctx context.Context) {
phase := string(woc.wf.Status.Phase)
woc.controller.metrics.ChangeWorkflowPhase(ctx, phase, woc.wf.ObjectMeta.Namespace)
if woc.wf.Spec.WorkflowTemplateRef != nil { // not-woc-misuse
woc.controller.metrics.CountWorkflowTemplate(ctx, phase, woc.wf.Spec.WorkflowTemplateRef.Name, woc.wf.ObjectMeta.Namespace, woc.wf.Spec.WorkflowTemplateRef.ClusterScope) // not-woc-misuse
switch woc.wf.Status.Phase {
case wfv1.WorkflowSucceeded, wfv1.WorkflowFailed, wfv1.WorkflowError:
duration := time.Since(woc.wf.Status.StartedAt.Time)
woc.controller.metrics.RecordWorkflowTemplateTime(ctx, duration, woc.wf.Spec.WorkflowTemplateRef.Name, woc.wf.ObjectMeta.Namespace, woc.wf.Spec.WorkflowTemplateRef.ClusterScope) // not-woc-misuse
log.Warnf("Recording template time")
}
}
}

// markWorkflowPhase is a convenience method to set the phase of the workflow with optional message
// optionally marks the workflow completed, which sets the finishedAt timestamp and completed label
func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.WorkflowPhase, message string) {
Expand All @@ -2358,6 +2373,7 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor
woc.log.Infof("Updated phase %s -> %s", woc.wf.Status.Phase, phase)
woc.updated = true
woc.wf.Status.Phase = phase
woc.recordWorkflowPhaseChange(ctx)
if woc.wf.ObjectMeta.Labels == nil {
woc.wf.ObjectMeta.Labels = make(map[string]string)
}
Expand Down Expand Up @@ -3840,7 +3856,7 @@ func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (boo
return hasOutputResultRef(name, parentTemplate), nil
}

func (woc *wfOperationCtx) fetchWorkflowSpec() (wfv1.WorkflowSpecHolder, error) {
func (woc *wfOperationCtx) fetchWorkflowSpec(ctx context.Context) (wfv1.WorkflowSpecHolder, error) {
if woc.wf.Spec.WorkflowTemplateRef == nil { // not-woc-misuse
return nil, fmt.Errorf("cannot fetch workflow spec without workflowTemplateRef")
}
Expand All @@ -3853,8 +3869,10 @@ func (woc *wfOperationCtx) fetchWorkflowSpec() (wfv1.WorkflowSpecHolder, error)
woc.log.WithError(err).Error("clusterWorkflowTemplate RBAC is missing")
return nil, fmt.Errorf("cannot get resource clusterWorkflowTemplate at cluster scope")
}
specHolder, err = woc.controller.cwftmplInformer.Lister().Get(woc.wf.Spec.WorkflowTemplateRef.Name) // not-woc-misuse
woc.controller.metrics.CountWorkflowTemplate(ctx, "New", woc.wf.Spec.WorkflowTemplateRef.Name, woc.wf.Namespace, true) // not-woc-misuse
specHolder, err = woc.controller.cwftmplInformer.Lister().Get(woc.wf.Spec.WorkflowTemplateRef.Name) // not-woc-misuse
} else {
woc.controller.metrics.CountWorkflowTemplate(ctx, "New", woc.wf.Spec.WorkflowTemplateRef.Name, woc.wf.Namespace, false) // not-woc-misuse
specHolder, err = woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace).Get(woc.wf.Spec.WorkflowTemplateRef.Name) // not-woc-misuse
}
if err != nil {
Expand All @@ -3872,7 +3890,7 @@ func (woc *wfOperationCtx) retryStrategy(tmpl *wfv1.Template) *wfv1.RetryStrateg

func (woc *wfOperationCtx) setExecWorkflow(ctx context.Context) error {
if woc.wf.Spec.WorkflowTemplateRef != nil { // not-woc-misuse
err := woc.setStoredWfSpec()
err := woc.setStoredWfSpec(ctx)
if err != nil {
woc.markWorkflowError(ctx, err)
return err
Expand Down Expand Up @@ -3964,7 +3982,7 @@ func (woc *wfOperationCtx) needsStoredWfSpecUpdate() bool {
(woc.wf.Spec.Shutdown != woc.wf.Status.StoredWorkflowSpec.Shutdown) // not-woc-misuse
}

func (woc *wfOperationCtx) setStoredWfSpec() error {
func (woc *wfOperationCtx) setStoredWfSpec(ctx context.Context) error {
wfDefault := woc.controller.Config.WorkflowDefaults
if wfDefault == nil {
wfDefault = &wfv1.Workflow{}
Expand All @@ -3974,7 +3992,7 @@ func (woc *wfOperationCtx) setStoredWfSpec() error {

// Load the spec from WorkflowTemplate in first time.
if woc.wf.Status.StoredWorkflowSpec == nil {
wftHolder, err := woc.fetchWorkflowSpec()
wftHolder, err := woc.fetchWorkflowSpec(ctx)
if err != nil {
return err
}
Expand All @@ -3992,7 +4010,7 @@ func (woc *wfOperationCtx) setStoredWfSpec() error {
woc.wf.Status.StoredWorkflowSpec = &mergedWf.Spec
woc.updated = true
} else if woc.controller.Config.WorkflowRestrictions.MustNotChangeSpec() {
wftHolder, err := woc.fetchWorkflowSpec()
wftHolder, err := woc.fetchWorkflowSpec(ctx)
if err != nil {
return err
}
Expand Down
33 changes: 33 additions & 0 deletions workflow/metrics/counter_template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package metrics

import (
"context"
)

const (
nameWFTemplateTriggered = `workflowtemplate_triggered_total`
)

func addWorkflowTemplateCounter(_ context.Context, m *Metrics) error {
return m.createInstrument(int64Counter,
nameWFTemplateTriggered,
"Total number of workflow templates triggered by workflowTemplateRef",
"{workflow_template}",
withAsBuiltIn(),
)
}

func templateLabels(name, namespace string, cluster bool) instAttribs {
return instAttribs{
{name: labelTemplateName, value: name},
{name: labelTemplateNamespace, value: namespace},
{name: labelTemplateCluster, value: cluster},
}
}

func (m *Metrics) CountWorkflowTemplate(ctx context.Context, phase, name, namespace string, cluster bool) {
labels := templateLabels(name, namespace, cluster)
labels = append(labels, instAttrib{name: labelWorkflowPhase, value: phase})

m.addInt(ctx, nameWFTemplateTriggered, 1, labels)
}
25 changes: 25 additions & 0 deletions workflow/metrics/counter_workflow_phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package metrics

import (
"context"
)

const (
nameWorkflowPhaseCounter = `total_count`
)

func addWorkflowPhaseCounter(_ context.Context, m *Metrics) error {
return m.createInstrument(int64Counter,
nameWorkflowPhaseCounter,
"Total number of workflows that have entered each phase",
"{workflow}",
withAsBuiltIn(),
)
}

func (m *Metrics) ChangeWorkflowPhase(ctx context.Context, phase, namespace string) {
m.addInt(ctx, nameWorkflowPhaseCounter, 1, instAttribs{
{name: labelWorkflowPhase, value: phase},
{name: labelWorkflowNamespace, value: namespace},
})
}
23 changes: 23 additions & 0 deletions workflow/metrics/histogram_template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package metrics

import (
"context"
"time"
)

const (
nameWorkflowTemplateRuntime = `workflowtemplate_runtime`
)

func addWorkflowTemplateHistogram(_ context.Context, m *Metrics) error {
return m.createInstrument(float64Histogram,
nameWorkflowTemplateRuntime,
"Duration of workflow template runs run through workflowTemplateRefs",
"s",
withAsBuiltIn(),
)
}

func (m *Metrics) RecordWorkflowTemplateTime(ctx context.Context, duration time.Duration, name, namespace string, cluster bool) {
m.record(ctx, nameWorkflowTemplateRuntime, duration.Seconds(), templateLabels(name, namespace, cluster))
}
4 changes: 4 additions & 0 deletions workflow/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
labelRequestVerb = `verb`
labelRequestCode = `status_code`

labelTemplateName string = `name`
labelTemplateNamespace string = `namespace`
labelTemplateCluster string = `cluster_scope`

labelWorkerType string = `worker_type`

labelWorkflowNamespace string = `namespace`
Expand Down
3 changes: 3 additions & 0 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func New(ctx context.Context, serviceName string, config *Config, callbacks Call
addPodPhaseGauge,
addPodMissingCounter,
addWorkflowPhaseGauge,
addWorkflowPhaseCounter,
addWorkflowTemplateCounter,
addWorkflowTemplateHistogram,
addOperationDurationHistogram,
addErrorCounter,
addLogCounter,
Expand Down

0 comments on commit e12e15c

Please sign in to comment.