Skip to content

Commit

Permalink
storage: return TransactionRetryError from QueryIntent on pushed intent
Browse files Browse the repository at this point in the history
This change adjusts how QueryIntent handles pushed intents. First, it
changes how pushed intents interact with SNAPSHOT transactions. Next,
it makes sure to update the response transaction in the case of pushed
intents. Finally, it returns a RETRY_SERIALIZABLE TransactionRetryError
for SERIALIZABLE transactions who observe a pushed intent with the
RETURN_ERROR behavior.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 10, 2018
1 parent 5ce1e07 commit 6dad7b6
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 115 deletions.
13 changes: 9 additions & 4 deletions pkg/roachpb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -749,9 +749,12 @@ message QueryIntentRequest {
// be committed by the provided transaction. If an intent is found at the
// specified key, the intent is only considered a match if it has the same ID,
// the same epoch, and a provisional commit timestamp that is equal to or less
// than that in the provided transaction. If the intent's timestamp is greater
// than that in the provided transaction, it would prevent the transaction
// from committing and is therefore not a match.
// than that in the provided transaction. For SERIALIZABLE transactions, if
// the intent's timestamp is greater than that in the provided transaction, it
// would prevent the transaction from committing and is therefore not a match.
// However, for SNAPSHOT transactions, if the intent's timestamp is greater
// than that in the provided transaction, it would not prevent the transaction
// from committing and therefore is a match.
//
// Additionally, the intent is only considered a match if its sequence number
// is equal to or greater than the expected txn's sequence number. The
Expand All @@ -767,7 +770,9 @@ message QueryIntentRequest {
// Don't do anything special, just note that the intent was not found in the
// response.
DO_NOTHING = 0;
// Return an IntentMissingError.
// Return an IntentMissingError. Special-cased to return a SERIALIZABLE
// retry error if a SERIALIZABLE transaction queries its own intent and
// finds it has been pushed.
RETURN_ERROR = 1;
// Prevent the intent from ever being written in the future. If set as the
// behavior, a response with found_intent=false implies that an intent will
Expand Down
33 changes: 32 additions & 1 deletion pkg/storage/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func QueryIntent(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.QueryIntentRequest)
h := cArgs.Header
reply := resp.(*roachpb.QueryIntentResponse)

// Snapshot transactions cannot be prevented using a QueryIntent command.
Expand Down Expand Up @@ -65,7 +66,11 @@ func QueryIntent(
return result.Result{}, err
}

// Determine if the request is querying an intent in its own transaction.
ownTxn := h.Txn != nil && h.Txn.ID == args.Txn.ID

var curIntent *roachpb.Intent
var curIntentPushed bool
switch len(intents) {
case 0:
reply.FoundIntent = false
Expand All @@ -75,8 +80,27 @@ func QueryIntent(
// comparison.
reply.FoundIntent = (args.Txn.ID == curIntent.Txn.ID) &&
(args.Txn.Epoch == curIntent.Txn.Epoch) &&
(!args.Txn.Timestamp.Less(curIntent.Txn.Timestamp)) &&
(args.Txn.Sequence <= curIntent.Txn.Sequence)

// Check whether the intent was pushed past its expected timestamp.
if reply.FoundIntent && args.Txn.Timestamp.Less(curIntent.Txn.Timestamp) {
// The intent matched but was pushed to a later timestamp.
curIntentPushed = true

// If the transaction is SERIALIZABLE, consider a pushed intent as a
// missing intent. If the transaction is SNAPSHOT, don't.
if args.Txn.Isolation == enginepb.SERIALIZABLE {
reply.FoundIntent = false
}

// If the request was querying an intent in its own transaction, update
// the response transaction.
if ownTxn {
clonedTxn := h.Txn.Clone()
reply.Txn = &clonedTxn
reply.Txn.Timestamp.Forward(curIntent.Txn.Timestamp)
}
}
default:
log.Fatalf(ctx, "more than 1 intent on single key: %v", intents)
}
Expand All @@ -86,6 +110,13 @@ func QueryIntent(
case roachpb.QueryIntentRequest_DO_NOTHING:
// Do nothing.
case roachpb.QueryIntentRequest_RETURN_ERROR:
if ownTxn && curIntentPushed {
// If the transaction's own intent was pushed, go ahead and
// return a TransactionRetryError immediately with an updated
// transaction proto. This is an optimization that can help
// the txn use refresh spans more effectively.
return result.Result{}, roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE)
}
return result.Result{}, roachpb.NewIntentMissingError(curIntent)
case roachpb.QueryIntentRequest_PREVENT:
// The intent will be prevented by bumping the timestamp cache for
Expand Down
231 changes: 125 additions & 106 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5791,120 +5791,139 @@ func TestPushTxnSerializableRestart(t *testing.T) {
func TestQueryIntentRequest(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, behavior := range []roachpb.QueryIntentRequest_IfMissingBehavior{
roachpb.QueryIntentRequest_DO_NOTHING,
roachpb.QueryIntentRequest_RETURN_ERROR,
roachpb.QueryIntentRequest_PREVENT,
} {
t.Run(fmt.Sprintf("behavior=%s", behavior), func(t *testing.T) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)
for _, iso := range []enginepb.IsolationType{enginepb.SNAPSHOT, enginepb.SERIALIZABLE} {
t.Run(iso.String(), func(t *testing.T) {
for _, behavior := range []roachpb.QueryIntentRequest_IfMissingBehavior{
roachpb.QueryIntentRequest_DO_NOTHING,
roachpb.QueryIntentRequest_RETURN_ERROR,
roachpb.QueryIntentRequest_PREVENT,
} {
if iso == enginepb.SNAPSHOT && behavior == roachpb.QueryIntentRequest_PREVENT {
// Cannot prevent SNAPSHOT transaction with QueryIntent.
continue
}
t.Run(fmt.Sprintf("behavior=%s", behavior), func(t *testing.T) {

key1 := roachpb.Key("a")
key2 := roachpb.Key("b")
txn := newTransaction("test", key1, 1, enginepb.SERIALIZABLE, tc.Clock())
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

pArgs := putArgs(key1, []byte("value1"))
assignSeqNumsForReqs(txn, &pArgs)
if _, pErr := tc.SendWrappedWith(roachpb.Header{Txn: txn}, &pArgs); pErr != nil {
t.Fatal(pErr)
}
key1 := roachpb.Key("a")
key2 := roachpb.Key("b")
txn := newTransaction("test", key1, 1, iso, tc.Clock())
txn2 := newTransaction("test2", key2, 1, iso, tc.Clock())

queryIntent := func(
key []byte,
txnMeta enginepb.TxnMeta,
baTxn *roachpb.Transaction,
expectIntent bool,
) {
t.Helper()
qiArgs := queryIntentArgs(key, txnMeta, behavior)
qiRes, pErr := tc.SendWrappedWith(roachpb.Header{Txn: baTxn}, &qiArgs)
if behavior == roachpb.QueryIntentRequest_RETURN_ERROR && !expectIntent {
if _, ok := pErr.GetDetail().(*roachpb.IntentMissingError); !ok {
t.Fatalf("expected IntentMissingError, found %v", pErr)
pArgs := putArgs(key1, []byte("value1"))
assignSeqNumsForReqs(txn, &pArgs)
if _, pErr := tc.SendWrappedWith(roachpb.Header{Txn: txn}, &pArgs); pErr != nil {
t.Fatal(pErr)
}
} else {

queryIntent := func(
key []byte,
txnMeta enginepb.TxnMeta,
baTxn *roachpb.Transaction,
expectIntent bool,
) {
t.Helper()
qiArgs := queryIntentArgs(key, txnMeta, behavior)
qiRes, pErr := tc.SendWrappedWith(roachpb.Header{Txn: baTxn}, &qiArgs)
if behavior == roachpb.QueryIntentRequest_RETURN_ERROR && !expectIntent {
ownIntent := baTxn != nil && baTxn.ID == txnMeta.ID
if ownIntent && txnMeta.Timestamp.Less(txn.Timestamp) {
if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok {
t.Fatalf("expected TransactionRetryError, found %v %v", txnMeta, pErr)
}
} else {
if _, ok := pErr.GetDetail().(*roachpb.IntentMissingError); !ok {
t.Fatalf("expected IntentMissingError, found %v", pErr)
}
}
} else {
if pErr != nil {
t.Fatal(pErr)
}
if e, a := expectIntent, qiRes.(*roachpb.QueryIntentResponse).FoundIntent; e != a {
t.Fatalf("expected FoundIntent=%t but FoundIntent=%t", e, a)
}
}
}

for _, baTxn := range []*roachpb.Transaction{nil, txn, txn2} {
// Query the intent with the correct txn meta. Should see intent regardless
// of whether we're inside the txn or not.
queryIntent(key1, txn.TxnMeta, baTxn, true)

// Query an intent on a different key for the same transaction. Should not
// see an intent.
queryIntent(key2, txn.TxnMeta, baTxn, false)

// Query an intent on the same key for a different transaction. Should not
// see an intent.
diffIDMeta := txn.TxnMeta
diffIDMeta.ID = txn2.ID
queryIntent(key1, diffIDMeta, baTxn, false)

// Query the intent with a larger epoch. Should not see an intent.
largerEpochMeta := txn.TxnMeta
largerEpochMeta.Epoch++
queryIntent(key1, largerEpochMeta, baTxn, false)

// Query the intent with a smaller epoch. Should not see an intent.
smallerEpochMeta := txn.TxnMeta
smallerEpochMeta.Epoch--
queryIntent(key1, smallerEpochMeta, baTxn, false)

// Query the intent with a larger timestamp. Should see an intent.
// See the comment on QueryIntentRequest.Txn for an explanation of why
// the request behaves like this.
largerTSMeta := txn.TxnMeta
largerTSMeta.Timestamp = largerTSMeta.Timestamp.Next()
queryIntent(key1, largerTSMeta, baTxn, true)

// Query the intent with a smaller timestamp. Should not see an intent
// if transaction is serializable. Should see an intent if the transaction
// is SNAPSHOT.
smallerTSMeta := txn.TxnMeta
smallerTSMeta.Timestamp = smallerTSMeta.Timestamp.Prev()
queryIntent(key1, smallerTSMeta, baTxn, iso == enginepb.SNAPSHOT)

// Query the intent with a larger sequence number. Should not see an intent.
largerSeqMeta := txn.TxnMeta
largerSeqMeta.Sequence++
queryIntent(key1, largerSeqMeta, baTxn, false)

// Query the intent with a smaller sequence number. Should see an intent.
// See the comment on QueryIntentRequest.Txn for an explanation of why
// the request behaves like this.
smallerSeqMeta := txn.TxnMeta
smallerSeqMeta.Sequence--
queryIntent(key1, smallerSeqMeta, baTxn, true)
}

// Perform a write at key2. Depending on the behavior of the queryIntent
// that queried that key, this write should have different results.
pArgs2 := putArgs(key2, []byte("value2"))
assignSeqNumsForReqs(txn, &pArgs2)
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: txn}
ba.Add(&pArgs2)
br, pErr := tc.Sender().Send(context.Background(), ba)
if pErr != nil {
t.Fatal(pErr)
}
if e, a := expectIntent, qiRes.(*roachpb.QueryIntentResponse).FoundIntent; e != a {
t.Fatalf("expected FoundIntent=%T but FoundIntent=%T", e, a)
tsBumped := br.Txn.Timestamp != br.Txn.OrigTimestamp
if behavior == roachpb.QueryIntentRequest_PREVENT {
if !tsBumped {
t.Fatalf("transaction timestamp not bumped: %v", br.Txn)
}
} else {
if tsBumped {
t.Fatalf("unexpected transaction timestamp bumped: %v", br.Txn)
}
}
}
}

for _, baTxn := range []*roachpb.Transaction{nil, txn} {
// Query the intent with the correct txn meta. Should see intent regardless
// of whether we're inside the txn or not.
queryIntent(key1, txn.TxnMeta, baTxn, true)

// Query an intent on a different key for the same transaction. Should not
// see an intent.
queryIntent(key2, txn.TxnMeta, baTxn, false)

// Query an intent on the same key for a different transaction. Should not
// see an intent.
diffIDMeta := txn.TxnMeta
diffIDMeta.ID = uuid.MakeV4()
queryIntent(key1, diffIDMeta, baTxn, false)

// Query the intent with a larger epoch. Should not see an intent.
largerEpochMeta := txn.TxnMeta
largerEpochMeta.Epoch++
queryIntent(key1, largerEpochMeta, baTxn, false)

// Query the intent with a smaller epoch. Should not see an intent.
smallerEpochMeta := txn.TxnMeta
smallerEpochMeta.Epoch--
queryIntent(key1, smallerEpochMeta, baTxn, false)

// Query the intent with a larger timestamp. Should see an intent.
// See the comment on QueryIntentRequest.Txn for an explanation of why
// the request behaves like this.
largerTSMeta := txn.TxnMeta
largerTSMeta.Timestamp = largerTSMeta.Timestamp.Next()
queryIntent(key1, largerTSMeta, baTxn, true)

// Query the intent with a smaller timestamp. Should not see an intent.
smallerTSMeta := txn.TxnMeta
smallerTSMeta.Timestamp = smallerTSMeta.Timestamp.Prev()
queryIntent(key1, smallerTSMeta, baTxn, false)

// Query the intent with a larger sequence number. Should not see an intent.
largerSeqMeta := txn.TxnMeta
largerSeqMeta.Sequence++
queryIntent(key1, largerSeqMeta, baTxn, false)

// Query the intent with a smaller sequence number. Should see an intent.
// See the comment on QueryIntentRequest.Txn for an explanation of why
// the request behaves like this.
smallerSeqMeta := txn.TxnMeta
smallerSeqMeta.Sequence--
queryIntent(key1, smallerSeqMeta, baTxn, true)
}

// Perform a write at key2. Depending on the behavior of the queryIntent
// that queried that key, this write should have different results.
pArgs2 := putArgs(key2, []byte("value2"))
assignSeqNumsForReqs(txn, &pArgs2)
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: txn}
ba.Add(&pArgs2)
br, pErr := tc.Sender().Send(context.Background(), ba)
if pErr != nil {
t.Fatal(pErr)
}
tsBumped := br.Txn.Timestamp != br.Txn.OrigTimestamp
if behavior == roachpb.QueryIntentRequest_PREVENT {
if !tsBumped {
t.Fatalf("transaction timestamp not bumped: %v", br.Txn)
}
} else {
if tsBumped {
t.Fatalf("unexpected transaction timestamp bumped: %v", br.Txn)
}
})
}
})
}
Expand Down

0 comments on commit 6dad7b6

Please sign in to comment.