Skip to content

Commit

Permalink
kv: only scan intents span for QueryIntent request
Browse files Browse the repository at this point in the history
Change QueryIntent to only read from the lock table.
Previously the request required merging the read from the MVCC
iterator with the lock table. This should improve performance.

Also, add unit testing for QueryIntent.

Fixes #69716

Release note: None
  • Loading branch information
andrewbaptist committed Jun 2, 2022
1 parent 3bdc450 commit 2ff8b5f
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ go_test(
"cmd_export_test.go",
"cmd_get_test.go",
"cmd_lease_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
"cmd_recover_txn_test.go",
"cmd_refresh_range_bench_test.go",
Expand Down
37 changes: 16 additions & 21 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -74,23 +73,16 @@ func QueryIntent(
h.Timestamp, args.Txn.WriteTimestamp)
}

// Read at the specified key at the maximum timestamp. This ensures that we
// see an intent if one exists, regardless of what timestamp it is written
// at.
_, intent, err := storage.MVCCGet(ctx, reader, args.Key, hlc.MaxTimestamp, storage.MVCCGetOptions{
// Perform an inconsistent read so that intents are returned instead of
// causing WriteIntentErrors.
Inconsistent: true,
// Even if the request header contains a txn, perform the engine lookup
// without a transaction so that intents for a matching transaction are
// not returned as values (i.e. we don't want to see our own writes).
Txn: nil,
})
// Read from the lock table to see if an intent exists.
// Iterate over the lock key space with this key as a lower bound.
// With prefix set to true there should be at most one result.
intent, err := storage.GetIntent(reader, args.Key)
if err != nil {
return result.Result{}, err
}

var curIntentPushed bool

if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
Expand All @@ -103,12 +95,15 @@ func QueryIntent(

// If we found a matching intent, check whether the intent was pushed
// past its expected timestamp.
if reply.FoundIntent {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
if cmpTS.Less(intent.Txn.WriteTimestamp) {
Expand All @@ -130,10 +125,10 @@ func QueryIntent(

if !reply.FoundIntent && args.ErrorIfMissing {
if ownTxn && curIntentPushed {
// If the transaction's own intent was pushed, go ahead and
// return a TransactionRetryError immediately with an updated
// transaction proto. This is an optimization that can help
// the txn use refresh spans more effectively.
// If the transaction's own intent was pushed, go ahead and return a
// TransactionRetryError immediately with an updated transaction proto.
// This is an optimization that can help the txn use refresh spans more
// effectively.
return result.Result{}, roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "intent pushed")
}
return result.Result{}, roachpb.NewIntentMissingError(args.Key, intent)
Expand Down
141 changes: 141 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestQueryIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

makeTS := func(ts int64) hlc.Timestamp {
return hlc.Timestamp{WallTime: ts}
}

writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction {
txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1)
require.NoError(t, storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn))
return txn
}

// Write three keys in three separate transactions.
keyA := roachpb.Key("a")
keyAA := roachpb.Key("aa")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")

txA := writeIntent(keyA, 5)
txAA := writeIntent(keyAA, 6)
txB := writeIntent(keyB, 7)

st := cluster.MakeTestingClusterSettings()
clock := hlc.NewClock(hlc.NewManualClock(10), time.Nanosecond)
evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock}

// Since we can't move the intents clock after they are written, created
// cloned transactions with the clock shifted instead.
txABack := *txA.Clone()
txAForward := *txA.Clone()
txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0)
txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0)

type Success struct{}
type NotFound struct{}
success := Success{}
notFound := NotFound{}

tests := []struct {
name string
hTransaction roachpb.Transaction
argTransaction roachpb.Transaction
key roachpb.Key
errorFlag bool
response interface{}
}{
// Perform standard reading of all three keys.
{"readA", txA, txA, keyA, true, success},
{"readAA", txAA, txAA, keyAA, true, success},
{"readB", txB, txB, keyB, true, success},

{"readC", txA, txA, keyC, true, &roachpb.IntentMissingError{}},

// This tries reading a different key than this tx was written with. The
// returned error depends on the error flag setting.
{"wrongTxE", txA, txA, keyB, true, &roachpb.IntentMissingError{}},
{"wrongTx", txA, txA, keyB, false, notFound},

// This sets a mismatch for transactions in the header and the body. An
// error is returned regardless of the errorFlag.
{"mismatchTxE", txA, txB, keyA, true, errors.AssertionFailedf("")},
{"mismatchTx", txA, txB, keyA, false, errors.AssertionFailedf("")},

// This simulates pushed intents by moving the tx clock backwards in time.
// An error is only returned if the error flag is set.
{"clockBackE", txABack, txABack, keyA, true, &roachpb.TransactionRetryError{}},
{"clockBack", txABack, txABack, keyA, false, notFound},

// This simulates pushed transactions by moving the tx clock forward in time.
{"clockFwd", txAForward, txAForward, keyA, true, success},

// This simulates a mismatch in the header and arg write timestamps. This is
// always an error regardless of flag.
{"mismatchTxClockE", txA, txAForward, keyA, true, errors.AssertionFailedf("")},
{"mismatchTxClock", txA, txAForward, keyA, false, errors.AssertionFailedf("")},

// It is OK if the time on the arg transaction is moved backwards, its
// unclear if this happens in practice.
{"mismatchTxClock", txA, txABack, keyA, true, success},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cArgs := CommandArgs{
Header: roachpb.Header{Timestamp: clock.Now(), Txn: &test.hTransaction},
Args: &roachpb.QueryIntentRequest{
RequestHeader: roachpb.RequestHeader{Key: test.key},
Txn: test.argTransaction.TxnMeta,
ErrorIfMissing: test.errorFlag,
},
}
cArgs.EvalCtx = evalCtx.EvalContext()
var resp roachpb.QueryIntentResponse
_, err := QueryIntent(ctx, db, cArgs, &resp)
switch test.response {
case success:
require.NoError(t, err)
require.True(t, resp.FoundIntent)
case notFound:
require.NoError(t, err)
require.False(t, resp.FoundIntent)
default:
require.IsType(t, test.response, err, "received %v", err)
require.False(t, resp.FoundIntent)
}
})
}
}
59 changes: 59 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,65 @@ type EncryptionRegistries struct {
KeyRegistry []byte
}

