Skip to content

Commit

Permalink
kv: don't error on pushed intent during QueryIntent, increase write t…
Browse files Browse the repository at this point in the history
…imestamp

Fixes #95225.

This commit updates QueryIntent's behavior when `ErrorIfMissing` is set to only return
an error if no matching intent is found. Notably, the request no longer returns
an error if a matching intent is found, but has been pushed.

Previously, this case would result in a `RETRY_SERIALIZABLE` ("intent pushed")
error, causing the querying transaction to immediately refresh and retry. At the
time, we thought that this behavior was "an optimization" because it allowed a
transaction to notice that it had been pushed and may abort earlier, instead of
waiting until commit time. In practice, this does not seem to be a meaningful
improvement. Meanwhile, it makes any solution to #95227 more difficult because
it allows a stream of high-priority reading transactions to starve a low-priority
writing transaction before that writing transaction even reaches its commit
phase. We'd like to solve these starvation cases through some form of
commit-time coordination (either blocking pushers temporarily or refreshing into
the future). This change helps unlock those solutions.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 20, 2023
1 parent 50bc51e commit 77640e5
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 116 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
qiReply.reply = qiBa.CreateReply()
for _, ru := range qiReply.reply.Responses {
ru.GetQueryIntent().FoundIntent = true
ru.GetQueryIntent().FoundUnpushedIntent = true
}
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,12 +702,19 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// Remove any in-flight writes that were proven to exist.
// It shouldn't be possible for a QueryIntentRequest with
// the ErrorIfMissing option set to return without error
// and with with FoundIntent=false, but we handle that
// case here because it happens a lot in tests.
if resp.(*kvpb.QueryIntentResponse).FoundIntent {
// and with FoundIntent=false, but we handle that case here
// because it happens a lot in tests.
// TODO(nvanbenschoten): we only need to check FoundIntent, but this field
// was not set before v23.2, so for now, we check both fields. Remove this
// in the future.
qiResp := resp.(*kvpb.QueryIntentResponse)
if qiResp.FoundIntent || qiResp.FoundUnpushedIntent {
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence)
// Move to lock footprint.
tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key})
} else {
log.Warningf(ctx,
"QueryIntent(ErrorIfMissing=true) found no intent, but did not error; resp=%+v", qiResp)
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,16 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
br.Responses[1].GetQueryIntent().FoundIntent = true
// NOTE: expected response from a v23.1 node.
// TODO(nvanbenschoten): update this case when v23.1 compatibility is no
// longer required.
br.Responses[2].GetQueryIntent().FoundIntent = false
br.Responses[1].GetQueryIntent().FoundUnpushedIntent = true
// NOTE: expected responses from a v23.2 node.
br.Responses[2].GetQueryIntent().FoundIntent = true
br.Responses[2].GetQueryIntent().FoundUnpushedIntent = true
br.Responses[3].GetQueryIntent().FoundIntent = true
br.Responses[2].GetQueryIntent().FoundUnpushedIntent = false
return br, nil
})

Expand Down
38 changes: 26 additions & 12 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1290,27 +1290,36 @@ message QueryTxnResponse {

// A QueryIntentRequest is arguments to the QueryIntent() method. It visits
// the specified key and checks whether an intent is present for the given
// transaction. If the intent is found to be missing then it is prevented
// from ever being written in the future.
// transaction. If the intent is found, it checks whether the intent has a
// timestamp at or below the given transaction's timestamp to determine full
// vs. partial matches. If, on the other hand, the intent is found to be
// missing then it is prevented from ever being written in the future.
message QueryIntentRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// The TxnMeta that the intent is expected to have. Specifically, whether an
// intent is a match or not is defined as whether an intent exists that could
// be committed by the provided transaction. If an intent is found at the
// specified key, the intent is only considered a match if it has the same ID,
// the same epoch, and a write timestamp that is equal to or less than that in
// the provided transaction.
// specified key, the intent is only considered a "full match" if it has the
// same ID, the same epoch, and a write timestamp that is equal to or less
// than that in the provided transaction.
//
// Additionally, the intent is only considered a match if its sequence number
// is equal to or greater than the expected txn's sequence number. The
// Additionally, the intent is only considered a "full match" if its sequence
// number is equal to or greater than the expected txn's sequence number. The
// request doesn't require an exact sequence number match because the
// transaction could have performed overlapping writes, in which case only the
// latest sequence number will remain. We assume that if a transaction has
// successfully written an intent at a larger sequence number then it must
// have succeeded in writing an intent at the smaller sequence number as
// well.
//
// The intent is considered to be "pushed" and a "partial match" if it
// satisfies all conditions above except the timestamp condition. In these
// cases, the intent can not be committed by the provided transaction at its
// current provisional commit timestamp, but it could be committed by that
// transaction at a later commit timestamp. The intent's value can also be
// read by the provided transaction, even though it has been pushed.
//
// QueryIntentRequests may be issued in non-transactional BatchRequests or in
// transactional BatchRequests. If issued inside of a transaction, the TxnMeta
// must be a reference to the same transaction as the batch's transaction, or
Expand All @@ -1323,17 +1332,22 @@ message QueryIntentRequest {
// own intent after having successfully refreshed.
storage.enginepb.TxnMeta txn = 2 [(gogoproto.nullable) = false];

// If true, return an IntentMissingError if a matching intent is not found.
// Special-cased to return a SERIALIZABLE retry error if a SERIALIZABLE
// transaction queries its own intent and finds it has been pushed.
// If true, return an IntentMissingError if no matching intent (neither a
// "partial match" nor a "full match") is found.
bool error_if_missing = 3;
}

// A QueryIntentResponse is the return value from the QueryIntent() method.
message QueryIntentResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// Whether an intent matching the expected transaction was found at the key.
bool found_intent = 2;
// Whether an intent matching the expected transaction was found at the key
// with a timestamp that is equal to or less than that in the provided
// transaction - a "full match" as defined above.
bool found_unpushed_intent = 2;
// Whether an intent matching the expected transaction was found at the key,
// regardless of the intent's timestamp — a "partial match" as defined above.
// found_unpushed_intent implies found_intent.
bool found_intent = 3;
}

