-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
adopt.go
307 lines (283 loc) · 9.75 KB
/
adopt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package jobs
import (
"context"
"fmt"
"sync"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/opentracing/opentracing-go"
)
// claimJobs claims unclaimed rows in system.jobs for this registry in a
// single transaction. Up to maxAdoptionsPerLoop rows whose claim_session_id is
// NULL are claimed, most recently created first. Each claimed row is stamped
// with the session's ID and this registry's instance ID.
//
// The number of claimed rows is logged whenever it is non-zero, or always
// when expensive logging at verbosity 1 is enabled.
func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
	const claimQuery = `
UPDATE system.jobs SET claim_session_id = $1, claim_instance_id = $2
WHERE claim_session_id IS NULL ORDER BY created DESC LIMIT $3 RETURNING id`

	claimInTxn := func(ctx context.Context, txn *kv.Txn) error {
		claimed, err := r.ex.Query(
			ctx, "claim-jobs", txn, claimQuery,
			s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop,
		)
		if err != nil {
			return errors.Wrap(err, "could not query jobs table")
		}
		if numClaimed := len(claimed); log.ExpensiveLogEnabled(ctx, 1) || numClaimed > 0 {
			log.Infof(ctx, "claimed %d jobs", numClaimed)
		}
		return nil
	}
	return r.db.Txn(ctx, claimInTxn)
}
// processClaimedJobs resumes the jobs claimed by this registry that are not
// already executing here. It considers jobs in status running or reverting
// whose claim matches both the current session and this registry's instance.
//
// Before resuming anything, filterAlreadyRunningAndCancelFromPreviousSessions
// is consulted. It drops jobs already running under the current session and,
// as a side effect, cancels in-memory adopted jobs that belong to a different
// session. Failures to resume individual jobs are logged, not returned. The
// only error returned is a failure to read the claimed jobs.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
	rows, err := r.ex.QueryEx(
		ctx, "select-running/get-claimed-jobs", nil,
		sessiondata.InternalExecutorOverride{User: security.NodeUser}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
		StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
	)
	if err != nil {
		return errors.Wrap(err, "could not query for claimed jobs")
	}

	// This map will eventually contain the job ids that must be resumed.
	// Initially every claimed job is a candidate; the ones already running on
	// this registry are filtered out below.
	claimedToResume := make(map[int64]struct{}, len(rows))
	for _, row := range rows {
		id := int64(*row[0].(*tree.DInt))
		claimedToResume[id] = struct{}{}
	}
	r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume)
	r.resumeClaimedJobs(ctx, s, claimedToResume)
	return nil
}
// resumeClaimedJobs calls r.resumeJob once for every job ID in
// claimedToResume. At most 64 calls run concurrently, and the function returns
// only after every call has finished.
//
// Errors from resumeJob are logged rather than returned. They are not logged
// at all once ctx is done, since they are then expected.
func (r *Registry) resumeClaimedJobs(
	ctx context.Context, s sqlliveness.Session, claimedToResume map[int64]struct{},
) {
	const resumeConcurrency = 64
	slots := make(chan struct{}, resumeConcurrency)
	var inFlight sync.WaitGroup

	for jobID := range claimedToResume {
		// Take a slot before spawning; this blocks while resumeConcurrency
		// goroutines are already running.
		slots <- struct{}{}
		inFlight.Add(1)

		go func(jobID int64) {
			defer func() {
				<-slots
				inFlight.Done()
			}()
			err := r.resumeJob(ctx, jobID, s)
			if err == nil || ctx.Err() != nil {
				return
			}
			log.Errorf(ctx, "could not run claimed job %d: %v", jobID, err)
		}(jobID)
	}

	inFlight.Wait()
}
// filterAlreadyRunningAndCancelFromPreviousSessions reconciles the registry's
// in-memory adopted jobs with the current session while holding r.mu.
//
// For every adopted job:
//   - If it was adopted under the current session, it is already running here,
//     so its ID is removed from claimedToResume.
//   - Otherwise it is running without a live claim. It is canceled and removed
//     from r.mu.adoptedJobs.
//
// claimedToResume is modified in place.
func (r *Registry) filterAlreadyRunningAndCancelFromPreviousSessions(
	ctx context.Context, s sqlliveness.Session, claimedToResume map[int64]struct{},
) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Deleting entries from a map while ranging over it is permitted in Go.
	for id, adopted := range r.mu.adoptedJobs {
		if adopted.sid == s.ID() {
			// Already running under this session; no need to resume it again.
			// Deleting an absent key is a no-op.
			delete(claimedToResume, id)
			continue
		}
		log.Warningf(ctx, "job %d: running without having a live claim; canceling", id)
		adopted.cancel()
		delete(r.mu.adoptedJobs, id)
	}
}
// resumeJob starts asynchronous execution of a job claimed by session s.
//
// It first reads the job row and returns an error when:
//   - the row is not claimed by s;
//   - the job's status is neither running nor reverting;
//   - the claiming session is no longer alive.
//
// Unmigrated 19.2-style schema change jobs are skipped, returning nil.
// Otherwise it registers the job as adopted and runs it via runJob on the
// registry's stopper. It returns once the task is scheduled, without waiting
// for the job to finish.
func (r *Registry) resumeJob(ctx context.Context, jobID int64, s sqlliveness.Session) error {
	log.Infof(ctx, "job %d: resuming execution", jobID)
	row, err := r.ex.QueryRowEx(
		ctx, "get-job-row", nil,
		sessiondata.InternalExecutorOverride{User: security.NodeUser}, `
SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)
FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
		jobID, s.ID().UnsafeBytes(),
	)
	if err != nil {
		return errors.Wrapf(err, "job %d: could not query job table row", jobID)
	}
	if row == nil {
		return errors.Errorf("job %d: claim with session id %s does not exist", jobID, s.ID())
	}

	status := Status(*row[0].(*tree.DString))
	if status != StatusRunning && status != StatusReverting {
		// A concurrent registry could have requested the job to be paused or canceled.
		return errors.Errorf("job %d: status changed to %s which is not resumable", jobID, status)
	}
	if isAlive := *row[3].(*tree.DBool); !isAlive {
		return errors.Errorf("job %d: claim with session id %s has expired", jobID, s.ID())
	}

	payload, err := UnmarshalPayload(row[1])
	if err != nil {
		return err
	}
	// In version 20.1, the registry must not adopt 19.2-style schema change jobs
	// until they've undergone a migration.
	// TODO(lucy): Remove this in 20.2.
	if deprecatedIsOldSchemaChangeJob(payload) {
		log.VEventf(ctx, 2, "job %d: skipping adoption because schema change job has not been migrated", jobID)
		return nil
	}
	progress, err := UnmarshalProgress(row[2])
	if err != nil {
		return err
	}

	job := &Job{id: &jobID, registry: r}
	job.mu.payload = *payload
	job.mu.progress = *progress
	resumer, err := r.createResumer(job, r.settings)
	if err != nil {
		return err
	}

	resumeCtx, cancel := r.makeCtx()
	resultsCh := make(chan tree.Datums)
	errCh := make(chan error, 1)
	aj := &adoptedJob{sid: s.ID(), cancel: cancel}
	r.addAdoptedJob(jobID, aj)
	if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
		r.runJob(resumeCtx, resumer, resultsCh, errCh, job, status, job.taskName(), nil)
	}); err != nil {
		// The task never started. The adopted-job entry that held cancel is
		// being removed, so release resumeCtx here to avoid leaking it.
		r.removeAdoptedJob(jobID)
		cancel()
		return err
	}

	// resultsCh is unbuffered. Drain it so the job never blocks sending results
	// that nobody consumes.
	go func() {
		for range resultsCh {
		}
	}()
	go func() {
		// Wait for the job to finish. No need to print the error because if there
		// was one it's been set in the job status already. Closing resultsCh
		// afterwards terminates the drain goroutine above.
		<-errCh
		close(resultsCh)
	}()
	return nil
}
// removeAdoptedJob deletes jobID from the registry's in-memory set of adopted
// jobs while holding r.mu. It does not cancel the job. Removing an ID that is
// not present is a no-op.
func (r *Registry) removeAdoptedJob(jobID int64) {
	r.mu.Lock()
	delete(r.mu.adoptedJobs, jobID)
	r.mu.Unlock()
}
// addAdoptedJob records aj as the adopted-job entry for jobID while holding
// r.mu. Any existing entry for the same ID is overwritten.
func (r *Registry) addAdoptedJob(jobID int64, aj *adoptedJob) {
	r.mu.Lock()
	defer r.mu.Unlock()
	adopted := r.mu.adoptedJobs
	adopted[jobID] = aj
}
// runJob executes job through resumer by stepping it through the job state
// machine, starting from status.
//
// It proceeds as follows:
//  1. Under job.mu, it snapshots the payload's username and job type, plus any
//     previously recorded FinalResumeError (decoded).
//  2. It runs the state machine with planner state obtained from r.planFn,
//     inside a tracing span named "<type>-<id>".
//  3. When the state machine returns, a non-nil error is logged, the job is
//     unregistered, and the error (possibly nil) is sent on errCh.
//
// onDone, when non-nil, runs last, after the send on errCh.
func (r *Registry) runJob(
	ctx context.Context,
	resumer Resumer,
	resultsCh chan<- tree.Datums,
	errCh chan<- error,
	job *Job,
	status Status,
	taskName string,
	onDone func(),
) {
	if onDone != nil {
		defer onDone()
	}

	// Copy what we need out of the payload while holding the job's lock.
	job.mu.Lock()
	payload := &job.mu.payload
	var priorResumeErr error
	if encoded := payload.FinalResumeError; encoded != nil {
		priorResumeErr = errors.DecodeError(ctx, *encoded)
	}
	user, jobType := payload.Username, payload.Type()
	job.mu.Unlock()

	// Bookkeeping.
	planHooks, cleanupPlanHooks := r.planFn("resume-"+taskName, user)
	defer cleanupPlanHooks()

	var span opentracing.Span
	ctx, span = r.ac.AnnotateCtxWithSpan(ctx, fmt.Sprintf(`%s-%d`, jobType, *job.ID()))
	defer span.Finish()

	// Run the actual job.
	runErr := r.stepThroughStateMachine(ctx, planHooks, resumer, resultsCh, job, status, priorResumeErr)
	if runErr != nil {
		// TODO (lucy): This needs to distinguish between assertion errors in
		// the job registry and assertion errors in job execution returned from
		// Resume() or OnFailOrCancel(), and only fail on the former. We have
		// tests that purposely introduce bad state in order to produce
		// assertion errors, which shouldn't cause the test to panic. For now,
		// comment this out.
		// if errors.HasAssertionFailure(err) {
		// log.ReportOrPanic(ctx, nil, err.Error())
		// }
		log.Errorf(ctx, "job %d: adoption completed with error %v", *job.ID(), runErr)
	}
	r.unregister(*job.ID())
	errCh <- runErr
}
// servePauseAndCancelRequests applies pending pause and cancel requests, in a
// single transaction, to jobs claimed by this registry's current session and
// instance:
//   - pause-requested jobs move to paused and are unregistered from this
//     registry;
//   - cancel-requested jobs move to reverting and are unregistered.
//
// For canceled jobs, errJobCanceled is recorded both as the payload error and
// as the encoded FinalResumeError. runJob later decodes that error and passes
// it to the state machine.
//
// An unexpected returned status is reported via log.ReportOrPanic.
func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
	return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		rows, err := r.ex.QueryEx(
			ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUser}, `
UPDATE system.jobs
SET status =
CASE
WHEN status = $1 THEN $2
WHEN status = $3 THEN $4
ELSE status
END
WHERE (status IN ($1, $3)) AND (claim_session_id = $5 AND claim_instance_id = $6)
RETURNING id, status`,
			StatusPauseRequested, StatusPaused,
			StatusCancelRequested, StatusReverting,
			s.ID().UnsafeBytes(), r.ID(),
		)
		if err != nil {
			return errors.Wrap(err, "could not query jobs table")
		}
		for _, row := range rows {
			id := int64(*row[0].(*tree.DInt))
			switch Status(*row[1].(*tree.DString)) {
			case StatusPaused:
				r.unregister(id)
				log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
			case StatusReverting:
				job := &Job{id: &id, registry: r}
				// NOTE(review): unregister runs inside the transaction callback, so
				// it may execute more than once if the transaction retries.
				if err := job.WithTxn(txn).Update(ctx, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
					r.unregister(id)
					md.Payload.Error = errJobCanceled.Error()
					encodedErr := errors.EncodeError(ctx, errJobCanceled)
					md.Payload.FinalResumeError = &encodedErr
					ju.UpdatePayload(md.Payload)
					return nil
				}); err != nil {
					// Wrapf already appends err's message; don't format it in again.
					return errors.Wrapf(err, "job %d: tried to cancel but could not mark as reverting", id)
				}
				log.Infof(ctx, "job %d, session id: %s canceled: the job is now reverting",
					id, s.ID())
			default:
				log.ReportOrPanic(ctx, nil, "unexpected job status")
			}
		}
		return nil
	})
}