// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package kv

import (
	"context"
	"sync"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

var parallelCommitsEnabled = settings.RegisterBoolSetting(
	"kv.transaction.parallel_commits_enabled",
	"if enabled, transactional commits will be parallelized with transactional writes",
	true,
)
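// Illustrative usage note (hypothetical comment, not part of the original
// file): the setting above can be toggled cluster-wide from SQL, for example:
//
//	SET CLUSTER SETTING kv.transaction.parallel_commits_enabled = false;
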
// txnCommitter is a txnInterceptor that concerns itself with committing and
// rolling back transactions. It intercepts EndTransaction requests and
// coordinates their execution. This is accomplished either by issuing them
// directly with proper addressing if they are alone, eliding them if they are
// not needed, or coordinating their execution in parallel with the rest of
// their batch if they are part of a larger set of requests.
//
// The third operation listed, which we define as a "parallel commit", is the
// most interesting. Marking a transaction record as committed in parallel with
// writing the rest of the transaction's intents is a clear win in terms of
// latency - in theory it removes the cost of an entire consensus round-trip
// from a transaction. However, doing so safely comes with extra complication.
// It requires an extension to the transaction model, additional client-side
// logic, buy-in from concurrency control, and specialized support from a
// transaction recovery mechanism. txnCommitter is responsible for the parallel
// commit-specific client-side logic.
//
// Parallel commits works by defining a committed transaction as a transaction
// that meets one of the two following commit conditions:
//  1. a transaction is *explicitly committed* if it has a transaction record
//     with a COMMITTED status
//  2. a transaction is *implicitly committed* if it has a transaction record
//     with a STAGING status and intents written for all writes declared as
//     "in-flight" on the transaction record at equal or lower timestamps than
//     the transaction record's commit timestamp
// A sketch of the implicit commit predicate, isImplicitlyCommitted, follows
// the txnCommitter type definition below.
//
// A transaction may move from satisfying the implicit commit condition to
// satisfying the explicit commit condition. This is desirable because it moves
// the commit condition from a distributed condition to one local to the
// transaction record. Regardless, once either commit condition is satisfied, a
// transaction will remain committed in perpetuity both to itself and to all
// concurrent observers.
//
// The txnCommitter interceptor's role in this is to determine the set of writes
// that will be in-flight during a parallel commit. It collects this set from
// both the writes and the query intent requests that it finds present in the
// same batch as the committing end transaction request. The writes in this
// batch indicate a new intent write and the query intent requests indicate a
// previous pipelined intent write that has not yet been proven as successful.
// Before issuing the batch, the txnCommitter attaches this set to the end
// transaction request.
//
// The txnCommitter then collects the response of the batch when it returns.
// Based on the outcome of the requests in the batch, the interceptor determines
// whether the transaction successfully committed by satisfying the implicit
// commit condition.
//
// If all requests in the batch succeeded (including the EndTransaction request)
// then the implicit commit condition is satisfied. The interceptor returns a
// successful response up the stack and launches an async task to make the
// commit explicit by moving the transaction record's status from STAGING to
// COMMITTED.
//
// If any of the requests did not succeed then the implicit commit condition is
// not satisfied and the transaction is still in-progress (and could still be
// committed or aborted at a later time). There are a number of reasons why
// some of the requests in the final batch may have failed:
// - intent writes: these requests may fail to write an intent due to a logical
// error like a ConditionFailedError. They also could have succeeded at writing
// an intent but failed to write it at the desired timestamp because they ran
// into the timestamp cache or another committed value. In the first case, the
// txnCommitter will receive an error. In the second, it will generate one in
// needTxnRetryAfterStaging.
// - query intents: these requests may fail because they discover that one of the
// previously issued writes has failed; either because it never left an intent
// or because it left one at too high of a timestamp. In this case, the request
// will return an error because the requests all have the ErrorIfMissing option
// set. It will also prevent the write from ever succeeding in the future, which
// ensures that the transaction will never suddenly become implicitly committed
// at a later point due to the write eventually succeeding (e.g. after a replay).
// - end txn: this request may fail with a TransactionRetryError for any number of
// reasons, such as if the transaction's provisional commit timestamp has been
// pushed past its read timestamp. In all of these cases, an error will be
// returned and the transaction record will not be staged.
//
// If it is unknown whether all of the requests in the final batch succeeded
// (e.g. due to a network error) then an AmbiguousResultError is returned. The
// logic to enforce this is in DistSender.
// TODO(nvanbenschoten): merge this logic.
//
// In all cases, the interceptor abstracts away the details of this from all
// interceptors above it in the coordinator interceptor stack.
type txnCommitter struct {
	st      *cluster.Settings
	stopper *stop.Stopper
	wrapped lockedSender
	mu      sync.Locker
}
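
// isImplicitlyCommitted is an illustrative sketch (hypothetical; not part of
// the original file) of the implicit commit predicate described in the comment
// above. The caller is assumed to supply the number of declared in-flight
// writes that have been proven to have left intents at or below the
// transaction's commit timestamp.
func isImplicitlyCommitted(txn *roachpb.Transaction, provenInFlightWrites int) bool {
	// A STAGING record is implicitly committed once every declared in-flight
	// write has a matching intent at an equal or lower timestamp.
	return txn.Status == roachpb.STAGING &&
		provenInFlightWrites == len(txn.InFlightWrites)
}
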
// SendLocked implements the lockedSender interface.
func (tc *txnCommitter) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// If the batch does not include an EndTransaction request, pass it through.
rArgs, hasET := ba.GetArg(roachpb.EndTransaction)
if !hasET {
return tc.wrapped.SendLocked(ctx, ba)
}
et := rArgs.(*roachpb.EndTransactionRequest)
// Determine whether we can elide the EndTransaction entirely. We can do
// so if the transaction is read-only, which we determine based on whether
// the EndTransaction request contains any writes.
if len(et.IntentSpans) == 0 && len(et.InFlightWrites) == 0 {
return tc.sendLockedWithElidedEndTransaction(ctx, ba, et)
}
// Assign the transaction's key to the Request's header if it isn't already
// set. This is the only place where EndTransactionRequest.Key is assigned,
// but we could be dealing with a re-issued batch after a refresh. Remember,
	// the committer is below the txnSpanRefresher on the interceptor stack.
if et.Key == nil {
et.Key = ba.Txn.Key
}
// Determine whether the commit can be run in parallel with the rest of the
// writes in the batch. If not, move the in-flight writes currently attached
// to the EndTransaction request to the IntentSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTransaction
// request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallelWithWrites(ba, et) {
		// NB: when parallel commits is disabled, this is the best place to
		// detect whether the batch has only distinct spans. We can set this
		// flag based on whether any of the previously declared in-flight
		// writes in this batch overlap with each other. This will have (rare)
		// false negatives when the in-flight writes overlap with existing
		// intent spans, but never false positives.
et.IntentSpans, ba.Header.DistinctSpans = mergeIntoSpans(et.IntentSpans, et.InFlightWrites)
// Disable parallel commits.
et.InFlightWrites = nil
}
// If the EndTransaction request is a rollback, pass it through.
if !et.Commit {
return tc.wrapped.SendLocked(ctx, ba)
}
// Send the adjusted batch through the wrapped lockedSender. Unlocks while
// sending then re-locks.
br, pErr := tc.wrapped.SendLocked(ctx, ba)
if pErr != nil {
// If the batch resulted in an error but the EndTransaction request
// succeeded, staging the transaction record in the process, downgrade
// the status back to PENDING. Even though the transaction record may
// have a status of STAGING, we know that the transaction failed to
// implicitly commit, so interceptors above the txnCommitter in the
// stack don't need to be made aware that the record is staging.
if txn := pErr.GetTxn(); txn != nil && txn.Status == roachpb.STAGING {
pErr.SetTxn(cloneWithStatus(txn, roachpb.PENDING))
}
// Same deal with MixedSuccessErrors.
// TODO(nvanbenschoten): We can remove this once MixedSuccessErrors
// are removed.
if aPSErr, ok := pErr.GetDetail().(*roachpb.MixedSuccessError); ok {
if txn := aPSErr.Wrapped.GetTxn(); txn != nil && txn.Status == roachpb.STAGING {
aPSErr.Wrapped.SetTxn(cloneWithStatus(txn, roachpb.PENDING))
}
}
return nil, pErr
}
// Determine next steps based on the status of the transaction.
switch br.Txn.Status {
case roachpb.STAGING:
// Continue with STAGING-specific validation and cleanup.
case roachpb.COMMITTED:
// The transaction is explicitly committed. This is possible if all
// in-flight writes were sent to the same range as the EndTransaction
// request, in a single batch. In this case, a range can determine that
// all in-flight writes will succeed with the EndTransaction and can
// decide to skip the STAGING state.
//
// This is also possible if we never attached any in-flight writes to the
// EndTransaction request, either because canCommitInParallelWithWrites
// returned false or because there were no unproven in-flight writes
// (see txnPipeliner) and there were no writes in the batch request.
return br, nil
default:
return nil, roachpb.NewErrorf("unexpected response status without error: %v", br.Txn)
}
// Determine whether the transaction needs to either retry or refresh. When
// the EndTransaction request evaluated while STAGING the transaction
// record, it performed this check. However, the transaction proto may have
// changed due to writes evaluated concurrently with the EndTransaction even
// if none of those writes returned an error. Remember that the transaction
// proto we see here could be a combination of protos from responses, all
// merged by DistSender.
if pErr := needTxnRetryAfterStaging(br); pErr != nil {
return nil, pErr
}
// If the transaction doesn't need to retry then it is implicitly committed!
// We're the only ones who know that though -- other concurrent transactions
// will need to go through the full status resolution process to make a
// determination about the status of our STAGING transaction. To avoid this,
// we transition to an explicitly committed transaction as soon as possible.
// This also has the side-effect of kicking off intent resolution.
mergedIntentSpans, _ := mergeIntoSpans(et.IntentSpans, et.InFlightWrites)
tc.makeTxnCommitExplicitAsync(ctx, br.Txn, mergedIntentSpans)
	// Switch the status on the batch response's transaction to COMMITTED. No
	// interceptor above this one in the stack should ever need to deal with a
	// transaction proto in the STAGING state.
	br.Txn = cloneWithStatus(br.Txn, roachpb.COMMITTED)
return br, nil
}
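
// exampleParallelCommitBatch is an illustrative sketch (hypothetical; not part
// of the original file) of the shape of a batch that commits in parallel with
// its writes: a new intent write, a QueryIntent proving a pipelined write, and
// a committing EndTransaction carrying both keys as in-flight writes. The keys
// "a" and "b" are invented for the example.
func exampleParallelCommitBatch(txn *roachpb.Transaction) roachpb.BatchRequest {
	var ba roachpb.BatchRequest
	ba.Header = roachpb.Header{Txn: txn}
	// A new intent write issued in the same batch as the commit.
	ba.Add(&roachpb.PutRequest{
		RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")},
	})
	// A check on a previously pipelined intent write that has not yet been
	// proven. ErrorIfMissing ensures a failed write can never succeed later
	// and flip the transaction to implicitly committed after the fact.
	ba.Add(&roachpb.QueryIntentRequest{
		RequestHeader:  roachpb.RequestHeader{Key: roachpb.Key("b")},
		Txn:            txn.TxnMeta,
		ErrorIfMissing: true,
	})
	// The committing EndTransaction declares both writes as in-flight.
	et := &roachpb.EndTransactionRequest{Commit: true}
	et.Key = txn.Key
	et.InFlightWrites = []roachpb.SequencedWrite{
		{Key: roachpb.Key("a")}, {Key: roachpb.Key("b")},
	}
	ba.Add(et)
	return ba
}
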
// sendLockedWithElidedEndTransaction sends the provided batch without its
// EndTransaction request. However, if the EndTransaction request is alone in
// the batch, nothing will be sent at all. Either way, the result of the
// EndTransaction will be synthesized and returned in the batch response.
//
// The method is used for read-only transactions, which never need to write a
// transaction record.
func (tc *txnCommitter) sendLockedWithElidedEndTransaction(
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTransactionRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
// Send the batch without its final request, which we know to be the
// EndTransaction request that we're eliding. If this would result in us
// sending an empty batch, mock out a reply instead of sending anything.
ba.Requests = ba.Requests[:len(ba.Requests)-1]
if len(ba.Requests) > 0 {
br, pErr = tc.wrapped.SendLocked(ctx, ba)
if pErr != nil {
return nil, pErr
}
} else {
br = &roachpb.BatchResponse{}
// NB: there's no need to clone the txn proto here because we already
// call cloneWithStatus below.
br.Txn = ba.Txn
}
// Check if the (read-only) txn was pushed above its deadline.
deadline := et.Deadline
if deadline != nil && !br.Txn.Timestamp.Less(*deadline) {
return nil, generateTxnDeadlineExceededErr(ba.Txn, *deadline)
}
// Update the response's transaction proto. This normally happens on the
// server and is sent back in response headers, but in this case the
// EndTransaction request was optimized away. The caller may still inspect
// the transaction struct, so we manually update it here to emulate a true
// transaction.
status := roachpb.ABORTED
if et.Commit {
status = roachpb.COMMITTED
}
br.Txn = cloneWithStatus(br.Txn, status)
// Synthesize and append an EndTransaction response.
resp := &roachpb.EndTransactionResponse{}
resp.Txn = br.Txn
br.Add(resp)
return br, nil
}
// canCommitInParallelWithWrites determines whether the batch can issue its
// committing EndTransaction in parallel with other in-flight writes.
func (tc *txnCommitter) canCommitInParallelWithWrites(
ba roachpb.BatchRequest, et *roachpb.EndTransactionRequest,
) bool {
if !tc.st.Version.IsActive(cluster.VersionParallelCommits) {
return false
}
if !parallelCommitsEnabled.Get(&tc.st.SV) {
return false
}
// We're trying to parallel commit, not parallel abort.
if !et.Commit {
return false
}
// If the transaction has a commit trigger, we don't allow it to commit in
// parallel with writes. There's no fundamental reason for this restriction,
// but for now it's not worth the complication.
if et.InternalCommitTrigger != nil {
return false
}
// Similar to how we can't pipeline ranged writes, we also can't commit in
// parallel with them. The reason for this is that the status resolution
// process for STAGING transactions wouldn't know where to look for the
// intents.
for _, ru := range ba.Requests[:len(ba.Requests)-1] {
req := ru.GetInner()
if roachpb.IsTransactionWrite(req) && roachpb.IsRange(req) {
return false
}
}
return true
}
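
// rangedWriteExamples is an illustrative sketch (hypothetical; not part of the
// original file) of the ranged-write check above. A Put is a transactional
// point write (compatible with parallel commit), a Scan is ranged but not a
// write (also compatible), while a DeleteRange is a ranged transactional write
// and disqualifies the batch, since status resolution for a STAGING record
// would not know which individual keys to query for intents.
func rangedWriteExamples() (put, scan, deleteRange bool) {
	isRangedWrite := func(req roachpb.Request) bool {
		return roachpb.IsTransactionWrite(req) && roachpb.IsRange(req)
	}
	put = isRangedWrite(&roachpb.PutRequest{})                 // false
	scan = isRangedWrite(&roachpb.ScanRequest{})               // false
	deleteRange = isRangedWrite(&roachpb.DeleteRangeRequest{}) // true
	return put, scan, deleteRange
}
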
// mergeIntoSpans merges all provided sequenced writes into the span slice. It
// then sorts the spans and merges any that overlap. The function does not
// mutate the provided span slice. Returns true iff all of the spans are
// distinct.
func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) ([]roachpb.Span, bool) {
m := make([]roachpb.Span, len(s)+len(ws))
copy(m, s)
for i, w := range ws {
m[len(s)+i] = roachpb.Span{Key: w.Key}
}
return roachpb.MergeSpans(m)
}
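
// mergeIntoSpansExample is an illustrative sketch (hypothetical; not part of
// the original file) of mergeIntoSpans. Folding pipelined writes at the
// invented keys "b" and "f" into the spans {a} and {[c, e)} yields the sorted
// result {a, b, [c, e), f}, and distinct is true because no inputs overlapped.
func mergeIntoSpansExample() (spans []roachpb.Span, distinct bool) {
	existing := []roachpb.Span{
		{Key: roachpb.Key("a")},
		{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")},
	}
	writes := []roachpb.SequencedWrite{
		{Key: roachpb.Key("b")},
		{Key: roachpb.Key("f")},
	}
	return mergeIntoSpans(existing, writes)
}
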
// needTxnRetryAfterStaging determines whether the transaction needs to refresh
// (see txnSpanRefresher) or retry based on the batch response of a parallel
// commit attempt.
func needTxnRetryAfterStaging(br *roachpb.BatchResponse) *roachpb.Error {
if len(br.Responses) == 0 {
return roachpb.NewErrorf("no responses in BatchResponse: %v", br)
}
lastResp := br.Responses[len(br.Responses)-1].GetInner()
etResp, ok := lastResp.(*roachpb.EndTransactionResponse)
if !ok {
return roachpb.NewErrorf("unexpected response in BatchResponse: %v", lastResp)
}
if etResp.StagingTimestamp.IsEmpty() {
return roachpb.NewErrorf("empty StagingTimestamp in EndTransactionResponse: %v", etResp)
}
if etResp.StagingTimestamp.Less(br.Txn.Timestamp) {
// If the timestamp that the transaction record was staged at
// is less than the timestamp of the transaction in the batch
// response then one of the concurrent writes was pushed to
// a higher timestamp. This violates the "implicit commit"
// condition and neither the transaction coordinator nor any
// other concurrent actor will consider this transaction to
// be committed as is.
err := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "" /* extraMsg */)
txn := cloneWithStatus(br.Txn, roachpb.PENDING)
return roachpb.NewErrorWithTxn(err, txn)
}
return nil
}
// makeTxnCommitExplicitAsync launches an async task that attempts to move the
// transaction from implicitly committed (STAGING status with all intents
// written) to explicitly committed (COMMITTED status). It does so by sending a
// second EndTransactionRequest, this time with no InFlightWrites attached.
func (tc *txnCommitter) makeTxnCommitExplicitAsync(
ctx context.Context, txn *roachpb.Transaction, intentSpans []roachpb.Span,
) {
// TODO(nvanbenschoten): consider adding tracing for this request.
// TODO(nvanbenschoten): add a timeout to this request.
// TODO(nvanbenschoten): consider making this semi-synchronous to
// backpressure client writes when these start to slow down. This
// would be similar to what we do for intent resolution.
log.VEventf(ctx, 2, "making txn commit explicit: %s", txn)
if err := tc.stopper.RunAsyncTask(
context.Background(), "txnCommitter: making txn commit explicit", func(ctx context.Context) {
tc.mu.Lock()
defer tc.mu.Unlock()
if err := makeTxnCommitExplicitLocked(ctx, tc.wrapped, txn, intentSpans); err != nil {
log.VErrEventf(ctx, 1, "making txn commit explicit failed for %s: %v", txn, err)
}
},
); err != nil {
log.VErrEventf(ctx, 1, "failed to make txn commit explicit: %v", err)
}
}
func makeTxnCommitExplicitLocked(
ctx context.Context, s lockedSender, txn *roachpb.Transaction, intentSpans []roachpb.Span,
) error {
// Clone the txn to prevent data races.
txn = txn.Clone()
// Construct a new batch with just an EndTransaction request.
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: txn}
et := roachpb.EndTransactionRequest{Commit: true}
et.Key = txn.Key
et.IntentSpans = intentSpans
ba.Add(&et)
_, pErr := s.SendLocked(ctx, ba)
if pErr != nil {
// Detect whether the error indicates that someone else beat
// us to explicitly committing the transaction record.
tse, ok := pErr.GetDetail().(*roachpb.TransactionStatusError)
if ok && tse.Reason == roachpb.TransactionStatusError_REASON_TXN_COMMITTED {
return nil
}
return pErr.GoError()
}
return nil
}
// setWrapped implements the txnInterceptor interface.
func (tc *txnCommitter) setWrapped(wrapped lockedSender) { tc.wrapped = wrapped }
// populateMetaLocked implements the txnInterceptor interface.
func (tc *txnCommitter) populateMetaLocked(meta *roachpb.TxnCoordMeta) {}

// augmentMetaLocked implements the txnInterceptor interface.
func (tc *txnCommitter) augmentMetaLocked(meta roachpb.TxnCoordMeta) {}

// epochBumpedLocked implements the txnInterceptor interface.
func (tc *txnCommitter) epochBumpedLocked() {}

// closeLocked implements the txnInterceptor interface.
func (tc *txnCommitter) closeLocked() {}
// cloneWithStatus returns a deep copy of the provided transaction proto with
// its status set to s.
func cloneWithStatus(txn *roachpb.Transaction, s roachpb.TransactionStatus) *roachpb.Transaction {
	clone := txn.Clone()
	clone.Status = s
	return clone
}