Skip to content

Commit

Permalink
kv: prevent lease interval regression during expiration-to-epoch prom…
Browse files Browse the repository at this point in the history
…otion

Fixes cockroachdb#121480.
Fixes cockroachdb#122016.

This commit resolves a bug in the expiration-based to epoch-based lease
promotion transition, where the lease's effective expiration could be
allowed to regress. To prevent this, we detect when such cases are about
to occur and synchronously heartbeat the leaseholder's liveness record.
This works because the liveness record interval and the expiration-based
lease interval are the same, so a synchronous heartbeat ensures that the
liveness record has a later expiration than the prior lease by the time
the lease promotion goes into effect.

The code structure here leaves a lot to be desired, but since we're
going to be cleaning up and/or removing a lot of this code soon anyway,
I'm prioritizing backportability. This is therefore more targeted and
less general than it could be.

The resolution here also leaves something to be desired. A nicer fix
would be to introduce a minimum_lease_expiration field on epoch-based
leases so that we can locally ensure that the expiration does not
regress. This is what we plan to do for leader leases in the upcoming
release. We don't make this change because it would be require a version
gate to avoid replica divergence, so it would not be backportable.

Release note (bug fix): Fixed a rare bug where a lease transfer could
lead to a `side-transport update saw closed timestamp regression` panic.
The bug could occur when a node was overloaded and failing to heartbeat
its node liveness record.
  • Loading branch information
nvanbenschoten committed May 2, 2024
1 parent fd8bba4 commit 6dd54b4
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 18 deletions.
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/raft"
Expand Down Expand Up @@ -1663,3 +1664,84 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) {
require.Greater(t, liveness.Epoch, prevLease.Epoch)
})
}

// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a
// promotion from an expiration-based lease to an epoch-based lease does not
// permit the expiration time of the lease to regress. This is enforced by
// detecting cases where the leaseholder's liveness record's expiration trails
// its expiration-based lease's expiration and synchronously heartbeating the
// leaseholder's liveness record before promoting the lease.
func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
},
})
defer tc.Stopper().Stop(ctx)

// Create scratch range.
key := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, key)

// Pause n1's node liveness heartbeats, to allow its liveness expiration to
// fall behind.
l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness)
l0.PauseHeartbeatLoopForTest()
l, ok := l0.GetLiveness(tc.Server(0).NodeID())
require.True(t, ok)

// Make sure n1 has an expiration-based lease.
s0 := tc.GetFirstStoreFromServer(t, 0)
repl := s0.LookupReplica(desc.StartKey)
require.NotNil(t, repl)
expLease := repl.CurrentLeaseStatus(ctx)
require.True(t, expLease.IsValid())
require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type())

// Wait for the expiration-based lease to have a later expiration than the
// expiration timestamp in n1's liveness record.
testutils.SucceedsSoon(t, func() error {
expLease = repl.CurrentLeaseStatus(ctx)
if expLease.Expiration().Less(l.Expiration.ToTimestamp()) {
return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l)
}
return nil
})

// Enable epoch-based leases. This will cause automatic lease renewal to try
// to promote the expiration-based lease to an epoch-based lease.
//
// Since we have disabled the background node liveness heartbeat loop, it is
// critical that this lease promotion synchronously heartbeats node liveness
// before acquiring the epoch-based lease.
kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false)

// Wait for that lease promotion to occur.
var epochLease kvserverpb.LeaseStatus
testutils.SucceedsSoon(t, func() error {
epochLease = repl.CurrentLeaseStatus(ctx)
if epochLease.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease %v not upgraded to epoch-based", epochLease)
}
return nil
})

// Once the lease has been promoted to an epoch-based lease, the effective
// expiration (maintained indirectly in the liveness record) must be greater
// than that in the preceding expiration-based lease. If this were to regress,
// a non-cooperative lease failover to a third lease held by a different node
// could overlap in MVCC time with the first lease (i.e. its start time could
// precede expLease.Expiration), violating the lease disjointness property.
//
// If we disable the `expToEpochPromo` branch in replica_range_lease.go, this
// assertion fails.
require.True(t, expLease.Expiration().Less(epochLease.Expiration()))
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3236,6 +3236,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
// We plan to increment the manual clock by MinStatsDuration a few
// times below and would like for leases to not expire. Configure a
// longer lease duration to achieve this.
RangeLeaseDuration: 10 * replicastats.MinStatsDuration,
},
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
Expand All @@ -3253,8 +3259,6 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))

tc.IncrClockForLeaseUpgrade(t, manualClock)
tc.WaitForLeaseUpgrade(ctx, t, desc)

cap, err := s.Capacity(ctx, false /* useCached */)
Expand Down
49 changes: 44 additions & 5 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

var reqLeaseLiveness livenesspb.Liveness
if p.repl.shouldUseExpirationLeaseRLocked() ||
(transfer &&
TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV)) {
Expand Down Expand Up @@ -365,6 +366,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
return llHandle
}
reqLease.Epoch = l.Epoch
reqLeaseLiveness = l.Liveness
}

var leaseReq kvpb.Request
Expand Down Expand Up @@ -395,7 +397,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
}
}

err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq, limiter)
err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq, limiter)
if err != nil {
if errors.Is(err, stop.ErrThrottled) {
llHandle.resolve(kvpb.NewError(err))
Expand Down Expand Up @@ -426,10 +428,14 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
//
// The status argument is used as the expected value for liveness operations.
// leaseReq must be consistent with the LeaseStatus.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLeaseAsync(
parentCtx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
limiter *quotapool.IntPool,
) error {
Expand Down Expand Up @@ -465,7 +471,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// RPC, but here we submit the request directly to the local replica.
growstack.Grow()

err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq)
err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
Expand Down Expand Up @@ -505,24 +511,57 @@ var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second)
// requestLease sends a synchronous transfer lease or lease request to the
// specified replica. It is only meant to be called from requestLeaseAsync,
// since it does not coordinate with other in-flight lease requests.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLease(
ctx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
) error {
started := timeutil.Now()
defer func() {
p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
}()

nextLeaseHolder := reqLease.Replica
extension := status.OwnedBy(nextLeaseHolder.StoreID)

// If we are promoting an expiration-based lease to an epoch-based lease, we
// must make sure the expiration does not regress. We do this here because the
// expiration is stored directly in the lease for expiration-based leases but
// indirectly in liveness record for epoch-based leases. To ensure this, we
// manually heartbeat our liveness record if necessary. This is expected to
// work because the liveness record interval and the expiration-based lease
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
// Assert that the liveness record expiration is now greater than the
// expiration of the lease we're promoting.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat", l, status.Lease)
}
}

// If we're replacing an expired epoch-based lease, we must increment the
// epoch of the prior owner to invalidate its leases. If we were the owner,
// then we instead heartbeat to become live.
if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) {
if extension {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
Expand Down
11 changes: 0 additions & 11 deletions pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,17 +1112,6 @@ func (tc *TestCluster) TransferRangeLeaseOrFatal(
}
}

// IncrClockForLeaseUpgrade run up the clock to force a lease renewal (and thus
// the change in lease types).
func (tc *TestCluster) IncrClockForLeaseUpgrade(
t serverutils.TestFataler, clock *hlc.HybridManualClock,
) {
clock.Increment(
tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)
}

// MaybeWaitForLeaseUpgrade waits until the lease held for the given range
// descriptor is upgraded to an epoch-based one, but only if we expect the lease
// to be upgraded.
Expand Down

0 comments on commit 6dd54b4

Please sign in to comment.