-
Notifications
You must be signed in to change notification settings - Fork 2k
/
stats_hook.go
161 lines (133 loc) · 4.02 KB
/
stats_hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package taskrunner
import (
"context"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)
// StatsUpdater is the interface required by the statsHook to publish the
// resource usage samples it collects.
// Satisfied by TaskRunner.
type StatsUpdater interface {
// UpdateStats is called by the stats collection goroutine with every
// sample it receives from the driver's stats channel.
UpdateStats(*cstructs.TaskResourceUsage)
}
// statsHook manages the task stats collection goroutine: Poststart launches
// it, Exited and Shutdown stop it by canceling its context.
type statsHook struct {
// updater receives every sample read from the driver.
updater StatsUpdater
// interval is forwarded to the driver's Stats call (presumably the
// sampling period -- confirm against the DriverStats interface).
interval time.Duration
// cancel stops the running collection goroutine. It is set by Poststart,
// called and cleared by Exited, and called (but not cleared) by Shutdown.
// It is nil while no collection has been started.
cancel context.CancelFunc
// mu is held by Poststart, Exited and Shutdown while they touch cancel.
mu sync.Mutex
// logger is the hook's logger, named after the hook in newStatsHook.
logger hclog.Logger
}
// newStatsHook builds a statsHook that reports samples to su and passes
// interval to the driver when requesting stats. The supplied logger is
// scoped to the hook's name. No collection is started until Poststart runs.
func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook {
	hook := new(statsHook)
	hook.updater = su
	hook.interval = interval
	hook.logger = logger.Named(hook.Name())
	return hook
}
// Name returns the identifier of this hook; it is also used as the name of
// the hook's logger.
func (*statsHook) Name() string {
	const hookName = "stats_hook"
	return hookName
}
// Poststart launches the stats collection goroutine for the task using the
// DriverStats handle carried by the request. It always returns nil.
func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// A collector left over from an earlier Poststart is not expected, but
	// stop it anyway so its goroutine cannot leak.
	if previous := h.cancel; previous != nil {
		h.logger.Debug("poststart called twice without exiting between")
		previous()
	}

	// The collector deliberately does not inherit the request context: that
	// context only spans the Poststart call, and deriving from it would end
	// stats collection when the task is killed. A fresh context that the
	// Exited hook cancels fits the taskrunner hook model better.
	collectCtx, stop := context.WithCancel(context.Background())
	h.cancel = stop

	go h.collectResourceUsageStats(collectCtx, req.DriverStats)

	return nil
}
// Exited stops stats collection if it is running and forgets the cancel
// function so it cannot be invoked a second time. It always returns nil.
func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// A nil cancel means no collector was started, so there is nothing to do.
	if stop := h.cancel; stop != nil {
		stop()
		h.cancel = nil
	}

	return nil
}
// collectResourceUsageStats streams resource usage stats of a task from the
// driver to the updater. It blocks until collection ends and is meant to run
// in its own goroutine.
//
// The stats channel is established with handle.Stats and re-established
// whenever the driver closes it. Attempts that yield no sample (Stats returned
// an error, or the channel was closed before delivering anything) are retried
// with a capped backoff of 1s, 4s, then 5s. The backoff is dropped as soon as
// a sample arrives, so a recovered stream is never throttled.
//
// Collection ends when ctx is canceled, when the driver reports that it does
// not implement stats, or when Stats fails with an unrecoverable error.
func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {
	const (
		// limit caps the delay between two attempts.
		limit = 5 * time.Second
		// maxRetry is the first retry count whose backoff is the cap; the
		// counter stops there so it can never overflow the shift below.
		maxRetry = 3
	)

	// retry counts consecutive attempts that produced no stats sample.
	var retry uint

	for {
		// Wait before any attempt that follows an unproductive one. The wait
		// is abandoned as soon as ctx is canceled so that stopping the hook
		// never has to sit out a full backoff period.
		if retry > 0 {
			backoff := limit
			if retry < maxRetry {
				// 1s for the first retry, 4s for the second.
				backoff = time.Second << (2 * (retry - 1))
			}

			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return
			}
		}

		ch, err := handle.Stats(ctx, h.interval)
		if err != nil {
			// Collection was stopped while the call was in flight; the error
			// is a consequence of that and not worth reporting.
			if ctx.Err() != nil {
				return
			}

			// Check if the driver doesn't implement stats
			if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
				h.logger.Debug("driver does not support stats")
				return
			}

			// Check if the error is terminal, in which case retrying cannot
			// help. Anything else is likely a transport error and is retried.
			if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
				h.logger.Debug("failed to start stats collection for task with unrecoverable error", "error", err)
				return
			}

			// A plugin shutdown most likely means the driver plugin exited
			// unexpectedly; waiting and trying again (or returning once ctx is
			// canceled) is the correct reaction, so it is only logged at
			// debug level.
			if err == bstructs.ErrPluginShutdown {
				h.logger.Debug("error fetching stats of task", "error", err)
			} else {
				h.logger.Error("failed to start stats collection for task", "error", err)
			}

			if retry < maxRetry {
				retry++
			}
			continue
		}

		// Forward samples until the driver closes the channel or collection
		// is canceled.
		received := false
	STREAM:
		for {
			select {
			case ru, ok := <-ch:
				if !ok {
					// Channel is closed: go back and re-establish it.
					break STREAM
				}
				received = true

				// Update stats on TaskRunner and emit them
				h.updater.UpdateStats(ru)

			case <-ctx.Done():
				return
			}
		}

		if received {
			// The stream was healthy, so reconnect immediately.
			retry = 0
		} else if retry < maxRetry {
			// A channel that closes without a single sample counts as a
			// failed attempt; backing off keeps this from spinning.
			retry++
		}
	}
}
// Shutdown stops stats collection if a collector has been started. The
// cancel function is left in place afterwards.
func (h *statsHook) Shutdown() {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.cancel != nil {
		h.cancel()
	}
}