Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeed: fix premature checkpoint due to intent resolution race #114487

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,9 @@ message LeaseInfoResponse{
// still not evaluated by the node it was sent to if that node's replica is a
// learner or the node doesn't have a replica at all.
int32 evaluated_by = 4 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];
// LeaseAppliedIndex is the lease applied index of the last applied command at
// the time that the LeaseInfo request is executed.
uint64 lease_applied_index = 5 [(gogoproto.casttype) = "LeaseAppliedIndex"];
}

// A RequestLeaseResponse is the response to a RequestLease() or TransferLease()
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ go_test(
"cmd_export_test.go",
"cmd_get_test.go",
"cmd_is_span_empty_test.go",
"cmd_lease_info_test.go",
"cmd_lease_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ func LeaseInfo(
reply.Lease = lease
}
reply.EvaluatedBy = cArgs.EvalCtx.StoreID()
reply.LeaseAppliedIndex = cArgs.EvalCtx.GetLeaseAppliedIndex()
return result.Result{}, nil
}
69 changes: 69 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestLeaseInfo(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
lease := roachpb.Lease{Sequence: 1}
nextLease := roachpb.Lease{Sequence: 2}
storeID := roachpb.StoreID(7)
lai := kvpb.LeaseAppliedIndex(314)

// Test with and without pending lease transfer.
testutils.RunTrueAndFalse(t, "transfer", func(t *testing.T, transfer bool) {
evalCtx := batcheval.MockEvalCtx{
Lease: lease,
StoreID: storeID,
LeaseAppliedIndex: lai,
}
if transfer {
evalCtx.NextLease = nextLease
}

var resp kvpb.LeaseInfoResponse
_, err := batcheval.LeaseInfo(ctx, nil /* reader */, batcheval.CommandArgs{
EvalCtx: evalCtx.EvalContext(),
Args: &kvpb.LeaseInfoRequest{},
}, &resp)
require.NoError(t, err)

if transfer {
require.Equal(t, kvpb.LeaseInfoResponse{
Lease: nextLease,
CurrentLease: &lease,
LeaseAppliedIndex: lai,
EvaluatedBy: storeID,
}, resp)
} else {
require.Equal(t, kvpb.LeaseInfoResponse{
Lease: lease,
LeaseAppliedIndex: lai,
EvaluatedBy: storeID,
}, resp)
}
})
}
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type MockEvalCtx struct {
CanCreateTxnRecordFn func() (bool, kvpb.TransactionAbortedReason)
MinTxnCommitTSFn func() hlc.Timestamp
Lease roachpb.Lease
NextLease roachpb.Lease
LeaseAppliedIndex kvpb.LeaseAppliedIndex
CurrentReadSummary rspb.ReadSummary
ClosedTimestamp hlc.Timestamp
RevokedLeaseSeq roachpb.LeaseSequence
Expand Down Expand Up @@ -240,7 +242,7 @@ func (m *mockEvalCtxImpl) GetTerm(kvpb.RaftIndex) (kvpb.RaftTerm, error) {
return m.Term, nil
}
func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
panic("unimplemented")
return m.LeaseAppliedIndex
}
func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
return m.MockEvalCtx.Desc
Expand Down Expand Up @@ -283,7 +285,7 @@ func (m *mockEvalCtxImpl) GetLastReplicaGCTimestamp(context.Context) (hlc.Timest
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) {
return m.Lease, roachpb.Lease{}
return m.Lease, m.NextLease
}
func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp)

case *enginepb.MVCCCommitIntentOp:
// NB: this assertion can be violated in mixed-version clusters, we choose
// to trip the assertion rather than violate checkpoint guarantees.
// See: https://github.com/cockroachdb/cockroach/issues/104309
//
// TODO(erikgrinaker): check that this won't end up with crash loops.
rts.assertOpAboveRTS(op, t.Timestamp)
return rts.intentQ.DecrRef(t.TxnID, t.Timestamp)

case *enginepb.MVCCAbortIntentOp:
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ type TxnPusher interface {
PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
// ResolveIntents resolves the specified intents.
ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error
// GetLeaseholderLAI fetches the leaseholder's current lease applied index.
GetLeaseholderLAI(ctx context.Context) (kvpb.LeaseAppliedIndex, error)
// GetReplicaLAI fetches the local replica's current lease applied index.
GetReplicaLAI(ctx context.Context) kvpb.LeaseAppliedIndex
}

// txnPushAttempt pushes all old transactions that have unresolved intents on
Expand Down Expand Up @@ -261,6 +265,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
// Inform the Processor of the results of the push for each transaction.
ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns))
var intentsToCleanup []roachpb.LockUpdate
var sawAborted bool
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING, roachpb.STAGING:
Expand Down Expand Up @@ -303,6 +308,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
ops[i].SetValue(&enginepb.MVCCAbortTxnOp{
TxnID: txn.ID,
})
sawAborted = true

// We just informed the Processor about this txn being aborted, so from
// its perspective, there's nothing more to do — the txn's intents are no
Expand All @@ -320,6 +326,36 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
}
}

// It's possible that the ABORTED state is a false negative, where the
// transaction was in fact committed but the txn record has been removed after
// resolving all intents (see batcheval.SynthesizeTxnFromMeta and
// Replica.CanCreateTxnRecord). If we are not the leaseholder, then we must
// make sure all resolved intents have been applied on the local replica and
// emitted across the rangefeed before we emit the MVCCAbortTxnOp, otherwise
// we can prematurely advance the resolved ts before the committed intents.
// See: https://github.com/cockroachdb/cockroach/issues/104309
//
// MVCC logical ops are processed before advancing the LAI, so by the time the
// local replica reaches the leaseholder's LAI it will have enqueued the
// resolved intents in the rangefeed processor's queue. These updates may not
// yet have been applied to the resolved timestamp intent queue, but that's
// ok -- our MVCCAbortTxnOp will be enqueued and processed after them.
if sawAborted {
leaseholderLAI, err := a.pusher.GetLeaseholderLAI(ctx)
if err != nil {
return err
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for a.pusher.GetReplicaLAI(ctx) < leaseholderLAI {
select {
case <-ticker.C:
case <-ctx.Done():
return ctx.Err()
}
}
}

// Inform the processor of all logical ops.
a.p.sendEvent(ctx, event{ops: ops}, 0)

Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -374,6 +375,14 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) GetLeaseholderLAI(ctx context.Context) (kvpb.LeaseAppliedIndex, error) {
return 0, nil
}

func (tp *testTxnPusher) GetReplicaLAI(ctx context.Context) kvpb.LeaseAppliedIndex {
return 0
}

func (tp *testTxnPusher) mockPushTxns(
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
) {
Expand Down
51 changes: 51 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -182,6 +183,56 @@ func (tp *rangefeedTxnPusher) ResolveIntents(
).GoError()
}

// GetLeaseholderLAI is part of the rangefeed.TxnPusher interface.
func (tp *rangefeedTxnPusher) GetLeaseholderLAI(
ctx context.Context,
) (kvpb.LeaseAppliedIndex, error) {
// Fast path for local leaseholder.
if lai := tp.getLocalLeaseLAI(ctx); lai > 0 {
return lai, nil
}

var ba kvpb.BatchRequest
ba.Add(&kvpb.LeaseInfoRequest{
RequestHeader: kvpb.RequestHeader{
Key: tp.r.Desc().StartKey.AsRawKey(),
},
})

sender := tp.r.store.db.NonTransactionalSender()
br, pErr := sender.Send(ctx, &ba)
if pErr != nil {
return 0, pErr.GoError()
}
resp := br.Responses[0].GetLeaseInfo()
if resp == nil {
return 0, errors.AssertionFailedf("invalid LeaseInfo response: %s", br)
}
if resp.LeaseAppliedIndex == 0 {
// LeaseInfoResponse.LeaseAppliedIndex was added in a 23.2 backport, so
// older nodes may not set it. Return a 0 LAI in that case.
if tp.r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1) {
return 0, errors.AssertionFailedf("leaseholder did not return a lease applied index")
}
}
return resp.LeaseAppliedIndex, nil
}

func (tp *rangefeedTxnPusher) getLocalLeaseLAI(ctx context.Context) kvpb.LeaseAppliedIndex {
now := tp.r.Clock().NowAsClockTimestamp()
tp.r.mu.RLock()
defer tp.r.mu.RUnlock()
if tp.r.ownsValidLeaseRLocked(ctx, now) {
return tp.r.mu.state.LeaseAppliedIndex
}
return 0
}

// GetReplicaLAI is part of the rangefeed.TxnPusher interface.
func (tp *rangefeedTxnPusher) GetReplicaLAI(ctx context.Context) kvpb.LeaseAppliedIndex {
return tp.r.GetLeaseAppliedIndex()
}

// RangeFeed registers a rangefeed over the specified span. It sends updates to
// the provided stream and returns with a future error when the rangefeed is
// complete. The surrounding store's ConcurrentRequestLimiter is used to limit
Expand Down