diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 17c32ded28bb..1af4c75e4410 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -104,6 +104,7 @@ go_test( "cmd_export_test.go", "cmd_get_test.go", "cmd_lease_test.go", + "cmd_query_intent_test.go", "cmd_query_resolved_timestamp_test.go", "cmd_recover_txn_test.go", "cmd_refresh_range_bench_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent.go b/pkg/kv/kvserver/batcheval/cmd_query_intent.go index 80b42e05d55d..dcd47a1693d1 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_intent.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -74,24 +73,18 @@ func QueryIntent( h.Timestamp, args.Txn.WriteTimestamp) } - // Read at the specified key at the maximum timestamp. This ensures that we - // see an intent if one exists, regardless of what timestamp it is written - // at. - _, intent, err := storage.MVCCGet(ctx, reader, args.Key, hlc.MaxTimestamp, storage.MVCCGetOptions{ - // Perform an inconsistent read so that intents are returned instead of - // causing WriteIntentErrors. - Inconsistent: true, - // Even if the request header contains a txn, perform the engine lookup - // without a transaction so that intents for a matching transaction are - // not returned as values (i.e. we don't want to see our own writes). - Txn: nil, - }) + // Read from the lock table to see if an intent exists. + // Iterate over the lock key space with this key as a lower bound. + // With prefix set to true there should be at most one result. 
+ intentPtr, err := storage.GetIntent(reader, args.Key) if err != nil { return result.Result{}, err } var curIntentPushed bool - if intent != nil { + + if intentPtr != nil { + intent := *intentPtr // See comment on QueryIntentRequest.Txn for an explanation of this // comparison. // TODO(nvanbenschoten): Now that we have a full intent history, @@ -103,12 +96,15 @@ func QueryIntent( // If we found a matching intent, check whether the intent was pushed // past its expected timestamp. - if reply.FoundIntent { - // If the request is querying an intent for its own transaction, forward - // the timestamp we compare against to the provisional commit timestamp - // in the batch header. + if !reply.FoundIntent { + log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v", + args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence) + } else { cmpTS := args.Txn.WriteTimestamp if ownTxn { + // If the request is querying an intent for its own transaction, forward + // the timestamp we compare against to the provisional commit timestamp + // in the batch header. cmpTS.Forward(h.Txn.WriteTimestamp) } if cmpTS.Less(intent.Txn.WriteTimestamp) { @@ -122,7 +118,7 @@ func QueryIntent( // the response transaction. if ownTxn { reply.Txn = h.Txn.Clone() - reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp) + reply.Txn.WriteTimestamp.Forward((intent).Txn.WriteTimestamp) } } } @@ -130,13 +126,13 @@ func QueryIntent( if !reply.FoundIntent && args.ErrorIfMissing { if ownTxn && curIntentPushed { - // If the transaction's own intent was pushed, go ahead and - // return a TransactionRetryError immediately with an updated - // transaction proto. This is an optimization that can help - // the txn use refresh spans more effectively. + // If the transaction's own intent was pushed, go ahead and return a + // TransactionRetryError immediately with an updated transaction proto. 
+ // This is an optimization that can help the txn use refresh spans more + // effectively. return result.Result{}, roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "intent pushed") } - return result.Result{}, roachpb.NewIntentMissingError(args.Key, intent) + return result.Result{}, roachpb.NewIntentMissingError(args.Key, intentPtr) } return result.Result{}, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go b/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go new file mode 100644 index 000000000000..2d63e8b5ec07 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go @@ -0,0 +1,139 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package batcheval + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestQueryIntent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + db := storage.NewDefaultInMemForTesting() + defer db.Close() + + makeTS := func(ts int64) hlc.Timestamp { + return hlc.Timestamp{WallTime: ts} + } + + writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction { + txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1) + require.NoError(t, storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn)) + return txn + } + + // Write three keys in three separate transactions. + keyA := roachpb.Key("a") + keyAA := roachpb.Key("aa") + keyB := roachpb.Key("b") + + txA := writeIntent(keyA, 5) + txAA := writeIntent(keyAA, 6) + txB := writeIntent(keyB, 7) + + st := cluster.MakeTestingClusterSettings() + clock := hlc.NewClock(hlc.NewManualClock(10).UnixNano, time.Nanosecond) + evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock} + + // Since we can't move the intents' clock after they are written, create + // cloned transactions with the clock shifted instead. + txABack := *txA.Clone() + txAForward := *txA.Clone() + txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0) + txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0) + + type ResponseType int + const ( + Error ResponseType = iota + NotFound + Success + ) + + tests := []struct { + name string + hTransaction roachpb.Transaction + argTransaction roachpb.Transaction + key roachpb.Key + errorFlag bool + response ResponseType + }{ + // Perform standard reading of all three keys. 
+ {"readA", txA, txA, keyA, true, Success}, + {"readAA", txAA, txAA, keyAA, true, Success}, + {"readB", txB, txB, keyB, true, Success}, + + // This tries reading a different key than this tx was written with. Whether + // an error is returned depends on the error flag setting. + {"wrongTxE", txA, txA, keyB, true, Error}, + {"wrongTx", txA, txA, keyB, false, NotFound}, + + // This sets a mismatch for transactions in the header and the body. An + // error is returned regardless of the errorFlag. + {"mismatchTxE", txA, txB, keyA, true, Error}, + {"mismatchTx", txA, txB, keyA, false, Error}, + + // This simulates pushed intents by moving the tx clock backwards in time. + // An error is only returned if the error flag is set. + {"clockBackE", txABack, txABack, keyA, true, Error}, + {"clockBack", txABack, txABack, keyA, false, NotFound}, + + // This simulates pushed transactions by moving the tx clock forward in time. + {"clockFwd", txAForward, txAForward, keyA, true, Success}, + + // This simulates a mismatch in the header and arg write timestamps. This is + // always an error regardless of flag. + {"mismatchTxClockE", txA, txAForward, keyA, true, Error}, + {"mismatchTxClock", txA, txAForward, keyA, false, Error}, + + // It is OK if the time on the arg transaction is moved backwards; it's + // unclear if this happens in practice. 
+ {"mismatchTxClock", txA, txABack, keyA, true, Success}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cArgs := CommandArgs{ + Header: roachpb.Header{Timestamp: clock.Now(), Txn: &test.hTransaction}, + Args: &roachpb.QueryIntentRequest{ + RequestHeader: roachpb.RequestHeader{Key: test.key}, + Txn: test.argTransaction.TxnMeta, + ErrorIfMissing: test.errorFlag, + }, + } + cArgs.EvalCtx = evalCtx.EvalContext() + var resp roachpb.QueryIntentResponse + _, err := QueryIntent(ctx, db, cArgs, &resp) + switch test.response { + case Error: + require.Error(t, err) + require.False(t, resp.FoundIntent) + case NotFound: + require.NoError(t, err) + require.False(t, resp.FoundIntent) + case Success: + require.NoError(t, err) + require.True(t, resp.FoundIntent) + } + }) + } +} diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 82ea0ebe5183..d04053cba157 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -883,6 +883,64 @@ type EncryptionRegistries struct { KeyRegistry []byte } +// GetIntent will look up an intent given a key. If there is no intent for a +// key, it will return nil rather than an error. Errors are returned for problems +// at the storage layer, problems decoding the key, problems unmarshalling the +// intent, missing transaction on the intent or multiple intents for this key. 
+func GetIntent(reader Reader, key roachpb.Key) (*roachpb.Intent, error) { + // translate to a key in the lock space + lbKey, _ := keys.LockTableSingleKey(key, nil) + + iter := reader.NewEngineIterator(IterOptions{Prefix: true, LowerBound: lbKey}) + defer iter.Close() + + valid, err := iter.SeekEngineKeyGE(EngineKey{Key: lbKey}) + if err != nil { + return nil, err + } + if !valid { + return nil, nil + } + + engineKey, err := iter.EngineKey() + if err != nil { + return nil, err + } + checkKey, err := keys.DecodeLockTableSingleKey(engineKey.Key) + if err != nil { + return nil, err + } + if !checkKey.Equal(key) { + // This should not be possible, a key and using prefix match means that it + // must match. + return nil, errors.AssertionFailedf("key does not match expected %v != %v", checkKey, key) + } + var meta enginepb.MVCCMetadata + if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { + return nil, err + } + if meta.Txn == nil { + return nil, errors.AssertionFailedf("Txn is null for key %v, intent %v", key, meta) + } + intent := roachpb.MakeIntent(meta.Txn, key) + + hasNext, err := iter.NextEngineKey() + if err != nil { + // We expect false on the call to next, but not an error. + return nil, err + } + // This should not be possible. There can only be one outstanding write + // intent for a key and with prefix match we don't find additional names. + if hasNext { + engineKey, err := iter.EngineKey() + if err != nil { + return nil, err + } + return nil, errors.AssertionFailedf("unexpected additional key found %v while looking for %v", engineKey, key) + } + return &intent, nil +} + // Scan returns up to max key/value objects starting from start (inclusive) // and ending at end (non-inclusive). Specify max=0 for unbounded scans. 
Since // this code may use an intentInterleavingIter, the caller should not attempt diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index c3b873716205..72759400caf0 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -1599,6 +1599,49 @@ func TestFS(t *testing.T) { } } +func TestGetIntent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + reader, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer reader.Close() + + for _, keyName := range []string{"a", "aa", "b"} { + key := roachpb.Key(keyName) + err := MVCCPut(ctx, reader, nil, key, txn1.ReadTimestamp, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: key}, txn1) + require.NoError(t, err) + } + + tests := []struct { + name string + key string + err bool + found bool + }{ + {"found a", "a", false, true}, + {"found aa", "aa", false, true}, + {"found b", "b", false, true}, + {"not found", "c", false, false}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + intent, err := GetIntent(reader, roachpb.Key(test.key)) + if test.err { + require.Error(t, err) + } else { + require.NoError(t, err) + if test.found { + require.NotNil(t, intent) + } else { + require.Nil(t, intent) + } + } + }) + } +} + func TestScanIntents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)