// GetIntent will look up an intent given a key. It there is no intent for a
// key, it will return nil rather than an error. Errors are returned for problem
// at the storage layer, problem decoding the key, problem unmarshalling the
// intent, missing transaction on the intent or multiple intents for this key.
func GetIntent(reader Reader, key roachpb.Key) (*roachpb.Intent, error) {
// Translate this key from a regular key to one in the lock space so it can be
// used for queries.
lbKey, _ := keys.LockTableSingleKey(key, nil)

iter := reader.NewEngineIterator(IterOptions{Prefix: true, LowerBound: lbKey})
defer iter.Close()

valid, err := iter.SeekEngineKeyGE(EngineKey{Key: lbKey})
if err != nil {
return nil, err
}
if !valid {
return nil, nil
}

engineKey, err := iter.EngineKey()
if err != nil {
return nil, err
}
checkKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
if err != nil {
return nil, err
}
if !checkKey.Equal(key) {
// This should not be possible, a key and using prefix match means that it
// must match.
return nil, errors.AssertionFailedf("key does not match expected %v != %v", checkKey, key)
}
var meta enginepb.MVCCMetadata
if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil {
return nil, err
}
if meta.Txn == nil {
return nil, errors.AssertionFailedf("txn is null for key %v, intent %v", key, meta)
}
intent := roachpb.MakeIntent(meta.Txn, key)

hasNext, err := iter.NextEngineKey()
if err != nil {
// We expect false on the call to next, but not an error.
return nil, err
}
// This should not be possible. There can only be one outstanding write
// intent for a key and with prefix match we don't find additional names.
if hasNext {
engineKey, err := iter.EngineKey()
if err != nil {
return nil, err
}
return nil, errors.AssertionFailedf("unexpected additional key found %v while looking for %v", engineKey, key)
}
return &intent, nil
}

// Scan returns up to max key/value objects starting from start (inclusive)
// and ending at end (non-inclusive). Specify max=0 for unbounded scans. Since
// this code may use an intentInterleavingIter, the caller should not attempt
Expand Down
59 changes: 59 additions & 0 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble"
Expand Down Expand Up @@ -1599,6 +1600,64 @@ func TestFS(t *testing.T) {
}
}

func TestGetIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
reader, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */))
require.NoError(t, err)
defer reader.Close()

txn1ID := uuid.MakeV4()
txn1TS := hlc.Timestamp{Logical: 1}
txn1 := &roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Key: roachpb.Key("a"), ID: txn1ID, Epoch: 1, WriteTimestamp: txn1TS, MinTimestamp: txn1TS, CoordinatorNodeID: 1}, ReadTimestamp: txn1TS}

for _, keyName := range []string{"a", "aa"} {
key := roachpb.Key(keyName)
err := MVCCPut(ctx, reader, nil, key, txn1.ReadTimestamp, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: key}, txn1)
require.NoError(t, err)
}

txn2ID := uuid.MakeV4()
txn2TS := hlc.Timestamp{Logical: 2}
txn2 := &roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Key: roachpb.Key("a"), ID: txn2ID, Epoch: 2, WriteTimestamp: txn2TS, MinTimestamp: txn2TS, CoordinatorNodeID: 2}, ReadTimestamp: txn2TS}

key := roachpb.Key("b")
err = MVCCPut(ctx, reader, nil, key, txn2.ReadTimestamp, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: key}, txn2)
require.NoError(t, err)

tests := []struct {
name string
key string
txn *roachpb.Transaction
err bool
found bool
}{
{"found a", "a", txn1, false, true},
{"found aa", "aa", txn1, false, true},
{"found b", "b", txn2, false, true},
{"not found", "c", nil, false, false},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
intent, err := GetIntent(reader, roachpb.Key(test.key))
if test.err {
require.Error(t, err)
} else {
require.NoError(t, err)
if test.found {
require.NotNil(t, intent)
require.Equal(t, roachpb.Key(test.key), intent.Key)
require.Equal(t, test.txn.TxnMeta, intent.Txn)
} else {
require.Nil(t, intent)
}
}
})
}
}

func TestScanIntents(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 2ff8b5f

Please sign in to comment.