From 77640e5c5094fb4c80ad964b835714a8b9dcebad Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 7 Mar 2023 18:32:07 -0500 Subject: [PATCH] kv: don't error on pushed intent during QueryIntent, increase write timestamp Fixes #95225. This commit updates QueryIntent's behavior when `ErrorIfMissing` to only return an error if no matching intent is found. Notably, the request no longer returns an error if a matching intent is found, but has been pushed. Previously, this case would result in a `RETRY_SERIALIZABLE` ("intent pushed") error, causing the querying transaction to immediately refresh and retry. At the time, we thought that this behavior was "an optimization" because it allowed a transaction to notice that it had been pushed and may abort earlier, instead of waiting until commit time. In practice, this does not seem to be a meaningful improvement. Meanwhile, it makes any solution to #95227 more difficult because it allows a stream of high-priority reading transaction to starve a low-priority writing transaction before that writing transaction even reaches its commit phase. We'd like to solve these starvation cases through some form of commit-time coordination (either blocking pushers temporarily or refreshing into the future). This change helps unlock those solutions. Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 1 + .../kvcoord/txn_interceptor_pipeliner.go | 13 ++- .../kvcoord/txn_interceptor_pipeliner_test.go | 9 +- pkg/kv/kvpb/api.proto | 38 ++++--- pkg/kv/kvserver/batcheval/cmd_query_intent.go | 32 +++--- .../batcheval/cmd_query_intent_test.go | 98 ++++++++++++------- pkg/kv/kvserver/replica_test.go | 41 ++++---- pkg/kv/kvserver/replica_tscache.go | 18 +--- .../kvserver/txn_recovery_integration_test.go | 4 +- pkg/kv/kvserver/txnrecovery/manager.go | 2 +- pkg/kv/kvserver/txnrecovery/manager_test.go | 8 +- 11 files changed, 148 insertions(+), 116 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index bfb43da94c55..72f5c18beb94 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1093,6 +1093,7 @@ func (ds *DistSender) divideAndSendParallelCommit( qiReply.reply = qiBa.CreateReply() for _, ru := range qiReply.reply.Responses { ru.GetQueryIntent().FoundIntent = true + ru.GetQueryIntent().FoundUnpushedIntent = true } } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index bec4f0c868ff..c4e5bdc94b29 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -702,12 +702,19 @@ func (tp *txnPipeliner) updateLockTrackingInner( // Remove any in-flight writes that were proven to exist. // It shouldn't be possible for a QueryIntentRequest with // the ErrorIfMissing option set to return without error - // and with with FoundIntent=false, but we handle that - // case here because it happens a lot in tests. - if resp.(*kvpb.QueryIntentResponse).FoundIntent { + // and with FoundIntent=false, but we handle that case here + // because it happens a lot in tests. + // TODO(nvanbenschoten): we only need to check FoundIntent, but this field + // was not set before v23.2, so for now, we check both fields. Remove this + // in the future. + qiResp := resp.(*kvpb.QueryIntentResponse) + if qiResp.FoundIntent || qiResp.FoundUnpushedIntent { tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence) // Move to lock footprint. tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key}) + } else { + log.Warningf(ctx, + "QueryIntent(ErrorIfMissing=true) found no intent, but did not error; resp=%+v", qiResp) } } else if kvpb.IsLocking(req) { // If the request intended to acquire locks, track its lock spans. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 2d2cbc4dcd60..1eb55e519b56 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -308,9 +308,16 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { br = ba.CreateReply() br.Txn = ba.Txn br.Txn.Status = roachpb.COMMITTED - br.Responses[1].GetQueryIntent().FoundIntent = true + // NOTE: expected response from a v23.1 node. + // TODO(nvanbenschoten): update this case when v23.1 compatibility is no + // longer required. + br.Responses[2].GetQueryIntent().FoundIntent = false + br.Responses[1].GetQueryIntent().FoundUnpushedIntent = true + // NOTE: expected responses from a v23.2 node. br.Responses[2].GetQueryIntent().FoundIntent = true + br.Responses[2].GetQueryIntent().FoundUnpushedIntent = true br.Responses[3].GetQueryIntent().FoundIntent = true + br.Responses[2].GetQueryIntent().FoundUnpushedIntent = false return br, nil }) diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 51c2588d8794..f502eca34249 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1290,20 +1290,22 @@ message QueryTxnResponse { // A QueryIntentRequest is arguments to the QueryIntent() method. It visits // the specified key and checks whether an intent is present for the given -// transaction. If the intent is found to be missing then it is prevented -// from ever being written in the future. +// transaction. If the intent is found, it checks whether the intent has a +// timestamp at or below the given transaction's timestamp to determine full +// vs. partial matches. If, on the other hand, the intent is found to be +// missing then it is prevented from ever being written in the future. message QueryIntentRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; // The TxnMeta that the intent is expected to have. Specifically, whether an // intent is a match or not is defined as whether an intent exists that could // be committed by the provided transaction. If an intent is found at the - // specified key, the intent is only considered a match if it has the same ID, - // the same epoch, and a write timestamp that is equal to or less than that in - // the provided transaction. + // specified key, the intent is only considered a "full match" if it has the + // same ID, the same epoch, and a write timestamp that is equal to or less + // than that in the provided transaction. // - // Additionally, the intent is only considered a match if its sequence number - // is equal to or greater than the expected txn's sequence number. The + // Additionally, the intent is only considered a "full match" if its sequence + // number is equal to or greater than the expected txn's sequence number. The // requests doesn't require an exact sequence number match because the // transaction could have performed overlapping writes, in which case only the // latest sequence number will remain. We assume that if a transaction has @@ -1311,6 +1313,13 @@ message QueryIntentRequest { // have succeeded in writing an intent at the smaller sequence number as // well. // + // The intent is considered to be "pushed" and a "partial match" if it + // satisfies all conditions above except the timestamp condition. In these + // cases, the intent can not be committed by the provided transaction at its + // current provisional commit timestamp, but it could be committed by that + // transaction at a later commit timestamp. The intent's value can also be + // read by the provided transaction, even though it has been pushed. + // // QueryIntentRequests may be issued in non-transactional BatchRequests or in // transactional BatchRequests. If issued inside of a transaction, the TxnMeta // must be a reference to the same transaction as the batch's transaction, or @@ -1323,17 +1332,22 @@ message QueryIntentRequest { // own intent after having successfully refreshed. storage.enginepb.TxnMeta txn = 2 [(gogoproto.nullable) = false]; - // If true, return an IntentMissingError if a matching intent is not found. - // Special-cased to return a SERIALIZABLE retry error if a SERIALIZABLE - // transaction queries its own intent and finds it has been pushed. + // If true, return an IntentMissingError if no matching intent (neither a + // "partial match" nor a "full match") is found. bool error_if_missing = 3; } // A QueryIntentResponse is the return value from the QueryIntent() method. message QueryIntentResponse { ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; - // Whether an intent matching the expected transaction was found at the key. - bool found_intent = 2; + // Whether an intent matching the expected transaction was found at the key + // with a timestamp that is equal to or less than that in the provided + // transaction - a "full match" as defined above. + bool found_unpushed_intent = 2; + // Whether an intent matching the expected transaction was found at the key, + // regardless of the intent's timestamp — a "partial match" as defined above. + // found_unpushed_intent implies found_intent. + bool found_intent = 3; } // A QueryLocksRequest is arguments to the QueryLocks() method. It requests diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent.go b/pkg/kv/kvserver/batcheval/cmd_query_intent.go index f9ff04f82057..2da4994ac937 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_intent.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent.go @@ -45,9 +45,7 @@ func declareKeysQueryIntent( // happens during the timestamp cache update). // // QueryIntent returns an error if the intent is missing and its ErrorIfMissing -// field is set to true. This error is typically an IntentMissingError, but the -// request is special-cased to return a SERIALIZABLE retry error if a transaction -// queries its own intent and finds it has been pushed. +// field is set to true. func QueryIntent( ctx context.Context, reader storage.Reader, cArgs CommandArgs, resp kvpb.Response, ) (result.Result, error) { @@ -81,8 +79,8 @@ func QueryIntent( return result.Result{}, err } - var curIntentPushed bool - + reply.FoundIntent = false + reply.FoundUnpushedIntent = false if intent != nil { // See comment on QueryIntentRequest.Txn for an explanation of this // comparison. @@ -93,12 +91,12 @@ func QueryIntent( (args.Txn.Epoch == intent.Txn.Epoch) && (args.Txn.Sequence <= intent.Txn.Sequence) - // If we found a matching intent, check whether the intent was pushed - // past its expected timestamp. if !reply.FoundIntent { log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v", args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence) } else { + // If we found a matching intent, check whether the intent was pushed past + // its expected timestamp. cmpTS := args.Txn.WriteTimestamp if ownTxn { // If the request is querying an intent for its own transaction, forward @@ -106,31 +104,25 @@ func QueryIntent( // in the batch header. cmpTS.Forward(h.Txn.WriteTimestamp) } - if cmpTS.Less(intent.Txn.WriteTimestamp) { - // The intent matched but was pushed to a later timestamp. Consider a - // pushed intent a missing intent. - curIntentPushed = true - log.VEventf(ctx, 2, "found pushed intent") - reply.FoundIntent = false + reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS) + if !reply.FoundUnpushedIntent { + log.VEventf(ctx, 2, "found pushed intent") // If the request was querying an intent in its own transaction, update // the response transaction. + // TODO(nvanbenschoten): if this is necessary for correctness, say so. + // And then add a test to demonstrate that. if ownTxn { reply.Txn = h.Txn.Clone() reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp) } } } + } else { + log.VEventf(ctx, 2, "found no intent") } if !reply.FoundIntent && args.ErrorIfMissing { - if ownTxn && curIntentPushed { - // If the transaction's own intent was pushed, go ahead and return a - // TransactionRetryError immediately with an updated transaction proto. - // This is an optimization that can help the txn use refresh spans more - // effectively. - return result.Result{}, kvpb.NewTransactionRetryError(kvpb.RETRY_SERIALIZABLE, "intent pushed") - } return result.Result{}, kvpb.NewIntentMissingError(args.Key, intent) } return result.Result{}, nil diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go b/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go index e9087cb7bb8c..4161f060de6a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go @@ -59,17 +59,22 @@ func TestQueryIntent(t *testing.T) { clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 10))) evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock} - // Since we can't move the intents clock after they are written, created - // cloned transactions with the clock shifted instead. + // Since we can't move the intents timestamp after they are written, created + // cloned transactions with the timestamp shifted instead. txABack := *txA.Clone() txAForward := *txA.Clone() txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0) txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0) - type Success struct{} - type NotFound struct{} - success := Success{} - notFound := NotFound{} + type response int + const ( + _ response = iota + expAssertionError + expIntentMissingError + expNotFound + expFoundIntent + expFoundUnpushedIntent + ) tests := []struct { name string @@ -77,41 +82,39 @@ func TestQueryIntent(t *testing.T) { argTransaction roachpb.Transaction key roachpb.Key errorFlag bool - response interface{} + resp response }{ // Perform standard reading of all three keys. - {"readA", txA, txA, keyA, true, success}, - {"readAA", txAA, txAA, keyAA, true, success}, - {"readB", txB, txB, keyB, true, success}, - - {"readC", txA, txA, keyC, true, &kvpb.IntentMissingError{}}, + {"readA", txA, txA, keyA, true, expFoundUnpushedIntent}, + {"readAA", txAA, txAA, keyAA, true, expFoundUnpushedIntent}, + {"readB", txB, txB, keyB, true, expFoundUnpushedIntent}, + {"readC", txA, txA, keyC, true, expIntentMissingError}, // This tries reading a different key than this tx was written with. The // returned error depends on the error flag setting. - {"wrongTxE", txA, txA, keyB, true, &kvpb.IntentMissingError{}}, - {"wrongTx", txA, txA, keyB, false, notFound}, + {"wrongTxErr", txA, txA, keyB, true, expIntentMissingError}, + {"wrongTx", txA, txA, keyB, false, expNotFound}, // This sets a mismatch for transactions in the header and the body. An // error is returned regardless of the errorFlag. - {"mismatchTxE", txA, txB, keyA, true, errors.AssertionFailedf("")}, - {"mismatchTx", txA, txB, keyA, false, errors.AssertionFailedf("")}, + {"mismatchTxErr", txA, txB, keyA, true, expAssertionError}, + {"mismatchTx", txA, txB, keyA, false, expAssertionError}, - // This simulates pushed intents by moving the tx clock backwards in time. - // An error is only returned if the error flag is set. - {"clockBackE", txABack, txABack, keyA, true, &kvpb.TransactionRetryError{}}, - {"clockBack", txABack, txABack, keyA, false, notFound}, + // This simulates pushed intents by moving the tx timestamp backwards in time. + // An error is not returned, regardless of the error flag. + {"timestampBackErr", txABack, txABack, keyA, true, expFoundIntent}, + {"timestampBack", txABack, txABack, keyA, false, expFoundIntent}, - // This simulates pushed transactions by moving the tx clock forward in time. - {"clockFwd", txAForward, txAForward, keyA, true, success}, + // This simulates pushed transactions by moving the tx timestamp forward in time. + // In two of the cases, the header timestamp leads the argument's timestamp. + {"timestampFwd", txAForward, txAForward, keyA, true, expFoundUnpushedIntent}, + {"timestampFwdHeaderAhead", txAForward, txA, keyA, true, expFoundUnpushedIntent}, + {"timestampEqualHeaderAhead", txA, txABack, keyA, true, expFoundUnpushedIntent}, // This simulates a mismatch in the header and arg write timestamps. This is // always an error regardless of flag. - {"mismatchTxClockE", txA, txAForward, keyA, true, errors.AssertionFailedf("")}, - {"mismatchTxClock", txA, txAForward, keyA, false, errors.AssertionFailedf("")}, - - // It is OK if the time on the arg transaction is moved backwards, its - // unclear if this happens in practice. - {"mismatchTxClock", txA, txABack, keyA, true, success}, + {"headerBehindErr", txA, txAForward, keyA, true, expAssertionError}, + {"headerBehind", txA, txAForward, keyA, false, expAssertionError}, } for _, test := range tests { @@ -127,16 +130,39 @@ func TestQueryIntent(t *testing.T) { cArgs.EvalCtx = evalCtx.EvalContext() var resp kvpb.QueryIntentResponse _, err := QueryIntent(ctx, db, cArgs, &resp) - switch test.response { - case success: - require.NoError(t, err) - require.True(t, resp.FoundIntent) - case notFound: - require.NoError(t, err) + switch test.resp { + case expAssertionError: + require.NotNil(t, err) + require.IsType(t, errors.AssertionFailedf(""), err) require.False(t, resp.FoundIntent) - default: - require.IsType(t, test.response, err, "received %v", err) + require.False(t, resp.FoundUnpushedIntent) + require.Nil(t, resp.Txn) + case expIntentMissingError: + require.NotNil(t, err) + require.IsType(t, &kvpb.IntentMissingError{}, err) require.False(t, resp.FoundIntent) + require.False(t, resp.FoundUnpushedIntent) + require.Nil(t, resp.Txn) + case expNotFound: + require.Nil(t, err) + require.False(t, resp.FoundIntent) + require.False(t, resp.FoundUnpushedIntent) + require.Nil(t, resp.Txn) + case expFoundIntent: + require.Nil(t, err) + require.True(t, resp.FoundIntent) + require.False(t, resp.FoundUnpushedIntent) + // If the intent was found but was pushed, the response also carries the + // updated write timestamp. + require.NotNil(t, resp.Txn) + require.True(t, test.hTransaction.WriteTimestamp.Less(resp.Txn.WriteTimestamp)) + case expFoundUnpushedIntent: + require.Nil(t, err) + require.True(t, resp.FoundIntent) + require.True(t, resp.FoundUnpushedIntent) + require.Nil(t, resp.Txn) + default: + t.Fatalf("unexpected response: %v", test.resp) } }) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 2e06a59b6591..bafb85649c70 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -6130,7 +6130,8 @@ func TestQueryIntentRequest(t *testing.T) { key []byte, txnMeta enginepb.TxnMeta, baTxn *roachpb.Transaction, - expectIntent bool, + expectMatchingTxn bool, + expectMatchingTxnAndTimestamp bool, ) { t.Helper() var h kvpb.Header @@ -6140,8 +6141,8 @@ func TestQueryIntentRequest(t *testing.T) { h.Timestamp = txnMeta.WriteTimestamp } qiArgs := queryIntentArgs(key, txnMeta, errIfMissing) - qiRes, pErr := tc.SendWrappedWith(h, &qiArgs) - if errIfMissing && !expectIntent { + res, pErr := tc.SendWrappedWith(h, &qiArgs) + if errIfMissing && !expectMatchingTxn { ownIntent := baTxn != nil if ownIntent && txnMeta.WriteTimestamp.Less(txn.WriteTimestamp) { if _, ok := pErr.GetDetail().(*kvpb.TransactionRetryError); !ok { @@ -6153,34 +6154,32 @@ func TestQueryIntentRequest(t *testing.T) { } } } else { - if pErr != nil { - t.Fatal(pErr) - } - if e, a := expectIntent, qiRes.(*kvpb.QueryIntentResponse).FoundIntent; e != a { - t.Fatalf("expected FoundIntent=%t but FoundIntent=%t", e, a) - } + require.Nil(t, pErr) + qiRes := res.(*kvpb.QueryIntentResponse) + require.Equal(t, expectMatchingTxn, qiRes.FoundIntent) + require.Equal(t, expectMatchingTxnAndTimestamp, qiRes.FoundUnpushedIntent) } } for i, baTxn := range []*roachpb.Transaction{nil, txn} { // Query the intent with the correct txn meta. Should see intent regardless // of whether we're inside the txn or not. - queryIntent(key1, txn.TxnMeta, baTxn, true) + queryIntent(key1, txn.TxnMeta, baTxn, true, true) // Query an intent on a different key for the same transaction. Should not // see an intent. keyPrevent := roachpb.Key(fmt.Sprintf("%s-%t-%d", key2, errIfMissing, i)) - queryIntent(keyPrevent, txn.TxnMeta, baTxn, false) + queryIntent(keyPrevent, txn.TxnMeta, baTxn, false, false) // Query the intent with a larger epoch. Should not see an intent. largerEpochMeta := txn.TxnMeta largerEpochMeta.Epoch++ - queryIntent(key1, largerEpochMeta, baTxn, false) + queryIntent(key1, largerEpochMeta, baTxn, false, false) // Query the intent with a smaller epoch. Should not see an intent. smallerEpochMeta := txn.TxnMeta smallerEpochMeta.Epoch-- - queryIntent(key1, smallerEpochMeta, baTxn, false) + queryIntent(key1, smallerEpochMeta, baTxn, false, false) // Query the intent with a larger timestamp. Should see an intent. // See the comment on QueryIntentRequest.Txn for an explanation of why @@ -6192,27 +6191,27 @@ func TestQueryIntentRequest(t *testing.T) { largerBATxn = largerBATxn.Clone() largerBATxn.WriteTimestamp = largerTSMeta.WriteTimestamp } - queryIntent(key1, largerTSMeta, largerBATxn, true) + queryIntent(key1, largerTSMeta, largerBATxn, true, true) - // Query the intent with a smaller timestamp. Should not see an - // intent unless we're querying our own intent, in which case - // the smaller timestamp will be forwarded to the batch header - // transaction's timestamp. + // Query the intent with a smaller timestamp. Should be considered a + // pushed intent unless we're querying our own intent, in which case the + // smaller timestamp will be forwarded to the batch header transaction's + // timestamp and the intent will be considered an unpushed intent. smallerTSMeta := txn.TxnMeta smallerTSMeta.WriteTimestamp = smallerTSMeta.WriteTimestamp.Prev() - queryIntent(key1, smallerTSMeta, baTxn, baTxn == txn) + queryIntent(key1, smallerTSMeta, baTxn, true, baTxn == txn) // Query the intent with a larger sequence number. Should not see an intent. largerSeqMeta := txn.TxnMeta largerSeqMeta.Sequence++ - queryIntent(key1, largerSeqMeta, baTxn, false) + queryIntent(key1, largerSeqMeta, baTxn, false, false) // Query the intent with a smaller sequence number. Should see an intent. // See the comment on QueryIntentRequest.Txn for an explanation of why // the request behaves like this. smallerSeqMeta := txn.TxnMeta smallerSeqMeta.Sequence-- - queryIntent(key1, smallerSeqMeta, baTxn, true) + queryIntent(key1, smallerSeqMeta, baTxn, true, true) // Perform a write at keyPrevent. The associated intent at this key // was queried and found to be missing, so this write should be diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index fb1157401589..7f755e3c11bd 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -263,23 +263,9 @@ func (r *Replica) updateTimestampCache( case *kvpb.QueryIntentRequest: missing := false if pErr != nil { - switch t := pErr.GetDetail().(type) { - case *kvpb.IntentMissingError: - missing = true - case *kvpb.TransactionRetryError: - // QueryIntent will return a TxnRetry(SERIALIZABLE) error - // if a transaction is querying its own intent and finds - // it pushed. - // - // NB: we check the index of the error above, so this - // TransactionRetryError should indicate a missing intent - // from the QueryIntent request. However, bumping the - // timestamp cache wouldn't cause a correctness issue - // if we found the intent. - missing = t.Reason == kvpb.RETRY_SERIALIZABLE - } + _, missing = pErr.GetDetail().(*kvpb.IntentMissingError) } else { - missing = !resp.(*kvpb.QueryIntentResponse).FoundIntent + missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent } if missing { // If the QueryIntent determined that the intent is missing diff --git a/pkg/kv/kvserver/txn_recovery_integration_test.go b/pkg/kv/kvserver/txn_recovery_integration_test.go index d9405442f827..ab4b25a05f8e 100644 --- a/pkg/kv/kvserver/txn_recovery_integration_test.go +++ b/pkg/kv/kvserver/txn_recovery_integration_test.go @@ -371,12 +371,12 @@ func TestTxnClearRangeIntents(t *testing.T) { queryIntent := queryIntentArgs(keyA, txn.TxnMeta, false) reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{}, &queryIntent) require.Nil(t, pErr, "error: %s", pErr) - require.True(t, reply.(*kvpb.QueryIntentResponse).FoundIntent, "intent missing for %q", keyA) + require.True(t, reply.(*kvpb.QueryIntentResponse).FoundUnpushedIntent, "intent missing for %q", keyA) queryIntent = queryIntentArgs(keyB, txn.TxnMeta, false) reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{}, &queryIntent) require.Nil(t, pErr, "error: %s", pErr) - require.True(t, reply.(*kvpb.QueryIntentResponse).FoundIntent, "intent missing for %q", keyB) + require.True(t, reply.(*kvpb.QueryIntentResponse).FoundUnpushedIntent, "intent missing for %q", keyB) // Call ClearRange covering key B and its intent. clearRange := clearRangeArgs(clearFrom, clearTo) diff --git a/pkg/kv/kvserver/txnrecovery/manager.go b/pkg/kv/kvserver/txnrecovery/manager.go index 3991c3d6be21..ea88e29f3a87 100644 --- a/pkg/kv/kvserver/txnrecovery/manager.go +++ b/pkg/kv/kvserver/txnrecovery/manager.go @@ -260,7 +260,7 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe( // any of the in-flight writes failed. for _, ru := range resps[1:] { queryIntentResp := ru.GetInner().(*kvpb.QueryIntentResponse) - if !queryIntentResp.FoundIntent { + if !queryIntentResp.FoundUnpushedIntent { return true /* preventedIntent */, nil, nil } } diff --git a/pkg/kv/kvserver/txnrecovery/manager_test.go b/pkg/kv/kvserver/txnrecovery/manager_test.go index 24e61eb32ceb..71f04ea6bbe8 100644 --- a/pkg/kv/kvserver/txnrecovery/manager_test.go +++ b/pkg/kv/kvserver/txnrecovery/manager_test.go @@ -107,8 +107,8 @@ func TestResolveIndeterminateCommit(t *testing.T) { br := ba.CreateReply() br.Responses[0].GetInner().(*kvpb.QueryTxnResponse).QueriedTxn = txn - br.Responses[1].GetInner().(*kvpb.QueryIntentResponse).FoundIntent = true - br.Responses[2].GetInner().(*kvpb.QueryIntentResponse).FoundIntent = !prevent + br.Responses[1].GetInner().(*kvpb.QueryIntentResponse).FoundUnpushedIntent = true + br.Responses[2].GetInner().(*kvpb.QueryIntentResponse).FoundUnpushedIntent = !prevent mockSender = kv.SenderFunc(func( _ context.Context, ba *kvpb.BatchRequest, @@ -288,8 +288,8 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) { } else { br.Responses[0].GetInner().(*kvpb.QueryTxnResponse).QueriedTxn = txn } - br.Responses[1].GetInner().(*kvpb.QueryIntentResponse).FoundIntent = true - br.Responses[2].GetInner().(*kvpb.QueryIntentResponse).FoundIntent = false + br.Responses[1].GetInner().(*kvpb.QueryIntentResponse).FoundUnpushedIntent = true + br.Responses[2].GetInner().(*kvpb.QueryIntentResponse).FoundUnpushedIntent = false mockSender = kv.SenderFunc(func( _ context.Context, ba *kvpb.BatchRequest,