Skip to content

Commit

Permalink
kv: only scan intents span for QueryIntent request
Browse files Browse the repository at this point in the history
Change QueryIntent to only read from the lock table.
Previously the request required merging the read from the MVCC
iterator with the lock table. This should improve performance.

Also, add unit testing for QueryIntent.

Fixes #69716

Release note: None
  • Loading branch information
andrewbaptist committed May 26, 2022
1 parent 5d67ba5 commit c07169f
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ go_test(
"cmd_export_test.go",
"cmd_get_test.go",
"cmd_lease_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
"cmd_recover_txn_test.go",
"cmd_refresh_range_bench_test.go",
Expand Down
44 changes: 20 additions & 24 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -74,24 +73,18 @@ func QueryIntent(
h.Timestamp, args.Txn.WriteTimestamp)
}

// Read at the specified key at the maximum timestamp. This ensures that we
// see an intent if one exists, regardless of what timestamp it is written
// at.
_, intent, err := storage.MVCCGet(ctx, reader, args.Key, hlc.MaxTimestamp, storage.MVCCGetOptions{
// Perform an inconsistent read so that intents are returned instead of
// causing WriteIntentErrors.
Inconsistent: true,
// Even if the request header contains a txn, perform the engine lookup
// without a transaction so that intents for a matching transaction are
// not returned as values (i.e. we don't want to see our own writes).
Txn: nil,
})
// Read from the lock table to see if an intent exists.
// Iterate over the lock key space with this key as a lower bound.
// With prefix set to true there should be at most one result.
intentPtr, err := storage.GetIntent(reader, args.Key)
if err != nil {
return result.Result{}, err
}

var curIntentPushed bool
if intent != nil {

if intentPtr != nil {
intent := *intentPtr
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
// TODO(nvanbenschoten): Now that we have a full intent history,
Expand All @@ -103,12 +96,15 @@ func QueryIntent(

// If we found a matching intent, check whether the intent was pushed
// past its expected timestamp.
if reply.FoundIntent {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
if cmpTS.Less(intent.Txn.WriteTimestamp) {
Expand All @@ -122,21 +118,21 @@ func QueryIntent(
// the response transaction.
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
reply.Txn.WriteTimestamp.Forward((intent).Txn.WriteTimestamp)
}
}
}
}

if !reply.FoundIntent && args.ErrorIfMissing {
if ownTxn && curIntentPushed {
// If the transaction's own intent was pushed, go ahead and
// return a TransactionRetryError immediately with an updated
// transaction proto. This is an optimization that can help
// the txn use refresh spans more effectively.
// If the transaction's own intent was pushed, go ahead and return a
// TransactionRetryError immediately with an updated transaction proto.
// This is an optimization that can help the txn use refresh spans more
// effectively.
return result.Result{}, roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "intent pushed")
}
return result.Result{}, roachpb.NewIntentMissingError(args.Key, intent)
return result.Result{}, roachpb.NewIntentMissingError(args.Key, intentPtr)
}
return result.Result{}, nil
}
139 changes: 139 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestQueryIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

makeTS := func(ts int64) hlc.Timestamp {
return hlc.Timestamp{WallTime: ts}
}

writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction {
txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1)
require.NoError(t, storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn))
return txn
}

// Write three keys in three separate transactions.
keyA := roachpb.Key("a")
keyAA := roachpb.Key("aa")
keyB := roachpb.Key("b")

txA := writeIntent(keyA, 5)
txAA := writeIntent(keyAA, 6)
txB := writeIntent(keyB, 7)

st := cluster.MakeTestingClusterSettings()
clock := hlc.NewClock(hlc.NewManualClock(10).UnixNano, time.Nanosecond)
evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock}

// Since we can't move the intents clock after they are written, created
// cloned transactions with the clock shifted instead.
txABack := *txA.Clone()
txAForward := *txA.Clone()
txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0)
txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0)

type ResponseType int
const (
Error ResponseType = iota
NotFound
Success
)