// A QueryLocksRequest is arguments to the QueryLocks() method. It requests
Expand Down
32 changes: 12 additions & 20 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ func declareKeysQueryIntent(
// happens during the timestamp cache update).
//
// QueryIntent returns an error if the intent is missing and its ErrorIfMissing
// field is set to true. This error is typically an IntentMissingError, but the
// request is special-cased to return a SERIALIZABLE retry error if a transaction
// queries its own intent and finds it has been pushed.
// field is set to true.
func QueryIntent(
ctx context.Context, reader storage.Reader, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
Expand Down Expand Up @@ -81,8 +79,8 @@ func QueryIntent(
return result.Result{}, err
}

var curIntentPushed bool

reply.FoundIntent = false
reply.FoundUnpushedIntent = false
if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
Expand All @@ -93,44 +91,38 @@ func QueryIntent(
(args.Txn.Epoch == intent.Txn.Epoch) &&
(args.Txn.Sequence <= intent.Txn.Sequence)

// If we found a matching intent, check whether the intent was pushed
// past its expected timestamp.
if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
// If we found a matching intent, check whether the intent was pushed past
// its expected timestamp.
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
if cmpTS.Less(intent.Txn.WriteTimestamp) {
// The intent matched but was pushed to a later timestamp. Consider a
// pushed intent a missing intent.
curIntentPushed = true
log.VEventf(ctx, 2, "found pushed intent")
reply.FoundIntent = false
reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS)

if !reply.FoundUnpushedIntent {
log.VEventf(ctx, 2, "found pushed intent")
// If the request was querying an intent in its own transaction, update
// the response transaction.
// TODO(nvanbenschoten): if this is necessary for correctness, say so.
// And then add a test to demonstrate that.
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
}
}
}
} else {
log.VEventf(ctx, 2, "found no intent")
}

if !reply.FoundIntent && args.ErrorIfMissing {
if ownTxn && curIntentPushed {
// If the transaction's own intent was pushed, go ahead and return a
// TransactionRetryError immediately with an updated transaction proto.
// This is an optimization that can help the txn use refresh spans more
// effectively.
return result.Result{}, kvpb.NewTransactionRetryError(kvpb.RETRY_SERIALIZABLE, "intent pushed")
}
return result.Result{}, kvpb.NewIntentMissingError(args.Key, intent)
}
return result.Result{}, nil
Expand Down
98 changes: 62 additions & 36 deletions pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,59 +59,62 @@ func TestQueryIntent(t *testing.T) {
clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 10)))
evalCtx := &MockEvalCtx{ClusterSettings: st, Clock: clock}

// Since we can't move the intents clock after they are written, created
// cloned transactions with the clock shifted instead.
// Since we can't move the intents' timestamp after they are written, create
// cloned transactions with the timestamp shifted instead.
txABack := *txA.Clone()
txAForward := *txA.Clone()
txABack.WriteTimestamp = txABack.WriteTimestamp.Add(-2, 0)
txAForward.WriteTimestamp = txAForward.WriteTimestamp.Add(20, 0)

type Success struct{}
type NotFound struct{}
success := Success{}
notFound := NotFound{}
type response int
const (
_ response = iota
expAssertionError
expIntentMissingError
expNotFound
expFoundIntent
expFoundUnpushedIntent
)

