monitor.go

package monitor

import (
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"maps"
	"math/rand/v2"
	"reflect"
	"sync"
	"time"

	"github.com/Khan/genqlient/graphql"
	"github.com/buildkite/agent-stack-k8s/v2/api"
	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"
	"go.uber.org/zap"
	"k8s.io/client-go/kubernetes"
)
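
// Monitor polls the Buildkite GraphQL API for scheduled command jobs and
// passes them to a job handler.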
type Monitor struct {
	gql    graphql.Client
	logger *zap.Logger
	cfg    Config
}
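
// Config holds the monitor configuration: the GraphQL endpoint and token, the
// organization, an optional cluster UUID, the agent tags, and the polling and
// job-creation tuning knobs.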
type Config struct {
	GraphQLEndpoint        string
	Namespace              string
	Token                  string
	ClusterUUID            string
	MaxInFlight            int
	JobCreationConcurrency int
	PollInterval           time.Duration
	StaleJobDataTimeout    time.Duration
	Org                    string
	Tags                   []string
}
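
// New creates a Monitor for the given configuration, applying defaults for
// PollInterval, StaleJobDataTimeout and JobCreationConcurrency.
//
// A minimal usage sketch (the endpoint, token, org, tags and handler below are
// illustrative assumptions, not defaults):
//
//	m, err := monitor.New(logger, k8sClient, monitor.Config{
//		GraphQLEndpoint: "https://graphql.buildkite.com/v1",
//		Token:           token,
//		Org:             "my-org",
//		Tags:            []string{"queue=kubernetes"},
//	})
//	if err != nil {
//		return err
//	}
//	errs := m.Start(ctx, handler)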
func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
	graphqlClient := api.NewClient(cfg.Token, cfg.GraphQLEndpoint)

	// Poll no more frequently than every 1s (please don't DoS us).
	cfg.PollInterval = max(cfg.PollInterval, time.Second)

	// Default StaleJobDataTimeout to 10s.
	if cfg.StaleJobDataTimeout <= 0 {
		cfg.StaleJobDataTimeout = config.DefaultStaleJobDataTimeout
	}

	// Default JobCreationConcurrency to 5.
	if cfg.JobCreationConcurrency <= 0 {
		cfg.JobCreationConcurrency = config.DefaultJobCreationConcurrency
	}

	return &Monitor{
		gql:    graphqlClient,
		logger: logger,
		cfg:    cfg,
	}, nil
}

// jobResp unifies the response types of the methods that call the GraphQL API,
// whether or not a cluster is specified. The return types are isomorphic, but
// this is lost in the generated API-calling methods. As such, the
// implementations are syntactically identical, but semantically they operate
// on different types.
type jobResp interface {
	OrganizationExists() bool
	CommandJobs() []*api.JobJobTypeCommand
}
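
// unclusteredJobResp adapts api.GetScheduledJobsResponse to the jobResp
// interface.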
type unclusteredJobResp api.GetScheduledJobsResponse

func (r unclusteredJobResp) OrganizationExists() bool {
	return r.Organization.Id != nil
}

func (r unclusteredJobResp) CommandJobs() []*api.JobJobTypeCommand {
	jobs := make([]*api.JobJobTypeCommand, 0, len(r.Organization.Jobs.Edges))
	for _, edge := range r.Organization.Jobs.Edges {
		jobs = append(jobs, edge.Node.(*api.JobJobTypeCommand))
	}
	return jobs
}
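
// clusteredJobResp adapts api.GetScheduledJobsClusteredResponse to the jobResp
// interface.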
type clusteredJobResp api.GetScheduledJobsClusteredResponse

func (r clusteredJobResp) OrganizationExists() bool {
	return r.Organization.Id != nil
}

func (r clusteredJobResp) CommandJobs() []*api.JobJobTypeCommand {
	jobs := make([]*api.JobJobTypeCommand, 0, len(r.Organization.Jobs.Edges))
	for _, edge := range r.Organization.Jobs.Edges {
		jobs = append(jobs, edge.Node.(*api.JobJobTypeCommand))
	}
	return jobs
}

// getScheduledCommandJobs calls either the clustered or unclustered GraphQL
// API method, depending on whether a cluster UUID was provided in the config.
func (m *Monitor) getScheduledCommandJobs(ctx context.Context, queue string) (jobResp jobResp, err error) {
	jobQueryCounter.Inc()

	start := time.Now()
	defer func() {
		jobQueryDurationHistogram.Observe(time.Since(start).Seconds())
		if err != nil {
			jobQueryErrorCounter.Inc()
		}
	}()

	if m.cfg.ClusterUUID == "" {
		resp, err := api.GetScheduledJobs(ctx, m.gql, m.cfg.Org, []string{fmt.Sprintf("queue=%s", queue)})
		return unclusteredJobResp(*resp), err
	}

	var agentQueryRule []string
	if queue != "" {
		agentQueryRule = append(agentQueryRule, fmt.Sprintf("queue=%s", queue))
	}

	resp, err := api.GetScheduledJobsClustered(
		ctx, m.gql, m.cfg.Org, agentQueryRule, encodeClusterGraphQLID(m.cfg.ClusterUUID),
	)
	return clusteredJobResp(*resp), err
}
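
// Start launches the polling loop in a goroutine and returns a channel on
// which fatal errors are reported (a missing "queue" tag or an unknown
// organization). The loop runs until ctx is cancelled.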
func (m *Monitor) Start(ctx context.Context, handler model.JobHandler) <-chan error {
	logger := m.logger.With(zap.String("org", m.cfg.Org))
	errs := make(chan error, 1)

	agentTags, tagErrs := agenttags.TagMapFromTags(m.cfg.Tags)
	if len(tagErrs) != 0 {
		logger.Warn("making a map of agent tags", zap.Errors("err", tagErrs))
	}

	var queue string
	var ok bool
	if queue, ok = agentTags["queue"]; !ok {
		errs <- errors.New("missing required tag: queue")
		return errs
	}

	go func() {
		logger.Info("started")
		defer logger.Info("stopped")

		monitorUpGauge.Set(1)
		defer monitorUpGauge.Set(0)

		ticker := time.NewTicker(m.cfg.PollInterval)
		defer ticker.Stop()

		first := make(chan struct{}, 1)
		first <- struct{}{}

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			case <-first:
			}

			queriedAt := time.Now() // used for end-to-end durations

			resp, err := m.getScheduledCommandJobs(ctx, queue)
			if err != nil {
				// Avoid logging if the context is already cancelled.
				if ctx.Err() != nil {
					return
				}
				logger.Warn("failed to get scheduled command jobs", zap.Error(err))
				continue
			}

			if !resp.OrganizationExists() {
				errs <- fmt.Errorf("invalid organization: %q", m.cfg.Org)
				return
			}

			jobs := resp.CommandJobs()
			if len(jobs) == 0 {
				continue
			}
			jobsReturnedCounter.Add(float64(len(jobs)))

			// The next handler should be the Limiter (except in some tests).
			// Limiter handles deduplicating jobs before passing to the scheduler.
			m.passJobsToNextHandler(ctx, logger, handler, agentTags, jobs, queriedAt)
		}
	}()

	return errs
}
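
// passJobsToNextHandler shuffles the fetched jobs and fans them out to a pool
// of jobHandlerWorker goroutines, bounding the whole handover with
// StaleJobDataTimeout so the job data passed on is never too old.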
func (m *Monitor) passJobsToNextHandler(
	ctx context.Context,
	logger *zap.Logger,
	handler model.JobHandler,
	agentTags map[string]string,
	jobs []*api.JobJobTypeCommand,
	queriedAt time.Time,
) {
	// A sneaky way to create a channel that is closed after a duration.
	// Why not pass directly to handler.Handle? Because that might
	// interrupt scheduling a pod, when all we want is to bound the
	// time spent waiting for the limiter.
	staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.StaleJobDataTimeout)
	defer staleCancel()

	// Why shuffle the jobs? Suppose we sort the jobs to prefer, say, oldest.
	// The first job we'll always try to schedule will then be the oldest, which
	// sounds reasonable. But if that job is not able to be accepted by the
	// cluster for some reason (e.g. there are multiple stack controllers on the
	// same BK queue, and the job is already created by another controller),
	// and the k8s API is slow, then we'll live-lock between grabbing jobs,
	// trying to run the same oldest one, failing, then timing out (staleness).
	// Shuffling increases the odds of making progress.
	rand.Shuffle(len(jobs), func(i, j int) {
		jobs[i], jobs[j] = jobs[j], jobs[i]
	})

	// We also try to get more jobs to the API by processing them in parallel.
	jobsCh := make(chan *api.JobJobTypeCommand)
	defer close(jobsCh)

	var wg sync.WaitGroup
	for range min(m.cfg.JobCreationConcurrency, len(jobs)) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, queriedAt, jobsCh)
		}()
	}
	defer wg.Wait()

	for i, job := range jobs {
		select {
		case <-ctx.Done():
			return
		case <-staleCtx.Done():
			// Every remaining job is stale.
			staleJobsCounter.Add(float64(len(jobs) - i))
			return
		case jobsCh <- job:
		}
	}
}
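
// jobHandlerWorker consumes jobs from jobsCh, drops jobs whose agent query
// rules do not match all of the configured agent tags, and passes the rest to
// the next handler. It returns when either context is done or jobsCh is
// closed.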
func jobHandlerWorker(
	ctx, staleCtx context.Context,
	logger *zap.Logger,
	handler model.JobHandler,
	agentTags map[string]string,
	queriedAt time.Time,
	jobsCh <-chan *api.JobJobTypeCommand,
) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-staleCtx.Done():
			return
		case j := <-jobsCh:
			if j == nil {
				return
			}
			jobsReachedWorkerCounter.Inc()

			jobTags, tagErrs := agenttags.TagMapFromTags(j.AgentQueryRules)
			if len(tagErrs) != 0 {
				logger.Warn("making a map of job tags", zap.Errors("err", tagErrs))
			}

			// The API returns jobs that match ANY of the agent tags (the agent
			// query rules). However, we can only acquire jobs that match ALL of
			// the agent tags.
			if !agenttags.JobTagsMatchAgentTags(maps.All(jobTags), agentTags) {
				logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
				jobsFilteredOutCounter.Inc()
				continue
			}

			job := model.Job{
				CommandJob: &j.CommandJob,
				StaleCh:    staleCtx.Done(),
				QueriedAt:  queriedAt,
			}

			// The next handler should be the deduper (except in some tests).
			// Deduper handles deduplicating jobs before passing to the scheduler.
			logger.Debug("passing job to next handler",
				zap.Stringer("handler", reflect.TypeOf(handler)),
				zap.String("job-uuid", j.Uuid),
			)
			jobHandlerCallsCounter.Inc()

			// The next handler operates under the main ctx, but can optionally
			// use staleCtx.Done() (stored in job) to skip work. (Only Limiter
			// does this.)
			switch err := handler.Handle(ctx, job); {
			case errors.Is(err, model.ErrDuplicateJob):
				// Job wasn't scheduled because it's already scheduled.
				jobHandlerErrorCounter.WithLabelValues("duplicate").Inc()

			case errors.Is(err, model.ErrStaleJob):
				// Job wasn't scheduled because the data has become stale.
				// Staleness applies to the whole batch, so we can return early.
				jobHandlerErrorCounter.WithLabelValues("stale").Inc()
				staleJobsCounter.Inc() // also incremented elsewhere
				return

			case err != nil:
				// Note: this check is for the original context, not staleCtx,
				// in order to avoid the log when the context is cancelled
				// (particularly during tests).
				if ctx.Err() != nil {
					return
				}
				logger.Error("failed to create job", zap.Error(err))
				jobHandlerErrorCounter.WithLabelValues("other").Inc()
			}
		}
	}
}
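
// encodeClusterGraphQLID converts a cluster UUID into the base64-encoded
// GraphQL node ID expected by the API, i.e. base64("Cluster---" + clusterUUID).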
func encodeClusterGraphQLID(clusterUUID string) string {
	return base64.StdEncoding.EncodeToString([]byte("Cluster---" + clusterUUID))
}