check_watcher.go
package consul

import (
	"context"
	"fmt"
	"time"

	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// defaultPollFreq is the default rate to poll the Consul Checks API
	defaultPollFreq = 900 * time.Millisecond
)

// ChecksAPI is the part of the Consul API the checkWatcher requires.
type ChecksAPI interface {
	// Checks returns a list of all checks.
	Checks() (map[string]*api.AgentCheck, error)
}
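
// Illustrative note (not part of the original file): the Consul client's Agent
// endpoint exposes a Checks method with a matching signature, so a *api.Agent
// can typically satisfy ChecksAPI directly. The client construction below is a
// hypothetical sketch assuming default Consul configuration.
//
//	client, err := api.NewClient(api.DefaultConfig())
//	if err != nil {
//		// handle error
//	}
//	var checksAPI ChecksAPI = client.Agent()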

// WorkloadRestarter allows the checkWatcher to restart tasks or entire task groups.
type WorkloadRestarter interface {
	Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}
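
// A minimal illustrative implementation of WorkloadRestarter (a hypothetical
// test double, not part of the original file): it honors context cancellation
// and records the restart event, which is all the checkWatcher requires. A
// real double would also need locking, since Restart is invoked from a
// separate goroutine.
//
//	type fakeRestarter struct {
//		restarts []*structs.TaskEvent
//	}
//
//	func (f *fakeRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
//		if err := ctx.Err(); err != nil {
//			return err
//		}
//		f.restarts = append(f.restarts, event)
//		return nil
//	}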

// checkRestart handles restarting a task if a check is unhealthy.
type checkRestart struct {
	allocID   string
	taskName  string
	checkID   string
	checkName string
	taskKey   string // composite of allocID + taskName for uniqueness

	task           WorkloadRestarter
	grace          time.Duration
	interval       time.Duration
	timeLimit      time.Duration
	ignoreWarnings bool

	// Mutable fields

	// unhealthyState is the time a check first went unhealthy. Set to the
	// zero value if the check passes before timeLimit.
	unhealthyState time.Time

	// graceUntil is when the check's grace period expires and unhealthy
	// checks should be counted.
	graceUntil time.Time

	logger log.Logger
}

// apply restart state for check and restart task if necessary. Current
// timestamp is passed in so all check updates have the same view of time (and
// to ease testing).
//
// Returns true if a restart was triggered in which case this check should be
// removed (checks are added on task startup).
func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) bool {
	healthy := func() {
		if !c.unhealthyState.IsZero() {
			c.logger.Debug("canceling restart because check became healthy")
			c.unhealthyState = time.Time{}
		}
	}

	switch status {
	case api.HealthCritical:
		// Critical checks are restart candidates; fall out of the
		// switch and evaluate the grace period and deadline below.
	case api.HealthWarning:
		if c.ignoreWarnings {
			// Warnings are ignored, reset state and exit
			healthy()
			return false
		}
	default:
		// All other statuses are ok, reset state and exit
		healthy()
		return false
	}

	if now.Before(c.graceUntil) {
		// In grace period, exit
		return false
	}

	if c.unhealthyState.IsZero() {
		// First failure, set restart deadline
		if c.timeLimit != 0 {
			c.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", c.timeLimit)
		}
		c.unhealthyState = now
	}

	// restart is due timeLimit after this check first became unhealthy
	restartAt := c.unhealthyState.Add(c.timeLimit)

	// Must test >= because if limit=1, restartAt == first failure
	if now.Equal(restartAt) || now.After(restartAt) {
		// hasn't become healthy by deadline, restart!
		c.logger.Debug("restarting due to unhealthy check")

		// Tell TaskRunner to restart due to failure
		reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName)
		event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
		go asyncRestart(ctx, c.logger, c.task, event)
		return true
	}

	return false
}
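
// Illustrative timeline (an assumption-based sketch, not part of the original
// file): suppose grace = 10s, a 10s check interval, and timeLimit = 20s, and
// the check goes critical shortly after startup.
//
//	t=5s   critical, but now < graceUntil        -> ignored
//	t=15s  critical, unhealthyState set to 15s   -> restartAt = 35s
//	t=25s  still critical, now < restartAt       -> keep waiting
//	t=35s  still critical, now >= restartAt      -> restart triggered
//
// A passing (or ignored-warning) result at any poll before t=35s calls
// healthy() and resets unhealthyState, canceling the pending restart.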

// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger log.Logger, task WorkloadRestarter, event *structs.TaskEvent) {
	// Check watcher restarts are always failures
	const failure = true

	// Restarting is asynchronous so there's no reason to allow this
	// goroutine to block indefinitely.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	if err := task.Restart(ctx, event, failure); err != nil {
		// Restart errors are not actionable and only relevant when
		// debugging allocation lifecycle management.
		logger.Debug("failed to restart task", "error", err,
			"event_time", event.Time, "event_type", event.Type)
	}
}

// checkWatchUpdate adds or removes a check from the watcher.
type checkWatchUpdate struct {
	checkID      string
	remove       bool
	checkRestart *checkRestart
}

// checkWatcher watches Consul checks and restarts tasks when they're
// unhealthy.
type checkWatcher struct {
	consul ChecksAPI

	// pollFreq is how often to poll the checks API and defaults to
	// defaultPollFreq
	pollFreq time.Duration

	// checkUpdateCh is how watches (and removals) are sent to the main
	// watching loop
	checkUpdateCh chan checkWatchUpdate

	// done is closed when Run has exited
	done chan struct{}

	// lastErr is true if the last Consul call failed. It is used to
	// squelch repeated error messages.
	lastErr bool

	logger log.Logger
}

// newCheckWatcher creates a new checkWatcher but does not call its Run method.
func newCheckWatcher(logger log.Logger, consul ChecksAPI) *checkWatcher {
	return &checkWatcher{
		consul:        consul,
		pollFreq:      defaultPollFreq,
		checkUpdateCh: make(chan checkWatchUpdate, 8),
		done:          make(chan struct{}),
		logger:        logger.ResetNamed("consul.health"),
	}
}
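
// Hypothetical wiring sketch (not from the original file; the surrounding
// names are assumed): the watcher is built once, Run is started in its own
// goroutine, and checks are registered with Watch as tasks start.
//
//	w := newCheckWatcher(logger, consulClient.Agent())
//	go w.Run(ctx)
//
//	// later, when a task's service checks are registered:
//	w.Watch(alloc.ID, task.Name, checkID, serviceCheck, restarter)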

// Run the main Consul checks watching loop to restart tasks when their checks
// fail. Blocks until context is canceled.
func (w *checkWatcher) Run(ctx context.Context) {
	defer close(w.done)

	// map of check IDs to their metadata
	checks := map[string]*checkRestart{}

	// timer for check polling
	checkTimer := time.NewTimer(0)
	defer checkTimer.Stop() // ensure timer is never leaked

	stopTimer := func() {
		checkTimer.Stop()
		select {
		case <-checkTimer.C:
		default:
		}
	}

	// disable by default
	stopTimer()

	// Main watch loop
	for {
		// disable polling if there are no checks
		if len(checks) == 0 {
			stopTimer()
		}

		select {
		case update := <-w.checkUpdateCh:
			if update.remove {
				// Remove a check
				delete(checks, update.checkID)
				continue
			}

			// Add/update a check
			checks[update.checkID] = update.checkRestart
			w.logger.Debug("watching check", "alloc_id", update.checkRestart.allocID,
				"task", update.checkRestart.taskName, "check", update.checkRestart.checkName)

			// if first check was added make sure polling is enabled
			if len(checks) == 1 {
				stopTimer()
				checkTimer.Reset(w.pollFreq)
			}

		case <-ctx.Done():
			return

		case <-checkTimer.C:
			checkTimer.Reset(w.pollFreq)

			// Set "now" as the point in time the following check results represent
			now := time.Now()

			results, err := w.consul.Checks()
			if err != nil {
				if !w.lastErr {
					w.lastErr = true
					w.logger.Error("failed retrieving health checks", "error", err)
				}
				continue
			}

			w.lastErr = false

			// Keep track of tasks restarted this period so they
			// are only restarted once and all of their checks are
			// removed.
			restartedTasks := map[string]struct{}{}

			// Loop over watched checks and update their status from results
			for cid, check := range checks {
				// Shortcircuit if told to exit
				if ctx.Err() != nil {
					return
				}

				if _, ok := restartedTasks[check.taskKey]; ok {
					// Check for this task already restarted; remove and skip check
					delete(checks, cid)
					continue
				}

				result, ok := results[cid]
				if !ok {
					// Only warn if outside grace period to avoid races with check registration
					if now.After(check.graceUntil) {
						w.logger.Warn("watched check not found in Consul", "check", check.checkName, "check_id", cid)
					}
					continue
				}

				restarted := check.apply(ctx, now, result.Status)
				if restarted {
					// Checks are registered+watched on
					// startup, so it's safe to remove them
					// whenever they're restarted
					delete(checks, cid)

					restartedTasks[check.taskKey] = struct{}{}
				}
			}

			// Ensure even passing checks for restartedTasks are removed
			if len(restartedTasks) > 0 {
				for cid, check := range checks {
					if _, ok := restartedTasks[check.taskKey]; ok {
						delete(checks, cid)
					}
				}
			}
		}
	}
}

// Watch a check and restart its task if unhealthy.
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter WorkloadRestarter) {
	if !check.TriggersRestarts() {
		// Not watched, noop
		return
	}

	c := &checkRestart{
		allocID:        allocID,
		taskName:       taskName,
		checkID:        checkID,
		checkName:      check.Name,
		taskKey:        fmt.Sprintf("%s%s", allocID, taskName), // unique task ID
		task:           restarter,
		interval:       check.Interval,
		grace:          check.CheckRestart.Grace,
		graceUntil:     time.Now().Add(check.CheckRestart.Grace),
		timeLimit:      check.Interval * time.Duration(check.CheckRestart.Limit-1),
		ignoreWarnings: check.CheckRestart.IgnoreWarnings,
		logger:         w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name),
	}

	update := checkWatchUpdate{
		checkID:      checkID,
		checkRestart: c,
	}

	select {
	case w.checkUpdateCh <- update:
		// sent watch
	case <-w.done:
		// exited; nothing to do
	}
}
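
// For orientation (an illustrative example, not taken from this file): the
// CheckRestart fields read above correspond to a job specification's
// check_restart stanza, e.g.
//
//	check_restart {
//		limit           = 3
//		grace           = "90s"
//		ignore_warnings = false
//	}
//
// With a 10s check interval, limit = 3 gives timeLimit = interval * (limit-1)
// = 20s: the task is restarted only if the check is still unhealthy two full
// intervals after its first failing result.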

// Unwatch a check.
func (w *checkWatcher) Unwatch(cid string) {
	c := checkWatchUpdate{
		checkID: cid,
		remove:  true,
	}

	select {
	case w.checkUpdateCh <- c:
		// sent remove watch
	case <-w.done:
		// exited; nothing to do
	}
}