Skip to content

Commit

Permalink
Added support for tagged metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Nov 2, 2017
1 parent 9593e12 commit 103ff55
Showing 1 changed file with 58 additions and 11 deletions.
69 changes: 58 additions & 11 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type AllocRunner struct {
// can lower write volume by not re-writing these values
immutablePersisted bool
allocDirPersisted bool

// baseLabels are used when emitting tagged metrics. All alloc runner metrics
// will have these tags, and optionally more.
baseLabels []metrics.Label
}

// COMPAT: Remove in 0.7.0
Expand Down Expand Up @@ -174,6 +178,18 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,

// TODO Should be passed a context
ar.ctx, ar.exitFn = context.WithCancel(context.TODO())

ar.baseLabels = []metrics.Label{
{
Name: "job",
Value: alloc.Job.Name,
},
{
Name: "task_group",
Value: alloc.TaskGroup,
},
}

return ar
}

Expand Down Expand Up @@ -646,7 +662,13 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.Failed = true
}
if event.Type == structs.TaskRestarting {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "restart"}, 1)
if !r.config.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"},
1, r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "restart"}, 1)
}
taskState.Restarts++
taskState.LastRestart = time.Unix(0, event.Time)
}
Expand All @@ -670,7 +692,13 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
// Capture the start time if it is just starting
if taskState.State != structs.TaskStateRunning {
taskState.StartedAt = time.Now().UTC()
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "running"}, 1)
if !r.config.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"},
1, r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "running"}, 1)
}
}
case structs.TaskStateDead:
// Capture the finished time. If it has never started there is no finish
Expand All @@ -695,9 +723,21 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv

// Emitting metrics to indicate task complete and failures
if taskState.Failed {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "failed"}, 1)
if !r.config.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"},
1, r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "failed"}, 1)
}
} else {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "complete"}, 1)
if !r.config.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"},
1, r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "complete"}, 1)
}
}
// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
Expand Down Expand Up @@ -804,7 +844,13 @@ func (r *AllocRunner) Run() {
}

// Increment the alloc runner start counter. It is incremented even when restoring existing tasks, so 1 start != 1 task execution.
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, "start"}, 1)
if !r.config.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "start"},
1, r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, "start"}, 1)
}

// Start the watcher
wCtx, watcherCancel := context.WithCancel(r.ctx)
Expand Down Expand Up @@ -935,12 +981,13 @@ func (r *AllocRunner) handleDestroy() {
alloc := r.Alloc()

// Increment the destroy count for this alloc runner since this allocation is being removed from this client.
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, "destroy"}, 1)

// TODO(schmichael): updater can cause a GC, which can block on this alloc
// runner shutting down. Since handleDestroy can be called by Run(), we
// can't block shutdown here, as it would cause a deadlock.
go r.updater(alloc)
if !r.config.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "destroy"},
1, r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, "destroy"}, 1)
}

// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
Expand Down

0 comments on commit 103ff55

Please sign in to comment.