-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathalias.go
299 lines (262 loc) · 8.88 KB
/
alias.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package checks
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
// Constants related to alias check backoff.
const (
checkAliasBackoffMin = 3 // 3 attempts before backing off
checkAliasBackoffMaxWait = 1 * time.Minute // maximum backoff wait time
)
// CheckAlias is a check type that aliases the health of another service
// instance or node. If the service aliased has any critical health checks, then
// this check is critical. If the service has no critical but warnings,
// then this check is warning, and if a service has only passing checks, then
// this check is passing.
type CheckAlias struct {
Node string // Node name of the service. If empty, assumed to be this node.
ServiceID structs.ServiceID // ID (not name) of the service to alias
CheckID structs.CheckID // ID of this check
RPC RPC // Used to query remote server if necessary
RPCReq structs.NodeSpecificRequest // Base request
Notify AliasNotifier // For updating the check state
stop bool
stopCh chan struct{}
stopLock sync.Mutex
stopWg sync.WaitGroup
acl.EnterpriseMeta
}
// AliasNotifier is a CheckNotifier specifically for the Alias check.
// This requires additional methods that are satisfied by the agent
// local state.
type AliasNotifier interface {
CheckNotifier
AddAliasCheck(structs.CheckID, structs.ServiceID, chan<- struct{}) error
RemoveAliasCheck(structs.CheckID, structs.ServiceID)
Checks(*acl.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck
}
// Start is used to start the check, runs until Stop() func (c *CheckAlias) Start() {
func (c *CheckAlias) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
c.stop = false
c.stopCh = make(chan struct{})
c.stopWg.Add(1)
go c.run(c.stopCh)
}
// Stop is used to stop the check.
func (c *CheckAlias) Stop() {
c.stopLock.Lock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
c.stopLock.Unlock()
// Wait until the associated goroutine is definitely complete before
// returning to the caller. This is to prevent the new and old checks from
// both updating the state of the alias check using possibly stale
// information.
c.stopWg.Wait()
}
// run is invoked in a goroutine until Stop() is called.
func (c *CheckAlias) run(stopCh chan struct{}) {
defer c.stopWg.Done()
// If we have a specific node set, then use a blocking query
if c.Node != "" {
c.runQuery(stopCh)
return
}
// Use the local state to match the service.
c.runLocal(stopCh)
}
func (c *CheckAlias) runLocal(stopCh chan struct{}) {
// Very important this is buffered as 1 so that we do not lose any
// queued updates. This only has to be exactly 1 since the existence
// of any update triggers us to load the full health check state.
notifyCh := make(chan struct{}, 1)
c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh)
defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID)
// maxDurationBetweenUpdates is maximum time we go between explicit
// notifications before we re-query the aliased service checks anyway. This
// helps in the case we miss an edge triggered event and the alias does not
// accurately reflect the underlying service health status.
const maxDurationBetweenUpdates = 1 * time.Minute
var refreshTimer <-chan time.Time
extendRefreshTimer := func() {
refreshTimer = time.After(maxDurationBetweenUpdates)
}
updateStatus := func() {
checks := c.Notify.Checks(c.WithWildcardNamespace())
checksList := make([]*structs.HealthCheck, 0, len(checks))
for _, chk := range checks {
checksList = append(checksList, chk)
}
c.processChecks(checksList, func(serviceID *structs.ServiceID) bool {
return c.Notify.ServiceExists(*serviceID)
})
extendRefreshTimer()
}
// Immediately run to get the current state of the target service
updateStatus()
for {
select {
case <-refreshTimer:
updateStatus()
case <-notifyCh:
updateStatus()
case <-stopCh:
return
}
}
}
// CheckIfServiceIDExists is used to determine if a service exists
type CheckIfServiceIDExists func(*structs.ServiceID) bool
func (c *CheckAlias) checkServiceExistsOnRemoteServer(serviceID *structs.ServiceID) (bool, error) {
if serviceID == nil {
return false, fmt.Errorf("serviceID cannot be nil")
}
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
args.EnterpriseMeta = c.EnterpriseMeta
// We are late at maximum of 15s compared to leader
args.MaxStaleDuration = 15 * time.Second
attempts := 0
RETRY_CALL:
var out structs.IndexedNodeServices
attempts++
if err := c.RPC.RPC(context.Background(), "Catalog.NodeServices", &args, &out); err != nil {
if attempts <= 3 {
time.Sleep(time.Duration(attempts) * time.Second)
goto RETRY_CALL
}
return false, err
}
// Do not proceed for nil returned services.
if out.NodeServices == nil {
return false, fmt.Errorf("no services found on node")
}
for _, srv := range out.NodeServices.Services {
if serviceID.Matches(srv.CompoundServiceID()) {
return true, nil
}
}
return false, nil
}
func (c *CheckAlias) runQuery(stopCh chan struct{}) {
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
args.MaxQueryTime = 1 * time.Minute
args.EnterpriseMeta = c.EnterpriseMeta
// We are late at maximum of 15s compared to leader
args.MaxStaleDuration = 15 * time.Second
var attempt uint
for {
// Check if we're stopped. We fallthrough and block otherwise,
// which has a maximum time set above so we'll always check for
// stop within a reasonable amount of time.
select {
case <-stopCh:
return
default:
}
// Backoff if we have to
if attempt > checkAliasBackoffMin {
shift := attempt - checkAliasBackoffMin
if shift > 31 {
shift = 31 // so we don't overflow to 0
}
waitTime := (1 << shift) * time.Second
if waitTime > checkAliasBackoffMaxWait {
waitTime = checkAliasBackoffMaxWait
}
time.Sleep(waitTime)
}
// Get the current health checks for the specified node.
//
// NOTE(mitchellh): This currently returns ALL health checks for
// a node even though we also have the service ID. This can be
// optimized if we introduce a new RPC endpoint to filter both,
// but for blocking queries isn't that much more efficient since the checks
// index is global to the cluster.
var out structs.IndexedHealthChecks
if err := c.RPC.RPC(context.Background(), "Health.NodeChecks", &args, &out); err != nil {
attempt++
if attempt > 1 {
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical,
fmt.Sprintf("Failure checking aliased node or service: %s", err))
}
continue
}
attempt = 0 // Reset the attempts so we don't backoff the next
// Set our index for the next request
args.MinQueryIndex = out.Index
// We want to ensure that we're always blocking on subsequent requests
// to avoid hot loops. Index 1 is always safe since the min raft index
// is at least 5. Note this shouldn't happen but protecting against this
// case is safer than a 100% CPU loop.
if args.MinQueryIndex < 1 {
args.MinQueryIndex = 1
}
c.processChecks(out.HealthChecks, func(serviceID *structs.ServiceID) bool {
ret, err := c.checkServiceExistsOnRemoteServer(serviceID)
if err != nil {
// We cannot determine if node has the check, let's assume it exists
return true
}
return ret
})
}
}
// processChecks is a common helper for taking a set of health checks and
// using them to update our alias. This is abstracted since the checks can
// come from both the remote server as well as local state.
func (c *CheckAlias) processChecks(checks []*structs.HealthCheck, CheckIfServiceIDExists CheckIfServiceIDExists) {
health := api.HealthPassing
msg := "No checks found."
serviceFound := false
for _, chk := range checks {
if c.Node != "" && !strings.EqualFold(c.Node, chk.Node) {
continue
}
serviceMatch := c.ServiceID.Matches(chk.CompoundServiceID())
if chk.ServiceID != "" && !serviceMatch {
continue
}
// We have at least one healthcheck for this service
if serviceMatch {
serviceFound = true
}
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
health = chk.Status
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
// Critical checks exit the for loop immediately since we
// know that this is the health state. Warnings do not since
// there may still be a critical check.
if chk.Status == api.HealthCritical {
break
}
} else {
// if current health is warning, don't overwrite it
if health == api.HealthPassing {
msg = "All checks passing."
}
}
}
if !serviceFound {
if !CheckIfServiceIDExists(&c.ServiceID) {
msg = fmt.Sprintf("Service %s could not be found on node %s", c.ServiceID.ID, c.Node)
health = api.HealthCritical
}
}
c.Notify.UpdateCheck(c.CheckID, health, msg)
}