From a83d40894ed2df3efc959a1f4f17ecac562679ec Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 3 Jun 2019 21:06:59 -0400 Subject: [PATCH] kv: detect if missing intent is due to intent resolution during parallel commit Fixes #37866. This commit implements the medium-term solution to #37866 proposed and modeled in #37900. The solution is to catch `IntentMissingErrors` in `DistSender`'s `divideAndSendParallelCommit` method coming from a parallel commit's pre-commit QueryIntent batch. When we see one of these errors, we immediately send a `QueryTxn` request to the transaction record. This will result in one of the four statuses: 1. PENDING: Unexpected because the parallel commit `EndTransactionRequest` succeeded. Ignore. 2. STAGING: Unambiguously not the issue from #37866. Ignore. 3. COMMITTED: Unambiguously the issue from #37866. Strip the error and return the updated proto. 4. ABORTED: Still ambiguous. Transform error into an AmbiguousCommitError and return. This solution isolates the ambiguity caused by the loss of information during intent resolution to just the case where the result of the QueryTxn is ABORTED. This is because an ABORTED record can mean either 1) the transaction was ABORTED and the missing intent was removed or 2) the transaction was COMMITTED, all intents were resolved, and the transaction record was GCed. Release note: None --- pkg/kv/dist_sender.go | 107 ++++++++++++++++++++++++++--- pkg/kv/dist_sender_test.go | 134 ++++++++++++++++++++++++++++++++++++- 2 files changed, 230 insertions(+), 11 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index df41995339c5..e2b9d052db22 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -914,14 +914,31 @@ func (ds *DistSender) divideAndSendParallelCommit( } if qiPErr := qiReply.pErr; qiPErr != nil { // The batch with the pre-commit QueryIntent requests returned an error. 
- // Wrap this in a MixedSuccessError, as we know that the EndTransaction - // batch succeeded. It is not possible for qiPErr to be a MixedSuccessError - // itself, so we don't need to handle that case like we do down below. - qiPErr.UpdateTxn(ba.Txn) - maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx) - pErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr}) - pErr.Index = qiPErr.Index - return nil, pErr + ignoreMissing := false + if _, ok := qiPErr.GetDetail().(*roachpb.IntentMissingError); ok { + // If the error is an IntentMissingError, detect whether this is due + // to intent resolution and can be safely ignored. + ignoreMissing, err = ds.detectIntentMissingDueToIntentResolution(ctx, br.Txn) + if err != nil { + return nil, roachpb.NewError(err) + } + } + if !ignoreMissing { + // Wrap this in a MixedSuccessError, as we know that the EndTransaction + // batch succeeded. It is not possible for qiPErr to be a MixedSuccessError + // itself, so we don't need to handle that case like we do down below. + qiPErr.UpdateTxn(br.Txn) + maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx) + pErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr}) + pErr.Index = qiPErr.Index + return nil, pErr + } + // Populate the pre-commit QueryIntent batch response. If we made it + // here then we know we can ignore intent missing errors. + qiReply.reply = qiBa.CreateReply() + for _, ru := range qiReply.reply.Responses { + ru.GetQueryIntent().FoundIntent = true + } } // Both halves of the split batch succeeded. Piece them back together. @@ -935,6 +952,80 @@ func (ds *DistSender) divideAndSendParallelCommit( return br, nil } +// detectIntentMissingDueToIntentResolution attempts to detect whether a missing +// intent error thrown by a pre-commit QueryIntent request was due to intent +// resolution after the transaction was already finalized instead of due to a +// failure of the corresponding pipelined write. 
It is possible for these two +// situations to be confused because the pre-commit QueryIntent requests are +// issued in parallel with the staging EndTransaction request and may evaluate +// after the transaction becomes implicitly committed. If this happens and a +// concurrent transaction observes the implicit commit and makes the commit +// explicit, it is allowed to begin resolving the transaction's intents. +// +// MVCC values don't remember their transaction once they have been resolved. +// This loss of information means that QueryIntent returns an intent missing +// error if it finds the resolved value that corresponds to its desired intent. +// Because of this, the race discussed above can result in intent missing errors +// during a parallel commit even when the transaction successfully committed. +// +// This method queries the transaction record to determine whether an intent +// missing error was caused by this race or whether the intent missing error +// is real and guarantees that the transaction is not implicitly committed. +// +// See #37866 (issue) and #37900 (corresponding tla+ update). +func (ds *DistSender) detectIntentMissingDueToIntentResolution( + ctx context.Context, txn *roachpb.Transaction, +) (bool, error) { + ba := roachpb.BatchRequest{} + ba.Timestamp = ds.clock.Now() + ba.Add(&roachpb.QueryTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txn.TxnMeta.Key, + }, + Txn: txn.TxnMeta, + }) + log.VEvent(ctx, 1, "detecting whether missing intent is due to intent resolution") + br, pErr := ds.Send(ctx, ba) + if pErr != nil { + // We weren't able to determine whether the intent missing error is + // due to intent resolution or not, so it is still ambiguous whether + // the commit succeeded. 
+ return false, roachpb.NewAmbiguousResultError(fmt.Sprintf("error=%s [intent missing]", pErr)) + } + respTxn := &br.Responses[0].GetQueryTxn().QueriedTxn + switch respTxn.Status { + case roachpb.COMMITTED: + // The transaction has already been finalized as committed. The missing + // intent error must have been a result of a concurrent transaction + // recovery finding the transaction in the implicit commit state and + // resolving one of its intents before the pre-commit QueryIntent + // queried that intent. We know that the transaction was committed + // successfully, so ignore the error. + return true, nil + case roachpb.ABORTED: + // The transaction has either already been finalized as aborted or has + // been finalized as committed and already had its transaction record + // GCed. We can't distinguish between these two conditions with full + // certainty, so we're forced to return an ambiguous commit error. + // TODO(nvanbenschoten): QueryTxn will materialize an ABORTED transaction + // record if one does not already exist. If we are certain that no actor + // will ever persist an ABORTED transaction record after a COMMIT record is + // GCed and we returned whether the record was synthesized in the QueryTxn + // response then we could use the existence of an ABORTED transaction record + // to further isolate the ambiguity caused by the loss of information + // during intent resolution. If this error becomes a problem, we can explore + // this option. + return false, roachpb.NewAmbiguousResultError("intent missing and record aborted") + default: + // The transaction has not been finalized yet, so the missing intent + // error must have been caused by a real missing intent. Propagate the + // missing intent error. + // NB: we don't expect the record to be PENDING at this point, but it's + // not worth making any hard assertions about what we get back here. 
+ return false, nil + } +} + // maybeSwapErrorIndex swaps the error index from a to b or b to a if the // error's index is set and is equal to one of these to values. func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) { diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index d35b00c76ed2..b93d2d531c4d 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sort" "strconv" "sync/atomic" @@ -2191,7 +2192,8 @@ func TestMultiRangeWithEndTransaction(t *testing.T) { var testFn simpleSendFn = func( _ context.Context, _ SendOptions, - _ ReplicaSlice, ba roachpb.BatchRequest, + _ ReplicaSlice, + ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { var cur []roachpb.Method for _, union := range ba.Requests { @@ -2324,7 +2326,8 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { var testFn simpleSendFn = func( _ context.Context, _ SendOptions, - _ ReplicaSlice, ba roachpb.BatchRequest, + _ ReplicaSlice, + ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { var cur []roachpb.Method for _, union := range ba.Requests { @@ -2346,7 +2349,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { ds := NewDistSender(cfg, g) ds.DisableParallelBatches() - // Send a batch request containing two puts. + // Send a batch request containing the requests. var ba roachpb.BatchRequest ba.Txn = &roachpb.Transaction{Name: "test"} ba.Add(test.reqs...) @@ -2364,6 +2367,131 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { } } +// TestParallelCommitsDetectIntentMissingCause tests the functionality in +// DistSender.detectIntentMissingDueToIntentResolution. 
+func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + key := roachpb.Key("a") + txn := roachpb.MakeTransaction( + "test", key, roachpb.NormalUserPriority, + clock.Now(), clock.MaxOffset().Nanoseconds(), + ) + + testCases := []struct { + name string + queryTxnFn func() (roachpb.TransactionStatus, error) + expErr string + }{ + { + name: "transaction record PENDING, real intent missing error", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.PENDING, nil + }, + expErr: "the batch experienced mixed success and failure: intent missing", + }, + { + name: "transaction record STAGING, real intent missing error", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.STAGING, nil + }, + expErr: "the batch experienced mixed success and failure: intent missing", + }, + { + name: "transaction record COMMITTED, intent missing error caused by intent resolution", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.COMMITTED, nil + }, + }, + { + name: "transaction record ABORTED, ambiguous intent missing error", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return roachpb.ABORTED, nil + }, + expErr: "result is ambiguous (intent missing and record aborted)", + }, + { + name: "QueryTxn error, unresolved ambiguity", + queryTxnFn: func() (roachpb.TransactionStatus, error) { + return 0, errors.New("unable to query txn") + }, + expErr: "result is ambiguous (error=unable to query txn [intent missing])", + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + var testFn simpleSendFn = func( + _ context.Context, + _ SendOptions, + _ ReplicaSlice, + ba roachpb.BatchRequest, + ) (*roachpb.BatchResponse, 
error) { + br := ba.CreateReply() + switch ba.Requests[0].GetInner().Method() { + case roachpb.QueryIntent: + br.Error = roachpb.NewError(roachpb.NewIntentMissingError(key, nil)) + case roachpb.QueryTxn: + status, err := test.queryTxnFn() + if err != nil { + br.Error = roachpb.NewError(err) + } else { + respTxn := txn + respTxn.Status = status + br.Responses[0].GetQueryTxn().QueriedTxn = respTxn + } + case roachpb.EndTransaction: + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.STAGING + } + return br, nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(testFn), + }, + RangeDescriptorDB: defaultMockRangeDescriptorDB, + } + ds := NewDistSender(cfg, g) + + // Send a parallel commit batch request. + var ba roachpb.BatchRequest + ba.Txn = txn.Clone() + ba.Add(&roachpb.QueryIntentRequest{ + RequestHeader: roachpb.RequestHeader{Key: key}, + Txn: txn.TxnMeta, + ErrorIfMissing: true, + }) + ba.Add(&roachpb.EndTransactionRequest{ + RequestHeader: roachpb.RequestHeader{Key: key}, + Commit: true, + InFlightWrites: []roachpb.SequencedWrite{{Key: key, Sequence: 1}}, + }) + + // Verify that the response is expected. + _, pErr := ds.Send(context.Background(), ba) + if test.expErr == "" { + if pErr != nil { + t.Fatalf("unexpected error %v", pErr) + } + } else { + if !testutils.IsPError(pErr, regexp.QuoteMeta(test.expErr)) { + t.Fatalf("expected error %q; found %v", test.expErr, pErr) + } + } + }) + } +} + func TestCountRanges(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper()