Skip to content

Commit

Permalink
kv: only scan intents span for QueryIntent request
Browse files Browse the repository at this point in the history
Change QueryIntent to only read from the lock table.
Previously the request required merging the read from the MVCC
iterator with the lock table. This should improve performance.

Also, add unit testing for QueryIntent.

Fixes #69716

Release note: None
  • Loading branch information
andrewbaptist committed May 26, 2022
1 parent 5d67ba5 commit 80f1337
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ go_test(
"cmd_export_test.go",
"cmd_get_test.go",
"cmd_lease_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
"cmd_recover_txn_test.go",
"cmd_refresh_range_bench_test.go",
Expand Down
45 changes: 21 additions & 24 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -52,6 +51,7 @@ func declareKeysQueryIntent(
func QueryIntent(
ctx context.Context, reader storage.Reader, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {

args := cArgs.Args.(*roachpb.QueryIntentRequest)
h := cArgs.Header
reply := resp.(*roachpb.QueryIntentResponse)
Expand All @@ -74,24 +74,18 @@ func QueryIntent(
h.Timestamp, args.Txn.WriteTimestamp)
}

// Read at the specified key at the maximum timestamp. This ensures that we
// see an intent if one exists, regardless of what timestamp it is written
// at.
_, intent, err := storage.MVCCGet(ctx, reader, args.Key, hlc.MaxTimestamp, storage.MVCCGetOptions{
// Perform an inconsistent read so that intents are returned instead of
// causing WriteIntentErrors.
Inconsistent: true,
// Even if the request header contains a txn, perform the engine lookup
// without a transaction so that intents for a matching transaction are
// not returned as values (i.e. we don't want to see our own writes).
Txn: nil,
})
// Read from the lock table to see if an intent exists.
// Iterate over the lock key space with this key as a lower bound.
// With prefix set to true there should be at most one result.
intentPtr, err := storage.QueryIntent(reader, args.Key)
if err != nil {
return result.Result{}, err
}

var curIntentPushed bool
if intent != nil {

if intentPtr != nil {
intent := *intentPtr
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
// TODO(nvanbenschoten): Now that we have a full intent history,
Expand All @@ -103,12 +97,15 @@ func QueryIntent(

// If we found a matching intent, check whether the intent was pushed
// past its expected timestamp.
if reply.FoundIntent {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
if !reply.FoundIntent {
log.Infof(ctx, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
if cmpTS.Less(intent.Txn.WriteTimestamp) {
Expand All @@ -122,21 +119,21 @@ func QueryIntent(
// the response transaction.
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
reply.Txn.WriteTimestamp.Forward((intent).Txn.WriteTimestamp)
}
}
}
}

if !reply.FoundIntent && args.ErrorIfMissing {
if ownTxn && curIntentPushed {
// If the transaction's own intent was pushed, go ahead and
// return a TransactionRetryError immediately with an updated
// transaction proto. This is an optimization that can help
// the txn use refresh spans more effectively.
// If the transaction's own intent was pushed, go ahead and return a
// TransactionRetryError immediately with an updated transaction proto.
// This is an optimization that can help the txn use refresh spans more
// effectively.
return result.Result{}, roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "intent pushed")
}
return result.Result{}, roachpb.NewIntentMissingError(args.Key, intent)
return result.Result{}, roachpb.NewIntentMissingError(args.Key, intentPtr)
}
return result.Result{}, nil
}
123 changes: 123 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestQueryIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

makeTS := func(ts int64) hlc.Timestamp {
return hlc.Timestamp{WallTime: ts}
}

writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction {
txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1)
require.NoError(t, storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn))
return txn
}

// write three keys in three separate transactions
keyA := roachpb.Key("a")
keyAA := roachpb.Key("aa")
keyB := roachpb.Key("b")

txA := writeIntent(keyA, 5)
txAA := writeIntent(keyAA, 6)
txB := writeIntent(keyB, 7)

st := cluster.MakeTestingClusterSettings()
clock := hlc.NewClock(hlc.NewManualClock(10).UnixNano, time.Nanosecond)
evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock}

// created cloned transactions with the clock changed (since we can't move the
// intents clock)
txABack := *txA.Clone()
txAForward := *txA.Clone()
txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0)
txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0)

tests := []struct {
name string
hTransaction roachpb.Transaction
argTransaction roachpb.Transaction
key roachpb.Key
errorFlag bool
expectError bool
}{
// test standard reading of the three keys
{"readA", txA, txA, keyA, true, false},
{"readAA", txAA, txAA, keyAA, true, false},
{"readB", txB, txB, keyB, true, false},

// test reading a different key than this tx
// error depends on the error flag
{"wrongTx", txA, txA, keyB, true, true},
{"wrongTx", txA, txA, keyB, false, false},

// mismatch transactions in header and body
// error if the two tx don't match regardless of error flag
{"mismatchTx", txA, txB, keyA, true, true},
{"mismatchTx", txA, txB, keyA, false, true},

// simulate pushed intent by moving tx clock back 2
{"clockBack", txABack, txABack, keyA, true, true},
{"clockBack", txABack, txABack, keyA, false, false},

// simulate pushed transaction by moving tx clock forward 2
{"clockFwd", txAForward, txAForward, keyA, true, false},

// mismatch in the header and arg write timestamps - always an error regardless of flag
{"mismatchTxClock", txA, txAForward, keyA, true, true},
{"mismatchTxClock", txA, txAForward, keyA, false, true},

// ok if the arg clock is moved backwards
{"mismatchTxClock", txA, txABack, keyA, true, false},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cArgs := CommandArgs{
Header: roachpb.Header{Timestamp: clock.Now(), Txn: &test.hTransaction},
Args: &roachpb.QueryIntentRequest{
RequestHeader: roachpb.RequestHeader{Key: test.key},
Txn: test.argTransaction.TxnMeta,
ErrorIfMissing: test.errorFlag,
},
}
cArgs.EvalCtx = evalCtx.EvalContext()
var resp roachpb.QueryIntentResponse
_, err := QueryIntent(ctx, db, cArgs, &resp)
if test.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
57 changes: 57 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,63 @@ type EncryptionRegistries struct {
KeyRegistry []byte
}

// QueryIntent will look up an intent given a key. It there is no intent for a
// key, it will return nil rather than an error. Errors are returned for the
// following cases: problem at the storage layer unable to decode the key
// returned from the iterator unable to unmarshal the intent more than one entry
// for this key
func QueryIntent(reader Reader, key roachpb.Key) (*roachpb.Intent, error) {
// translate to a key in the lock space
var intent *roachpb.Intent = nil
lbKey, _ := keys.LockTableSingleKey(key, nil)

iter := reader.NewEngineIterator(IterOptions{Prefix: true, LowerBound: lbKey})
defer iter.Close()

valid, err := iter.SeekEngineKeyGE(EngineKey{Key: lbKey})
if err != nil {
return intent, err
}
if valid {
engineKey, err := iter.EngineKey()
if err != nil {
return intent, err
}
checkKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
if err != nil {
return intent, err
}
if !checkKey.Equal(key) {
// This should not be possible, a key and using prefix match means that it
// must match.
return intent, errors.AssertionFailedf("key does not match expected %v != %v", checkKey, key)
}
var meta enginepb.MVCCMetadata
if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil {
return intent, err
}
// TODO: Why can't this be inlined?
var x = roachpb.MakeIntent(meta.Txn, checkKey)
intent = &x

hasNext, err := iter.NextEngineKey()
if err != nil {
// we expect false, but not an error
return intent, err
}
// This should not be possible. There can only be one outstanding write
// intent for a key and with prefix match we don't find additional names.
if hasNext {
engineKey, err := iter.EngineKey()
if err != nil {
return intent, err
}
return intent, errors.AssertionFailedf("unexpected additional key found %v while looking for %v", engineKey, key)
}
}
return intent, nil
}

// Scan returns up to max key/value objects starting from start (inclusive)
// and ending at end (non-inclusive). Specify max=0 for unbounded scans. Since
// this code may use an intentInterleavingIter, the caller should not attempt
Expand Down

0 comments on commit 80f1337

Please sign in to comment.