Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: support no-op PUSH_TIMESTAMP pushes on STAGING transactions #36253

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
}
return 0, nil
},
TxnWait: txnwait.TestingKnobs{
TxnWaitKnobs: txnwait.TestingKnobs{
OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) {
// We'll trap a reader entering the wait queue for our txn.
v := txnID.Load()
Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -81,9 +80,6 @@ func ResolveIntent(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Status == roachpb.STAGING {
return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status")
}

intent := roachpb.Intent{
Span: args.Span(),
Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/pkg/errors"
)

func init() {
Expand All @@ -46,9 +45,6 @@ func ResolveIntentRange(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Status == roachpb.STAGING {
return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status")
}

intent := roachpb.Intent{
Span: args.Span(),
Expand Down
20 changes: 10 additions & 10 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,7 +2312,7 @@ func mvccResolveWriteIntent(
// | restart | |
// | write@2 | |
// | | resolve@1 |
// ============================
// =============================
//
// In this case, if we required the epochs to match, we would not push the
// intent forward, and client B would upon retrying after its successful
Expand All @@ -2323,9 +2323,15 @@ func mvccResolveWriteIntent(
// used for resolving), but that costs latency.
// TODO(tschottdorf): various epoch-related scenarios here deserve more
// testing.
pushed := intent.Status == roachpb.PENDING &&
hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp) &&
meta.Txn.Epoch >= intent.Txn.Epoch
inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch
pushed := inProgress && hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp)

// There's nothing to do if meta's epoch is greater than or equal txn's
// epoch and the state is still in progress but the intent was not pushed
// to a larger timestamp.
if inProgress && !pushed {
return false, nil
}

// If we're committing, or if the commit timestamp of the intent has been moved forward, and if
// the proposed epoch matches the existing epoch: update the meta.Txn. For commit, it's set to
Expand Down Expand Up @@ -2420,12 +2426,6 @@ func mvccResolveWriteIntent(
// - writer2 dispatches ResolveIntent to key0 (with epoch 0)
// - ResolveIntent with epoch 0 aborts intent from epoch 1.

// There's nothing to do if meta's epoch is greater than or equal txn's epoch
// and the state is still PENDING.
if intent.Status == roachpb.PENDING && meta.Txn.Epoch >= intent.Txn.Epoch {
return false, nil
}

// First clear the intent value.
latestKey := MVCCKey{Key: intent.Key, Timestamp: hlc.Timestamp(meta.Timestamp)}
if err := engine.Clear(latestKey); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,14 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
var toCleanup []roachpb.Transaction
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING:
// The transaction is still pending but its timestamp was moved
case roachpb.PENDING, roachpb.STAGING:
// The transaction is still in progress but its timestamp was moved
// forward to the current time. Inform the Processor that it can
// forward the txn's timestamp in its unresolvedIntentQueue.
ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{
TxnID: txn.ID,
Timestamp: txn.Timestamp,
})
case roachpb.STAGING:
log.Fatalf(ctx, "unexpected pushed txn with STAGING status: %v", txn)
case roachpb.COMMITTED:
// The transaction is committed and its timestamp may have moved
// forward since we last saw an intent. Inform the Processor
Expand Down
33 changes: 21 additions & 12 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2949,6 +2949,9 @@ func (s *Store) Send(
pErr = nil

case *roachpb.IndeterminateCommitError:
if s.cfg.TestingKnobs.DontRecoverIndeterminateCommits {
return nil, pErr
}
// On an indeterminate commit error, attempt to recover and finalize
// the stuck transaction. Retry immediately if successful.
if _, err := s.recoveryMgr.ResolveIndeterminateCommit(ctx, t); err != nil {
Expand Down Expand Up @@ -2980,18 +2983,24 @@ func (s *Store) Send(
// Make a copy of the header for the upcoming push; we will update
// the timestamp.
h := ba.Header
// We must push at least to h.Timestamp, but in fact we want to
// go all the way up to a timestamp which was taken off the HLC
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
h.Timestamp.Forward(now)
// We are going to hand the header (and thus the transaction proto)
// to the RPC framework, after which it must not be changed (since
// that could race). Since the subsequent execution of the original
// request might mutate the transaction, make a copy here.
//
// See #9130.
if h.Txn != nil {
// We must push at least to h.Timestamp, but in fact we want to
// go all the way up to a timestamp which was taken off the HLC
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
obsTS, ok := h.Txn.GetObservedTimestamp(ba.Replica.NodeID)
if !ok {
// This was set earlier in this method, so it's
// completely unexpected to not be found now.
log.Fatalf(ctx, "missing observed timestamp: %+v", h.Txn)
}
h.Timestamp.Forward(obsTS)
// We are going to hand the header (and thus the transaction proto)
// to the RPC framework, after which it must not be changed (since
// that could race). Since the subsequent execution of the original
// request might mutate the transaction, make a copy here.
//
// See #9130.
h.Txn = h.Txn.Clone()
}
// Handle the case where we get more than one write intent error;
Expand Down Expand Up @@ -4389,7 +4398,7 @@ func (s *Store) setScannerActive(active bool) {

// GetTxnWaitKnobs is part of txnwait.StoreInterface.
func (s *Store) GetTxnWaitKnobs() txnwait.TestingKnobs {
return s.TestingKnobs().TxnWait
return s.TestingKnobs().TxnWaitKnobs
}

// GetTxnWaitMetrics is called by txnwait.Queue instances to get a reference to
Expand Down
Loading