tests := []struct {
name string
hTransaction roachpb.Transaction
argTransaction roachpb.Transaction
key roachpb.Key
errorFlag bool
response interface{}
resp response
}{
// Perform standard reading of all three keys.
{"readA", txA, txA, keyA, true, success},
{"readAA", txAA, txAA, keyAA, true, success},
{"readB", txB, txB, keyB, true, success},

{"readC", txA, txA, keyC, true, &kvpb.IntentMissingError{}},
{"readA", txA, txA, keyA, true, expFoundUnpushedIntent},
{"readAA", txAA, txAA, keyAA, true, expFoundUnpushedIntent},
{"readB", txB, txB, keyB, true, expFoundUnpushedIntent},
{"readC", txA, txA, keyC, true, expIntentMissingError},

// This tries reading a different key than this tx was written with. The
// returned error depends on the error flag setting.
{"wrongTxE", txA, txA, keyB, true, &kvpb.IntentMissingError{}},
{"wrongTx", txA, txA, keyB, false, notFound},
{"wrongTxErr", txA, txA, keyB, true, expIntentMissingError},
{"wrongTx", txA, txA, keyB, false, expNotFound},

// This sets a mismatch for transactions in the header and the body. An
// error is returned regardless of the errorFlag.
{"mismatchTxE", txA, txB, keyA, true, errors.AssertionFailedf("")},
{"mismatchTx", txA, txB, keyA, false, errors.AssertionFailedf("")},
{"mismatchTxErr", txA, txB, keyA, true, expAssertionError},
{"mismatchTx", txA, txB, keyA, false, expAssertionError},

// This simulates pushed intents by moving the tx clock backwards in time.
// An error is only returned if the error flag is set.
{"clockBackE", txABack, txABack, keyA, true, &kvpb.TransactionRetryError{}},
{"clockBack", txABack, txABack, keyA, false, notFound},
// This simulates pushed intents by moving the tx timestamp backwards in time.
// An error is not returned, regardless of the error flag.
{"timestampBackErr", txABack, txABack, keyA, true, expFoundIntent},
{"timestampBack", txABack, txABack, keyA, false, expFoundIntent},

// This simulates pushed transactions by moving the tx clock forward in time.
{"clockFwd", txAForward, txAForward, keyA, true, success},
// This simulates pushed transactions by moving the tx timestamp forward in time.
// In two of the cases, the header timestamp leads the argument's timestamp.
{"timestampFwd", txAForward, txAForward, keyA, true, expFoundUnpushedIntent},
{"timestampFwdHeaderAhead", txAForward, txA, keyA, true, expFoundUnpushedIntent},
{"timestampEqualHeaderAhead", txA, txABack, keyA, true, expFoundUnpushedIntent},

// This simulates a mismatch in the header and arg write timestamps. This is
// always an error regardless of flag.
{"mismatchTxClockE", txA, txAForward, keyA, true, errors.AssertionFailedf("")},
{"mismatchTxClock", txA, txAForward, keyA, false, errors.AssertionFailedf("")},

// It is OK if the time on the arg transaction is moved backwards, its
// unclear if this happens in practice.
{"mismatchTxClock", txA, txABack, keyA, true, success},
{"headerBehindErr", txA, txAForward, keyA, true, expAssertionError},
{"headerBehind", txA, txAForward, keyA, false, expAssertionError},
}

for _, test := range tests {
Expand All @@ -127,16 +130,39 @@ func TestQueryIntent(t *testing.T) {
cArgs.EvalCtx = evalCtx.EvalContext()
var resp kvpb.QueryIntentResponse
_, err := QueryIntent(ctx, db, cArgs, &resp)
switch test.response {
case success:
require.NoError(t, err)
require.True(t, resp.FoundIntent)
case notFound:
require.NoError(t, err)
switch test.resp {
case expAssertionError:
require.NotNil(t, err)
require.IsType(t, errors.AssertionFailedf(""), err)
require.False(t, resp.FoundIntent)
default:
require.IsType(t, test.response, err, "received %v", err)
require.False(t, resp.FoundUnpushedIntent)
require.Nil(t, resp.Txn)
case expIntentMissingError:
require.NotNil(t, err)
require.IsType(t, &kvpb.IntentMissingError{}, err)
require.False(t, resp.FoundIntent)
require.False(t, resp.FoundUnpushedIntent)
require.Nil(t, resp.Txn)
case expNotFound:
require.Nil(t, err)
require.False(t, resp.FoundIntent)
require.False(t, resp.FoundUnpushedIntent)
require.Nil(t, resp.Txn)
case expFoundIntent:
require.Nil(t, err)
require.True(t, resp.FoundIntent)
require.False(t, resp.FoundUnpushedIntent)
// If the intent was found but was pushed, the response also carries the
// updated write timestamp.
require.NotNil(t, resp.Txn)
require.True(t, test.hTransaction.WriteTimestamp.Less(resp.Txn.WriteTimestamp))
case expFoundUnpushedIntent:
require.Nil(t, err)
require.True(t, resp.FoundIntent)
require.True(t, resp.FoundUnpushedIntent)
require.Nil(t, resp.Txn)
default:
t.Fatalf("unexpected response: %v", test.resp)
}
})
}
Expand Down
Loading

0 comments on commit 77640e5

Please sign in to comment.