From 2ff8b5f14f672fc8b4045cd8955fb99219135d0d Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Wed, 25 May 2022 11:56:31 -0400 Subject: [PATCH] kv: only scan intents span for QueryIntent request Change QueryIntent to only read from the lock table. Previously the request required merging the read from the MVCC iterator with the lock table. This should improve performance. Also, add unit testing for QueryIntent. Fixes #69716 Release note: None --- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_query_intent.go | 37 ++--- .../batcheval/cmd_query_intent_test.go | 141 ++++++++++++++++++ pkg/storage/engine.go | 59 ++++++++ pkg/storage/engine_test.go | 59 ++++++++ 5 files changed, 276 insertions(+), 21 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_query_intent_test.go diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 9d1a82dbb3d3..5232668a17c9 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -104,6 +104,7 @@ go_test( "cmd_export_test.go", "cmd_get_test.go", "cmd_lease_test.go", + "cmd_query_intent_test.go", "cmd_query_resolved_timestamp_test.go", "cmd_recover_txn_test.go", "cmd_refresh_range_bench_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent.go b/pkg/kv/kvserver/batcheval/cmd_query_intent.go index 80b42e05d55d..26debfe3d7e5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_intent.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -74,23 +73,16 @@ func QueryIntent( h.Timestamp, args.Txn.WriteTimestamp) } - // Read at the specified key at the maximum timestamp. This ensures that we - // see an intent if one exists, regardless of what timestamp it is written - // at. - _, intent, err := storage.MVCCGet(ctx, reader, args.Key, hlc.MaxTimestamp, storage.MVCCGetOptions{ - // Perform an inconsistent read so that intents are returned instead of - // causing WriteIntentErrors. - Inconsistent: true, - // Even if the request header contains a txn, perform the engine lookup - // without a transaction so that intents for a matching transaction are - // not returned as values (i.e. we don't want to see our own writes). - Txn: nil, - }) + // Read from the lock table to see if an intent exists. + // Iterate over the lock key space with this key as a lower bound. + // With prefix set to true there should be at most one result. + intent, err := storage.GetIntent(reader, args.Key) if err != nil { return result.Result{}, err } var curIntentPushed bool + if intent != nil { // See comment on QueryIntentRequest.Txn for an explanation of this // comparison. @@ -103,12 +95,15 @@ func QueryIntent( // If we found a matching intent, check whether the intent was pushed // past its expected timestamp. - if reply.FoundIntent { - // If the request is querying an intent for its own transaction, forward - // the timestamp we compare against to the provisional commit timestamp - // in the batch header. + if !reply.FoundIntent { + log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v", + args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence) + } else { cmpTS := args.Txn.WriteTimestamp if ownTxn { + // If the request is querying an intent for its own transaction, forward + // the timestamp we compare against to the provisional commit timestamp + // in the batch header. cmpTS.Forward(h.Txn.WriteTimestamp) } if cmpTS.Less(intent.Txn.WriteTimestamp) { @@ -130,10 +125,10 @@ func QueryIntent( if !reply.FoundIntent && args.ErrorIfMissing { if ownTxn && curIntentPushed { - // If the transaction's own intent was pushed, go ahead and - // return a TransactionRetryError immediately with an updated - // transaction proto. This is an optimization that can help - // the txn use refresh spans more effectively. + // If the transaction's own intent was pushed, go ahead and return a + // TransactionRetryError immediately with an updated transaction proto. + // This is an optimization that can help the txn use refresh spans more + // effectively. return result.Result{}, roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "intent pushed") } return result.Result{}, roachpb.NewIntentMissingError(args.Key, intent) diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go b/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go new file mode 100644 index 000000000000..1b0e82010886 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent_test.go @@ -0,0 +1,141 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestQueryIntent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + db := storage.NewDefaultInMemForTesting() + defer db.Close() + + makeTS := func(ts int64) hlc.Timestamp { + return hlc.Timestamp{WallTime: ts} + } + + writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction { + txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1) + require.NoError(t, storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn)) + return txn + } + + // Write three keys in three separate transactions. + keyA := roachpb.Key("a") + keyAA := roachpb.Key("aa") + keyB := roachpb.Key("b") + keyC := roachpb.Key("c") + + txA := writeIntent(keyA, 5) + txAA := writeIntent(keyAA, 6) + txB := writeIntent(keyB, 7) + + st := cluster.MakeTestingClusterSettings() + clock := hlc.NewClock(hlc.NewManualClock(10), time.Nanosecond) + evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock} + + // Since we can't move the intents clock after they are written, created + // cloned transactions with the clock shifted instead. + txABack := *txA.Clone() + txAForward := *txA.Clone() + txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0) + txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0) + + type Success struct{} + type NotFound struct{} + success := Success{} + notFound := NotFound{} + + tests := []struct { + name string + hTransaction roachpb.Transaction + argTransaction roachpb.Transaction + key roachpb.Key + errorFlag bool + response interface{} + }{ + // Perform standard reading of all three keys. + {"readA", txA, txA, keyA, true, success}, + {"readAA", txAA, txAA, keyAA, true, success}, + {"readB", txB, txB, keyB, true, success}, + + {"readC", txA, txA, keyC, true, &roachpb.IntentMissingError{}}, + + // This tries reading a different key than this tx was written with. The + // returned error depends on the error flag setting. + {"wrongTxE", txA, txA, keyB, true, &roachpb.IntentMissingError{}}, + {"wrongTx", txA, txA, keyB, false, notFound}, + + // This sets a mismatch for transactions in the header and the body. An + // error is returned regardless of the errorFlag. + {"mismatchTxE", txA, txB, keyA, true, errors.AssertionFailedf("")}, + {"mismatchTx", txA, txB, keyA, false, errors.AssertionFailedf("")}, + + // This simulates pushed intents by moving the tx clock backwards in time. + // An error is only returned if the error flag is set. + {"clockBackE", txABack, txABack, keyA, true, &roachpb.TransactionRetryError{}}, + {"clockBack", txABack, txABack, keyA, false, notFound}, + + // This simulates pushed transactions by moving the tx clock forward in time. + {"clockFwd", txAForward, txAForward, keyA, true, success}, + + // This simulates a mismatch in the header and arg write timestamps. This is + // always an error regardless of flag. + {"mismatchTxClockE", txA, txAForward, keyA, true, errors.AssertionFailedf("")}, + {"mismatchTxClock", txA, txAForward, keyA, false, errors.AssertionFailedf("")}, + + // It is OK if the time on the arg transaction is moved backwards, its + // unclear if this happens in practice. + {"mismatchTxClock", txA, txABack, keyA, true, success}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cArgs := CommandArgs{ + Header: roachpb.Header{Timestamp: clock.Now(), Txn: &test.hTransaction}, + Args: &roachpb.QueryIntentRequest{ + RequestHeader: roachpb.RequestHeader{Key: test.key}, + Txn: test.argTransaction.TxnMeta, + ErrorIfMissing: test.errorFlag, + }, + } + cArgs.EvalCtx = evalCtx.EvalContext() + var resp roachpb.QueryIntentResponse + _, err := QueryIntent(ctx, db, cArgs, &resp) + switch test.response { + case success: + require.NoError(t, err) + require.True(t, resp.FoundIntent) + case notFound: + require.NoError(t, err) + require.False(t, resp.FoundIntent) + default: + require.IsType(t, test.response, err, "received %v", err) + require.False(t, resp.FoundIntent) + } + }) + } +} diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 82ea0ebe5183..eb57b011915c 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -883,6 +883,65 @@ type EncryptionRegistries struct { KeyRegistry []byte } +// GetIntent will look up an intent given a key. It there is no intent for a +// key, it will return nil rather than an error. Errors are returned for problem +// at the storage layer, problem decoding the key, problem unmarshalling the +// intent, missing transaction on the intent or multiple intents for this key. +func GetIntent(reader Reader, key roachpb.Key) (*roachpb.Intent, error) { + // Translate this key from a regular key to one in the lock space so it can be + // used for queries. + lbKey, _ := keys.LockTableSingleKey(key, nil) + + iter := reader.NewEngineIterator(IterOptions{Prefix: true, LowerBound: lbKey}) + defer iter.Close() + + valid, err := iter.SeekEngineKeyGE(EngineKey{Key: lbKey}) + if err != nil { + return nil, err + } + if !valid { + return nil, nil + } + + engineKey, err := iter.EngineKey() + if err != nil { + return nil, err + } + checkKey, err := keys.DecodeLockTableSingleKey(engineKey.Key) + if err != nil { + return nil, err + } + if !checkKey.Equal(key) { + // This should not be possible, a key and using prefix match means that it + // must match. + return nil, errors.AssertionFailedf("key does not match expected %v != %v", checkKey, key) + } + var meta enginepb.MVCCMetadata + if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { + return nil, err + } + if meta.Txn == nil { + return nil, errors.AssertionFailedf("txn is null for key %v, intent %v", key, meta) + } + intent := roachpb.MakeIntent(meta.Txn, key) + + hasNext, err := iter.NextEngineKey() + if err != nil { + // We expect false on the call to next, but not an error. + return nil, err + } + // This should not be possible. There can only be one outstanding write + // intent for a key and with prefix match we don't find additional names. + if hasNext { + engineKey, err := iter.EngineKey() + if err != nil { + return nil, err + } + return nil, errors.AssertionFailedf("unexpected additional key found %v while looking for %v", engineKey, key) + } + return &intent, nil +} + // Scan returns up to max key/value objects starting from start (inclusive) // and ending at end (non-inclusive). Specify max=0 for unbounded scans. Since // this code may use an intentInterleavingIter, the caller should not attempt diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index c3b873716205..70ce7a2f9e76 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble" @@ -1599,6 +1600,64 @@ func TestFS(t *testing.T) { } } +func TestGetIntent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + reader, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer reader.Close() + + txn1ID := uuid.MakeV4() + txn1TS := hlc.Timestamp{Logical: 1} + txn1 := &roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Key: roachpb.Key("a"), ID: txn1ID, Epoch: 1, WriteTimestamp: txn1TS, MinTimestamp: txn1TS, CoordinatorNodeID: 1}, ReadTimestamp: txn1TS} + + for _, keyName := range []string{"a", "aa"} { + key := roachpb.Key(keyName) + err := MVCCPut(ctx, reader, nil, key, txn1.ReadTimestamp, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: key}, txn1) + require.NoError(t, err) + } + + txn2ID := uuid.MakeV4() + txn2TS := hlc.Timestamp{Logical: 2} + txn2 := &roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Key: roachpb.Key("a"), ID: txn2ID, Epoch: 2, WriteTimestamp: txn2TS, MinTimestamp: txn2TS, CoordinatorNodeID: 2}, ReadTimestamp: txn2TS} + + key := roachpb.Key("b") + err = MVCCPut(ctx, reader, nil, key, txn2.ReadTimestamp, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: key}, txn2) + require.NoError(t, err) + + tests := []struct { + name string + key string + txn *roachpb.Transaction + err bool + found bool + }{ + {"found a", "a", txn1, false, true}, + {"found aa", "aa", txn1, false, true}, + {"found b", "b", txn2, false, true}, + {"not found", "c", nil, false, false}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + intent, err := GetIntent(reader, roachpb.Key(test.key)) + if test.err { + require.Error(t, err) + } else { + require.NoError(t, err) + if test.found { + require.NotNil(t, intent) + require.Equal(t, roachpb.Key(test.key), intent.Key) + require.Equal(t, test.txn.TxnMeta, intent.Txn) + } else { + require.Nil(t, intent) + } + } + }) + } +} + func TestScanIntents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)