diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index df41995339c5..47e64254ef2b 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -914,14 +914,31 @@ func (ds *DistSender) divideAndSendParallelCommit( } if qiPErr := qiReply.pErr; qiPErr != nil { // The batch with the pre-commit QueryIntent requests returned an error. - // Wrap this in a MixedSuccessError, as we know that the EndTransaction - // batch succeeded. It is not possible for qiPErr to be a MixedSuccessError - // itself, so we don't need to handle that case like we do down below. - qiPErr.UpdateTxn(ba.Txn) - maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx) - pErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr}) - pErr.Index = qiPErr.Index - return nil, pErr + ignoreMissing := false + if _, ok := qiPErr.GetDetail().(*roachpb.IntentMissingError); ok && br.Txn != nil { + // If the error is an IntentMissingError, detect whether this is due + // to intent resolution and can be safely ignored. + ignoreMissing, err = ds.detectIntentMissingDueToIntentResolution(ctx, br.Txn) + if err != nil { + return nil, roachpb.NewError(err) + } + } + if !ignoreMissing { + // Wrap this in a MixedSuccessError, as we know that the EndTransaction + // batch succeeded. It is not possible for qiPErr to be a MixedSuccessError + // itself, so we don't need to handle that case like we do down below. + qiPErr.UpdateTxn(br.Txn) + maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx) + pErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr}) + pErr.Index = qiPErr.Index + return nil, pErr + } + // Populate the pre-commit QueryIntent batch response. If we made it + // here then we know we can ignore intent missing errors. + qiReply.reply = qiBa.CreateReply() + for _, ru := range qiReply.reply.Responses { + ru.GetQueryIntent().FoundIntent = true + } } // Both halves of the split batch succeeded. Piece them back together. @@ -935,6 +952,80 @@ func (ds *DistSender) divideAndSendParallelCommit( return br, nil } +// detectIntentMissingDueToIntentResolution attempts to detect whether a missing +// intent error thrown by a pre-commit QueryIntent request was due to intent +// resolution after the transaction was already finalized instead of due to a +// failure of the corresponding pipelined write. It is possible for these two +// situations to be confused because the pre-commit QueryIntent requests are +// issued in parallel with the staging EndTransaction request and may evaluate +// after the transaction becomes implicitly committed. If this happens and a +// concurrent transaction observes the implicit commit and makes the commit +// explicit, it is allowed to begin resolving the transactions intents. +// +// MVCC values don't remember their transaction once they have been resolved. +// This loss of information means that QueryIntent returns an intent missing +// error if it finds the resolved value that correspond to its desired intent. +// Because of this, the race discussed above can result in intent missing errors +// during a parallel commit even when the transaction successfully committed. +// +// This method queries the transaction record to determine whether an intent +// missing error was caused by this race or whether the intent missing error +// is real and guarantees that the transaction is not implicitly committed. +// +// See #37866 (issue) and #37900 (corresponding tla+ update). +func (ds *DistSender) detectIntentMissingDueToIntentResolution( + ctx context.Context, txn *roachpb.Transaction, +) (bool, error) { + ba := roachpb.BatchRequest{} + ba.Timestamp = ds.clock.Now() + ba.Add(&roachpb.QueryTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txn.TxnMeta.Key, + }, + Txn: txn.TxnMeta, + }) + log.VEvent(ctx, 1, "detecting whether missing intent is due to intent resolution") + br, pErr := ds.Send(ctx, ba) + if pErr != nil { + // We weren't able to determine whether the intent missing error is + // due to intent resolution or not, so it is still ambiguous whether + // the commit succeeded. + return false, roachpb.NewAmbiguousResultError(fmt.Sprintf("error=%s [intent missing]", pErr)) + } + respTxn := &br.Responses[0].GetQueryTxn().QueriedTxn + switch respTxn.Status { + case roachpb.COMMITTED: + // The transaction has already been finalized as committed. The missing + // intent error must have been a result of a concurrent transaction + // recovery finding the transaction in the implicit commit state and + // resolving one of its intents before the pre-commit QueryIntent + // queried that intent. We know that the transaction was committed + // successfully, so ignore the error. + return true, nil + case roachpb.ABORTED: + // The transaction has either already been finalized as aborted or has + // been finalized as committed and already had its transaction record + // GCed. We can't distinguish between these two conditions with full + // certainty, so we're forced to return an ambiguous commit error. + // TODO(nvanbenschoten): QueryTxn will materialize an ABORTED transaction + // record if one does not already exist. If we are certain that no actor + // will ever persist an ABORTED transaction record after a COMMIT record is + // GCed and we returned whether the record was synthesized in the QueryTxn + // response then we could use the existence of an ABORTED transaction record + // to further isolates the ambiguity caused by the loss of information + // during intent resolution. If this error becomes a problem, we can explore + // this option. + return false, roachpb.NewAmbiguousResultError("intent missing and record aborted") + default: + // The transaction has not been finalized yet, so the missing intent + // error must have been caused by a real missing intent. Propagate the + // missing intent error. + // NB: we don't expect the record to be PENDING at this point, but it's + // not worth making any hard assertions about what we get back here. + return false, nil + } +} + // maybeSwapErrorIndex swaps the error index from a to b or b to a if the // error's index is set and is equal to one of these to values. func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) { diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index d35b00c76ed2..b93d2d531c4d 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sort" "strconv" "sync/atomic" @@ -2191,7 +2192,8 @@ func TestMultiRangeWithEndTransaction(t *testing.T) { var testFn simpleSendFn = func( _ context.Context, _ SendOptions, - _ ReplicaSlice, ba roachpb.BatchRequest, + _ ReplicaSlice, + ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { var cur []roachpb.Method for _, union := range ba.Requests { @@ -2324,7 +2326,8 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { var testFn simpleSendFn = func( _ context.Context, _ SendOptions, - _ ReplicaSlice, ba roachpb.BatchRequest, + _ ReplicaSlice, + ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { var cur []roachpb.Method for _, union := range ba.Requests { @@ -2346,7 +2349,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { ds := NewDistSender(cfg, g) ds.DisableParallelBatches() - // Send a batch request containing two puts. + // Send a batch request containing the requests. var ba roachpb.BatchRequest ba.Txn = &roachpb.Transaction{Name: "test"} ba.Add(test.reqs...) @@ -2364,6 +2367,131 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { } } +// TestParallelCommitsDetectIntentMissingCause tests the functionality in +// DistSender.detectIntentMissingDueToIntentResolution. +func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + key := roachpb.Key("a") + txn := roachpb.MakeTransaction( + "test", key, roachpb.NormalUserPriority, + clock.Now(), clock.MaxOffset().Nanoseconds(), + ) + + testCases := []struct { + name string + queryTxnFn func() (roachpb.TransactionStatus, error) + expErr string + }{ + { + name: "transaction record PENDING, real intent missing error", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.PENDING, nil + }, + expErr: "the batch experienced mixed success and failure: intent missing", + }, + { + name: "transaction record STAGING, real intent missing error", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.STAGING, nil + }, + expErr: "the batch experienced mixed success and failure: intent missing", + }, + { + name: "transaction record COMMITTED, intent missing error caused by intent resolution", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.COMMITTED, nil + }, + }, + { + name: "transaction record ABORTED, ambiguous intent missing error", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.ABORTED, nil + }, + expErr: "result is ambiguous (intent missing and record aborted)", + }, + { + name: "QueryTxn error, unresolved ambiguity", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return 0, errors.New("unable to query txn") + }, + expErr: "result is ambiguous (error=unable to query txn [intent missing])", + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + var testFn simpleSendFn = func( + _ context.Context, + _ SendOptions, + _ ReplicaSlice, + ba roachpb.BatchRequest, + ) (*roachpb.BatchResponse, error) { + br := ba.CreateReply() + switch ba.Requests[0].GetInner().Method() { + case roachpb.QueryIntent: + br.Error = roachpb.NewError(roachpb.NewIntentMissingError(key, nil)) + case roachpb.QueryTxn: + status, err := test.queryTxnFn() + if err != nil { + br.Error = roachpb.NewError(err) + } else { + respTxn := txn + respTxn.Status = status + br.Responses[0].GetQueryTxn().QueriedTxn = respTxn + } + case roachpb.EndTransaction: + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.STAGING + } + return br, nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(testFn), + }, + RangeDescriptorDB: defaultMockRangeDescriptorDB, + } + ds := NewDistSender(cfg, g) + + // Send a parallel commit batch request. + var ba roachpb.BatchRequest + ba.Txn = txn.Clone() + ba.Add(&roachpb.QueryIntentRequest{ + RequestHeader: roachpb.RequestHeader{Key: key}, + Txn: txn.TxnMeta, + ErrorIfMissing: true, + }) + ba.Add(&roachpb.EndTransactionRequest{ + RequestHeader: roachpb.RequestHeader{Key: key}, + Commit: true, + InFlightWrites: []roachpb.SequencedWrite{{Key: key, Sequence: 1}}, + }) + + // Verify that the response is expected. + _, pErr := ds.Send(context.Background(), ba) + if test.expErr == "" { + if pErr != nil { + t.Fatalf("unexpected error %v", pErr) + } + } else { + if !testutils.IsPError(pErr, regexp.QuoteMeta(test.expErr)) { + t.Fatalf("expected error %q; found %v", test.expErr, pErr) + } + } + }) + } +} + func TestCountRanges(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper()