From f78c17a070a2379acb2f0759606a9cdacaff419b Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 27 Mar 2019 20:05:05 -0400 Subject: [PATCH] storage: support no-op PUSH_TIMESTAMP pushes on STAGING transactions Caught while manually testing parallel commits last week. PR #35763 made it an error to resolve an intent with a STAGING status. This isn't a crazy thing to do in every circumstance though. The case where this comes up is a PUSH_TIMESTAMP push on a STAGING transaction whose timestamp is already sufficiently high. In this case, the pusher can move the intent up out of its way without modifying the transaction record or interfering with the parallel commit. Concretely, this is allowed in pushes that hit this case: https://github.com/cockroachdb/cockroach/blob/4ab679d978a8f566c1427b372547380ee012292f/pkg/storage/batcheval/cmd_push_txn.go#L197 To test this, the commit extends `TestStoreResolveWriteIntentPushOnRead` to test scenarios in two different dimensions: PENDING vs. STAGING transaction records and already pushed txns vs. not already pushed txns. To test the latter dimension, the commit had to refine how far into the future transactions push conflicting intents. Previously, transactions would push them all the way to hlc.Now() on the pushing node so that there was no chance that they would be in their uncertainty window after the push. This was pessimistic. The transaction only needs to push the conflicting intent up to its observed timestamp for the pushing node, which may be significantly lower than the current timestamp on the pushing node. This should increase the number of no-op pushes we see in the wild. 
Release note: None --- pkg/kv/integration_test.go | 2 +- pkg/storage/batcheval/cmd_resolve_intent.go | 4 - .../batcheval/cmd_resolve_intent_range.go | 4 - pkg/storage/engine/mvcc.go | 20 +- pkg/storage/rangefeed/task.go | 6 +- pkg/storage/store.go | 33 ++- pkg/storage/store_test.go | 253 +++++++++++++----- pkg/storage/testing_knobs.go | 14 +- 8 files changed, 227 insertions(+), 109 deletions(-) diff --git a/pkg/kv/integration_test.go b/pkg/kv/integration_test.go index fe7f72c51097..338b17e5821a 100644 --- a/pkg/kv/integration_test.go +++ b/pkg/kv/integration_test.go @@ -113,7 +113,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) { } return 0, nil }, - TxnWait: txnwait.TestingKnobs{ + TxnWaitKnobs: txnwait.TestingKnobs{ OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) { // We'll trap a reader entering the wait queue for our txn. v := txnID.Load() diff --git a/pkg/storage/batcheval/cmd_resolve_intent.go b/pkg/storage/batcheval/cmd_resolve_intent.go index 4d865a06b3f7..96d16184e4e6 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent.go +++ b/pkg/storage/batcheval/cmd_resolve_intent.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/pkg/errors" ) func init() { @@ -81,9 +80,6 @@ func ResolveIntent( if h.Txn != nil { return result.Result{}, ErrTransactionUnsupported } - if args.Status == roachpb.STAGING { - return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status") - } intent := roachpb.Intent{ Span: args.Span(), diff --git a/pkg/storage/batcheval/cmd_resolve_intent_range.go b/pkg/storage/batcheval/cmd_resolve_intent_range.go index 1385fc3fcada..9ff395861bc2 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_range.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_range.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" 
"github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" - "github.com/pkg/errors" ) func init() { @@ -46,9 +45,6 @@ func ResolveIntentRange( if h.Txn != nil { return result.Result{}, ErrTransactionUnsupported } - if args.Status == roachpb.STAGING { - return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status") - } intent := roachpb.Intent{ Span: args.Span(), diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index d3eecbf07180..4d3e31ae56c9 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -2312,7 +2312,7 @@ func mvccResolveWriteIntent( // | restart | | // | write@2 | | // | | resolve@1 | - // ============================ + // ============================= // // In this case, if we required the epochs to match, we would not push the // intent forward, and client B would upon retrying after its successful @@ -2323,9 +2323,15 @@ func mvccResolveWriteIntent( // used for resolving), but that costs latency. // TODO(tschottdorf): various epoch-related scenarios here deserve more // testing. - pushed := intent.Status == roachpb.PENDING && - hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp) && - meta.Txn.Epoch >= intent.Txn.Epoch + inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch + pushed := inProgress && hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp) + + // There's nothing to do if meta's epoch is greater than or equal txn's + // epoch and the state is still in progress but the intent was not pushed + // to a larger timestamp. + if inProgress && !pushed { + return false, nil + } // If we're committing, or if the commit timestamp of the intent has been moved forward, and if // the proposed epoch matches the existing epoch: update the meta.Txn. 
For commit, it's set to @@ -2420,12 +2426,6 @@ func mvccResolveWriteIntent( // - writer2 dispatches ResolveIntent to key0 (with epoch 0) // - ResolveIntent with epoch 0 aborts intent from epoch 1. - // There's nothing to do if meta's epoch is greater than or equal txn's epoch - // and the state is still PENDING. - if intent.Status == roachpb.PENDING && meta.Txn.Epoch >= intent.Txn.Epoch { - return false, nil - } - // First clear the intent value. latestKey := MVCCKey{Key: intent.Key, Timestamp: hlc.Timestamp(meta.Timestamp)} if err := engine.Clear(latestKey); err != nil { diff --git a/pkg/storage/rangefeed/task.go b/pkg/storage/rangefeed/task.go index 51e9d131fd7b..e9afb4c4e76b 100644 --- a/pkg/storage/rangefeed/task.go +++ b/pkg/storage/rangefeed/task.go @@ -182,16 +182,14 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { var toCleanup []roachpb.Transaction for i, txn := range pushedTxns { switch txn.Status { - case roachpb.PENDING: - // The transaction is still pending but its timestamp was moved + case roachpb.PENDING, roachpb.STAGING: + // The transaction is still in progress but its timestamp was moved // forward to the current time. Inform the Processor that it can // forward the txn's timestamp in its unresolvedIntentQueue. ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{ TxnID: txn.ID, Timestamp: txn.Timestamp, }) - case roachpb.STAGING: - log.Fatalf(ctx, "unexpected pushed txn with STAGING status: %v", txn) case roachpb.COMMITTED: // The transaction is committed and its timestamp may have moved // forward since we last saw an intent. 
Inform the Processor diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 28645acd55a8..b9ad7442b335 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2949,6 +2949,9 @@ func (s *Store) Send( pErr = nil case *roachpb.IndeterminateCommitError: + if s.cfg.TestingKnobs.DontRecoverIndeterminateCommits { + return nil, pErr + } // On an indeterminate commit error, attempt to recover and finalize // the stuck transaction. Retry immediately if successful. if _, err := s.recoveryMgr.ResolveIndeterminateCommit(ctx, t); err != nil { @@ -2980,18 +2983,24 @@ func (s *Store) Send( // Make a copy of the header for the upcoming push; we will update // the timestamp. h := ba.Header - // We must push at least to h.Timestamp, but in fact we want to - // go all the way up to a timestamp which was taken off the HLC - // after our operation started. This allows us to not have to - // restart for uncertainty as we come back and read. - h.Timestamp.Forward(now) - // We are going to hand the header (and thus the transaction proto) - // to the RPC framework, after which it must not be changed (since - // that could race). Since the subsequent execution of the original - // request might mutate the transaction, make a copy here. - // - // See #9130. if h.Txn != nil { + // We must push at least to h.Timestamp, but in fact we want to + // go all the way up to a timestamp which was taken off the HLC + // after our operation started. This allows us to not have to + // restart for uncertainty as we come back and read. + obsTS, ok := h.Txn.GetObservedTimestamp(ba.Replica.NodeID) + if !ok { + // This was set earlier in this method, so it's + // completely unexpected to not be found now. + log.Fatalf(ctx, "missing observed timestamp: %+v", h.Txn) + } + h.Timestamp.Forward(obsTS) + // We are going to hand the header (and thus the transaction proto) + // to the RPC framework, after which it must not be changed (since + // that could race). 
Since the subsequent execution of the original + // request might mutate the transaction, make a copy here. + // + // See #9130. h.Txn = h.Txn.Clone() } // Handle the case where we get more than one write intent error; @@ -4389,7 +4398,7 @@ func (s *Store) setScannerActive(active bool) { // GetTxnWaitKnobs is part of txnwait.StoreInterface. func (s *Store) GetTxnWaitKnobs() txnwait.TestingKnobs { - return s.TestingKnobs().TxnWait + return s.TestingKnobs().TxnWaitKnobs } // GetTxnWaitMetrics is called by txnwait.Queue instances to get a reference to diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index f14b2225911c..e4fc24834c14 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -68,10 +68,10 @@ var testIdent = roachpb.StoreIdent{ func (s *Store) TestSender() client.Sender { return client.Wrap(s, func(ba roachpb.BatchRequest) roachpb.BatchRequest { - rangeID := roachpb.RangeID(1) if ba.RangeID != 0 { return ba } + // If the client hasn't set ba.Range, we do it a favor and figure out the // range to which the request needs to go. // @@ -83,15 +83,18 @@ func (s *Store) TestSender() client.Sender { log.Fatal(context.TODO(), err) } - visitor := newStoreReplicaVisitor(s) - visitor.Visit(func(repl *Replica) bool { - if repl.Desc().ContainsKeyRange(key, key) { - rangeID = repl.RangeID - return false + ba.RangeID = roachpb.RangeID(1) + if repl := s.LookupReplica(key); repl != nil { + ba.RangeID = repl.RangeID + + // Attempt to assign a Replica descriptor to the batch if + // necessary, but don't throw an error if this fails. + if ba.Replica == (roachpb.ReplicaDescriptor{}) { + if desc, err := repl.GetReplicaDescriptor(); err == nil { + ba.Replica = desc + } } - return true - }) - ba.RangeID = rangeID + } return ba }) } @@ -1646,83 +1649,197 @@ func TestStoreResolveWriteIntentRollback(t *testing.T) { } // TestStoreResolveWriteIntentPushOnRead verifies that resolving a write intent -// for a read will push the timestamp. 
On failure to push, verify a write -// intent error is returned with !Resolvable. +// for a read will push the timestamp. It tests this along a few dimensions: +// - high-priority pushes vs. low-priority pushes +// - already pushed pushee txns vs. not already pushed pushee txns +// - PENDING pushee txn records vs. STAGING pushee txn records func TestStoreResolveWriteIntentPushOnRead(t *testing.T) { defer leaktest.AfterTest(t)() storeCfg := TestStoreConfig(nil) storeCfg.TestingKnobs.DontRetryPushTxnFailures = true + storeCfg.TestingKnobs.DontRecoverIndeterminateCommits = true stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &storeCfg) - for i, resolvable := range []bool{true, false} { - key := roachpb.Key(fmt.Sprintf("key-%d", i)) - pusher := newTransaction("test", key, 1, store.cfg.Clock) - pushee := newTransaction("test", key, 1, store.cfg.Clock) - - if resolvable { - pushee.Priority = enginepb.MinTxnPriority - pusher.Priority = enginepb.MaxTxnPriority // Pusher will win. - } else { - pushee.Priority = enginepb.MaxTxnPriority - pusher.Priority = enginepb.MinTxnPriority // Pusher will lose. - } - // First, write original value. + testCases := []struct { + pusherWillWin bool // if true, pusher will have a high enough priority to push the pushee + pusheeAlreadyPushed bool // if true, pushee's timestamp will be set above pusher's target timestamp + pusheeStagingRecord bool // if true, pushee's record is STAGING, otherwise PENDING + expPushError string // regexp pattern to match on run error, if not empty + expPusheeRetry bool // do we expect the pushee to hit a retry error when committing? + }{ { - args := putArgs(key, []byte("value1")) - if _, pErr := client.SendWrapped(context.Background(), store.TestSender(), &args); pErr != nil { - t.Fatal(pErr) + // Insufficient priority to push. 
+ pusherWillWin: false, + pusheeAlreadyPushed: false, + pusheeStagingRecord: false, + expPushError: "failed to push", + expPusheeRetry: false, + }, + { + // Successful push. + pusherWillWin: true, + pusheeAlreadyPushed: false, + pusheeStagingRecord: false, + expPushError: "", + expPusheeRetry: true, + }, + { + // Already pushed, no-op. + pusherWillWin: false, + pusheeAlreadyPushed: true, + pusheeStagingRecord: false, + expPushError: "", + expPusheeRetry: false, + }, + { + // Already pushed, no-op. + pusherWillWin: true, + pusheeAlreadyPushed: true, + pusheeStagingRecord: false, + expPushError: "", + expPusheeRetry: false, + }, + { + // Insufficient priority to push. + pusherWillWin: false, + pusheeAlreadyPushed: false, + pusheeStagingRecord: true, + expPushError: "failed to push", + expPusheeRetry: false, + }, + { + // Cannot push STAGING txn record. + pusherWillWin: true, + pusheeAlreadyPushed: false, + pusheeStagingRecord: true, + expPushError: "found txn in indeterminate STAGING state", + expPusheeRetry: false, + }, + { + // Already pushed the STAGING record, no-op. + pusherWillWin: false, + pusheeAlreadyPushed: true, + pusheeStagingRecord: true, + expPushError: "", + expPusheeRetry: false, + }, + { + // Already pushed the STAGING record, no-op. + pusherWillWin: true, + pusheeAlreadyPushed: true, + pusheeStagingRecord: true, + expPushError: "", + expPusheeRetry: false, + }, + } + for _, tc := range testCases { + name := fmt.Sprintf("pusherWillWin=%t,pusheePushed=%t,pusheeStaging=%t", + tc.pusherWillWin, tc.pusheeAlreadyPushed, tc.pusheeStagingRecord) + t.Run(name, func(t *testing.T) { + ctx := context.Background() + key := roachpb.Key(fmt.Sprintf("key-%s", name)) + pusher := newTransaction("pusher", key, 1, store.cfg.Clock) + pushee := newTransaction("pushee", key, 1, store.cfg.Clock) + + // Set transaction priorities. + if tc.pusherWillWin { + pushee.Priority = enginepb.MinTxnPriority + pusher.Priority = enginepb.MaxTxnPriority // Pusher will win. 
+ } else { + pushee.Priority = enginepb.MaxTxnPriority + pusher.Priority = enginepb.MinTxnPriority // Pusher will lose. } - } - // Second, lay down intent using the pushee's txn. - { - args := putArgs(key, []byte("value2")) - assignSeqNumsForReqs(pushee, &args) - if _, pErr := client.SendWrappedWith( - context.Background(), store.TestSender(), roachpb.Header{Txn: pushee}, &args, - ); pErr != nil { - t.Fatal(pErr) + // First, write original value. + { + args := putArgs(key, []byte("value1")) + if _, pErr := client.SendWrapped(ctx, store.TestSender(), &args); pErr != nil { + t.Fatal(pErr) + } } - } - // Now, try to read value using the pusher's txn. - now := store.Clock().Now() - pusher.OrigTimestamp.Forward(now) - pusher.Timestamp.Forward(now) - gArgs := getArgs(key) - assignSeqNumsForReqs(pusher, &gArgs) - firstReply, pErr := client.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Txn: pusher}, &gArgs) - if resolvable { - if pErr != nil { - t.Errorf("%d: expected read to succeed: %s", i, pErr) - } else if replyBytes, err := firstReply.(*roachpb.GetResponse).Value.GetBytes(); err != nil { - t.Fatal(err) - } else if !bytes.Equal(replyBytes, []byte("value1")) { - t.Errorf("%d: expected bytes to be %q, got %q", i, "value1", replyBytes) + // Second, lay down intent using the pushee's txn. + { + args := putArgs(key, []byte("value2")) + assignSeqNumsForReqs(pushee, &args) + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: pushee}, &args); pErr != nil { + t.Fatal(pErr) + } } - // Finally, try to end the pushee's transaction; if we have - // SNAPSHOT isolation, the commit should work: verify the txn - // commit timestamp is greater than pusher's Timestamp. - // Otherwise, verify commit fails with TransactionRetryError. 
- etArgs, h := endTxnArgs(pushee, true) - assignSeqNumsForReqs(pushee, &etArgs) - _, cErr := client.SendWrappedWith(context.Background(), store.TestSender(), h, &etArgs) - if _, ok := cErr.GetDetail().(*roachpb.TransactionRetryError); !ok { - t.Errorf("expected transaction retry error; got %s", cErr) + // Determine the timestamp to read at. + readTs := store.cfg.Clock.Now() + // Give the pusher a previous observed timestamp equal to this read + // timestamp. This ensures that the pusher doesn't need to push the + // intent any higher just to push it out of its uncertainty window. + pusher.UpdateObservedTimestamp(store.Ident.NodeID, readTs) + + // If the pushee is already pushed, update the transaction record. + if tc.pusheeAlreadyPushed { + pushedTs := store.cfg.Clock.Now() + pushee.Timestamp.Forward(pushedTs) + pushee.RefreshedTimestamp.Forward(pushedTs) + hb, hbH := heartbeatArgs(pushee, store.cfg.Clock.Now()) + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), hbH, &hb); pErr != nil { + t.Fatal(pErr) + } } - } else { - // Verify we receive a transaction retry error (because we max out - // retries). - if pErr == nil { - t.Errorf("expected read to fail") + + // If the pushee is staging, update the transaction record. + if tc.pusheeStagingRecord { + // TODO(nvanbenschoten): Avoid writing directly to the engine once + // there's a way to create a STAGING transaction record. + txnKey := keys.TransactionKey(pushee.Key, pushee.ID) + txnRecord := pushee.AsRecord() + txnRecord.Status = roachpb.STAGING + if err := engine.MVCCPutProto(ctx, store.Engine(), nil, txnKey, hlc.Timestamp{}, nil, &txnRecord); err != nil { + t.Fatal(err) + } } - if _, ok := pErr.GetDetail().(*roachpb.TransactionPushError); !ok { - t.Errorf("expected transaction push error; got %T", pErr.GetDetail()) + + // Now, try to read value using the pusher's txn. 
+ pusher.OrigTimestamp.Forward(readTs) + pusher.Timestamp.Forward(readTs) + gArgs := getArgs(key) + assignSeqNumsForReqs(pusher, &gArgs) + repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: pusher}, &gArgs) + if tc.expPushError == "" { + if pErr != nil { + t.Errorf("expected read to succeed: %s", pErr) + } else if replyBytes, err := repl.(*roachpb.GetResponse).Value.GetBytes(); err != nil { + t.Fatal(err) + } else if !bytes.Equal(replyBytes, []byte("value1")) { + t.Errorf("expected bytes to be %q, got %q", "value1", replyBytes) + } + } else { + if !testutils.IsPError(pErr, tc.expPushError) { + t.Fatalf("expected error %q, found %v", tc.expPushError, pErr) + } } - } + + // Finally, try to end the pushee's transaction. Check whether + // the commit succeeds or fails. + etArgs, etH := endTxnArgs(pushee, true) + assignSeqNumsForReqs(pushee, &etArgs) + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), etH, &etArgs) + if tc.pusheeStagingRecord { + // TODO(nvanbenschoten): We don't support committing STAGING + // transaction records yet. This will need to change once we do. + if !testutils.IsPError(pErr, "TransactionStatusError: bad txn status") { + t.Fatal(pErr) + } + } else if tc.expPusheeRetry { + if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { + t.Errorf("expected transaction retry error; got %s", pErr) + } + } else { + if pErr != nil { + t.Fatalf("expected no commit error; got %s", pErr) + } + } + }) } } diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 6259b0caf075..9fb9fc4ffd2a 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -31,8 +31,10 @@ import ( // particular point is reached) or to change the behavior by returning // an error (which aborts all further processing for the command). 
type StoreTestingKnobs struct { - EvalKnobs storagebase.BatchEvalTestingKnobs - IntentResolverKnobs storagebase.IntentResolverTestingKnobs + EvalKnobs storagebase.BatchEvalTestingKnobs + IntentResolverKnobs storagebase.IntentResolverTestingKnobs + TxnWaitKnobs txnwait.TestingKnobs + ConsistencyTestingKnobs ConsistencyTestingKnobs // TestingRequestFilter is called before evaluating each command on a // replica. The filter is run before the request acquires latches, so @@ -172,14 +174,14 @@ type StoreTestingKnobs struct { SystemLogsGCPeriod time.Duration // SystemLogsGCGCDone is used to notify when system logs GC is done. SystemLogsGCGCDone chan<- struct{} - // TxnWait contains knobs for txnwait.Queue instances. - TxnWait txnwait.TestingKnobs // DontRetryPushTxnFailures will propagate a push txn failure immediately // instead of utilizing the txn wait queue to wait for the transaction to // finish or be pushed by a higher priority contender. DontRetryPushTxnFailures bool - - ConsistencyTestingKnobs ConsistencyTestingKnobs + // DontRecoverIndeterminateCommits will propagate indeterminate commit + // errors from failed txn pushes immediately instead of utilizing the txn + // recovery manager to recover from the indeterminate state. + DontRecoverIndeterminateCommits bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.