tests := []struct {
name string
hTransaction roachpb.Transaction
argTransaction roachpb.Transaction
key roachpb.Key
errorFlag bool
response ResponseType
}{
// Perform standard reading of all three keys.
{"readA", txA, txA, keyA, true, Success},
{"readAA", txAA, txAA, keyAA, true, Success},
{"readB", txB, txB, keyB, true, Success},

// This tries reading a different key than this tx was written with. The
// returned error depends on the error flag setting.
{"wrongTxE", txA, txA, keyB, true, Error},
{"wrongTx", txA, txA, keyB, false, NotFound},

// This sets a mismatch for transactions in the header and the body. An
// error is returned regardless of the errorFlag.
{"mismatchTxE", txA, txB, keyA, true, Error},
{"mismatchTx", txA, txB, keyA, false, Error},

// This simulates pushed intents by moving the tx clock backwards in time.
// An error is only returned if the error flag is set.
{"clockBackE", txABack, txABack, keyA, true, Error},
{"clockBack", txABack, txABack, keyA, false, NotFound},

// This simulates pushed transactions by moving the tx clock forward in time.
{"clockFwd", txAForward, txAForward, keyA, true, Success},

// This simulates a mismatch in the header and arg write timestamps. This is
// always an error regardless of flag.
{"mismatchTxClockE", txA, txAForward, keyA, true, Error},
{"mismatchTxClock", txA, txAForward, keyA, false, Error},

// It is OK if the time on the arg transaction is moved backwards, its
// unclear if this happens in practice.
{"mismatchTxClock", txA, txABack, keyA, true, Success},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cArgs := CommandArgs{
Header: roachpb.Header{Timestamp: clock.Now(), Txn: &test.hTransaction},
Args: &roachpb.QueryIntentRequest{
RequestHeader: roachpb.RequestHeader{Key: test.key},
Txn: test.argTransaction.TxnMeta,
ErrorIfMissing: test.errorFlag,
},
}
cArgs.EvalCtx = evalCtx.EvalContext()
var resp roachpb.QueryIntentResponse
_, err := QueryIntent(ctx, db, cArgs, &resp)
switch test.response {
case Error:
require.Error(t, err)
require.False(t, resp.FoundIntent)
case NotFound:
require.NoError(t, err)
require.False(t, resp.FoundIntent)
case Success:
require.NoError(t, err)
require.True(t, resp.FoundIntent)
}
})
}
}
58 changes: 58 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,64 @@ type EncryptionRegistries struct {
KeyRegistry []byte
}

// GetIntent will look up an intent given a key. It there is no intent for a
// key, it will return nil rather than an error. Errors are returned for problem
// at the storage layer, problem decoding the key, problem unmarshalling the
// intent, missing transaction on the intent or multiple intents for this key.
func GetIntent(reader Reader, key roachpb.Key) (*roachpb.Intent, error) {
// translate to a key in the lock space
lbKey, _ := keys.LockTableSingleKey(key, nil)

iter := reader.NewEngineIterator(IterOptions{Prefix: true, LowerBound: lbKey})
defer iter.Close()

valid, err := iter.SeekEngineKeyGE(EngineKey{Key: lbKey})
if err != nil {
return nil, err
}
if !valid {
return nil, nil
}

engineKey, err := iter.EngineKey()
if err != nil {
return nil, err
}
checkKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
if err != nil {
return nil, err
}
if !checkKey.Equal(key) {
// This should not be possible, a key and using prefix match means that it
// must match.
return nil, errors.AssertionFailedf("key does not match expected %v != %v", checkKey, key)
}
var meta enginepb.MVCCMetadata
if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil {
return nil, err
}
if meta.Txn == nil {
return nil, errors.AssertionFailedf("Txn is null for key, intent", key, meta)
}
intent := roachpb.MakeIntent(meta.Txn, key)

hasNext, err := iter.NextEngineKey()
if err != nil {
// We expect false on the call to next, but not an error.
return nil, err
}
// This should not be possible. There can only be one outstanding write
// intent for a key and with prefix match we don't find additional names.
if hasNext {
engineKey, err := iter.EngineKey()
if err != nil {
return nil, err
}
return nil, errors.AssertionFailedf("unexpected additional key found %v while looking for %v", engineKey, key)
}
return &intent, nil
}

// Scan returns up to max key/value objects starting from start (inclusive)
// and ending at end (non-inclusive). Specify max=0 for unbounded scans. Since
// this code may use an intentInterleavingIter, the caller should not attempt
Expand Down
43 changes: 43 additions & 0 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,49 @@ func TestFS(t *testing.T) {
}
}

func TestGetIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
reader, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */))
require.NoError(t, err)
defer reader.Close()

for _, keyName := range []string{"a", "aa", "b"} {
key := roachpb.Key(keyName)
err := MVCCPut(ctx, reader, nil, key, txn1.ReadTimestamp, hlc.ClockTimestamp{}, roachpb.Value{RawBytes: key}, txn1)
require.NoError(t, err)
}

tests := []struct {
name string
key string
err bool
found bool
}{
{"found a", "a", false, true},
{"found aa", "aa", false, true},
{"found b", "b", false, true},
{"not found", "c", false, false},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
intent, err := GetIntent(reader, roachpb.Key(test.key))
if test.err {
require.Error(t, err)
} else {
require.NoError(t, err)
if test.found {
require.NotNil(t, intent)
} else {
require.Nil(t, intent)
}
}
})
}
}

func TestScanIntents(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit c07169f

Please sign in to comment.