Skip to content

Commit

Permalink
storage: force new lease sequence number after restart
Browse files Browse the repository at this point in the history
While we were already making sure that a lease obtained before a node
restart was not used afterwards, the newly requested lease would usually be an
extension of the old one. As such, commands proposed under both would be
able to apply under the new one, which could theoretically cause
consistency issues as the previous commands would not be tracked by the
command queue (though it would be hard to engineer and has not been
observed in practice, to the best of our knowledge).

This change plugs that hole by preventing an extension of a previously
held lease post restart.

Touches cockroachdb#10420.

Release note (bug fix): Prevent potential consistency issues when a node
is stopped and restarted in rapid succession.
  • Loading branch information
tbg committed Mar 6, 2018
1 parent d4d5065 commit e91c50b
Show file tree
Hide file tree
Showing 8 changed files with 790 additions and 651 deletions.
1,252 changes: 654 additions & 598 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,10 @@ message RequestLeaseRequest {
// The previous lease is specified by the caller to verify
// it has not changed when executing this command.
Lease prev_lease = 3 [(gogoproto.nullable) = false];
// The MinLeaseProposedTS of the proposing replica to make sure that leases
// issued after a node restart receive a new sequence number (instead of
// counting as a lease extension). See #23204.
util.hlc.Timestamp min_proposed_ts = 4 [(gogoproto.customname) = "MinProposedTS"];
}

// A TransferLeaseRequest represents the arguments to the TransferLease()
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/batcheval/cmd_lease_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func RequestLease(
if newLease.DeprecatedStartStasis == nil {
newLease.DeprecatedStartStasis = newLease.Expiration
}

isExtension := prevLease.Replica.StoreID == newLease.Replica.StoreID
effectiveStart := newLease.Start

Expand All @@ -72,6 +73,25 @@ func RequestLease(
// the absence of replay protection.
if prevLease.Replica.StoreID == 0 || isExtension {
effectiveStart.Backward(prevLease.Start)
// If the lease holder promised to not propose any commands below
// MinProposedTS, it must also not be allowed to extend a lease before that
// timestamp. We make sure that when a node restarts, its earlier in-flight
// commands (which are not tracked by the command queue post restart)
// receive an error under the new lease by making sure the sequence number
// of that lease is higher. This in turn is achieved by forwarding its start
// time here, which makes it not Equivalent() to the preceding lease for the
// same store.
//
// Note also that leasePostApply makes sure to update the timestamp cache in
// this case: even though the lease holder does not change, the sequence
// number does and this triggers a low water mark bump.
//
// The bug prevented with this is unlikely to occur in practice
// since earlier commands usually apply before this lease will.
if ts := args.MinProposedTS; isExtension && ts != nil {
effectiveStart.Forward(*ts)
}

} else if prevLease.Type() == roachpb.LeaseExpiration {
effectiveStart.Backward(prevLease.Expiration.Next())
}
Expand Down
29 changes: 27 additions & 2 deletions pkg/storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,8 @@ func TestLeaseMetricsOnSplitAndTransfer(t *testing.T) {
// See replica.mu.minLeaseProposedTS for the reasons why this isn't allowed.
func TestLeaseNotUsedAfterRestart(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

sc := storage.TestStoreConfig(nil)
var leaseAcquisitionTrap atomic.Value
// Disable the split queue so that no ranges are split. This makes it easy
Expand All @@ -965,10 +967,18 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) {

// Send a read, to acquire a lease.
getArgs := getArgs([]byte("a"))
if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), getArgs); err != nil {
if _, err := client.SendWrapped(ctx, rg1(mtc.stores[0]), getArgs); err != nil {
t.Fatal(err)
}

preRepl1, err := mtc.stores[0].GetReplica(1)
if err != nil {
t.Fatal(err)
}
preRestartLease, _ := preRepl1.GetLease()

mtc.manualClock.Increment(1E9)

// Restart the mtc. Before we do that, we're installing a callback used to
// assert that a new lease has been requested. The callback is installed
// before the restart, as the lease might be requested at any time and for
Expand All @@ -980,11 +990,13 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) {
close(leaseAcquisitionCh)
})
})

log.Info(ctx, "restarting")
mtc.restart()

// Send another read and check that the pre-existing lease has not been used.
// Concretely, we check that a new lease is requested.
if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), getArgs); err != nil {
if _, err := client.SendWrapped(ctx, rg1(mtc.stores[0]), getArgs); err != nil {
t.Fatal(err)
}
// Check that the Send above triggered a lease acquisition.
Expand All @@ -993,6 +1005,19 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) {
case <-time.After(time.Second):
t.Fatalf("read did not acquire a new lease")
}

postRepl1, err := mtc.stores[0].GetReplica(1)
if err != nil {
t.Fatal(err)
}
postRestartLease, _ := postRepl1.GetLease()

// Verify that not only is a new lease requested, it also gets a new sequence
// number. This makes sure that previously proposed commands actually fail at
// apply time.
if preRestartLease.Sequence == postRestartLease.Sequence {
t.Fatalf("lease was not replaced:\nprev: %v\nnow: %v", preRestartLease, postRestartLease)
}
}

// Test that a lease extension (a RequestLeaseRequest that doesn't change the
Expand Down
24 changes: 18 additions & 6 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,19 +700,31 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
// reloading the raft state below, it isn't safe to use the existing raft
// group.
r.mu.internalRaftGroup = nil

var err error

if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.store.Engine(), desc); err != nil {
return err
}

// Init the minLeaseProposedTS such that we won't use an existing lease (if
// any). This is so that, after a restart, we don't propose under old leases.
// If the replica is being created through a split, this value will be
// overridden.
if !r.store.cfg.TestingKnobs.DontPreventUseOfOldLeaseOnStart {
r.mu.minLeaseProposedTS = clock.Now()
// Only do this if there was a previous lease. This shouldn't be important
// to do but consider that the first lease which is obtained is back-dated
// to a zero start timestamp (and this de-flakes some tests). If we set the
// min proposed TS here, this lease could not be renewed (by the semantics
// of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits,
// this problem would multiply to a number of replicas at cluster bootstrap.
// Instead, we make the first lease special (which is OK) and the problem
// disappears.
if r.mu.state.Lease.Sequence > 0 {
r.mu.minLeaseProposedTS = clock.Now()
}
}

var err error

if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.store.Engine(), desc); err != nil {
return err
}
r.rangeStr.store(0, r.mu.state.Desc)

r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.store.Engine())
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
r.mu.Unlock()

iAmTheLeaseHolder := newLease.Replica.ReplicaID == replicaID
leaseChangingHands := prevLease.Replica.StoreID != newLease.Replica.StoreID
// NB: in the case in which a node restarts, minLeaseProposedTS forces it to
// get a new lease and we make sure it gets a new sequence number, thus
// causing the right half of the disjunction to fire so that we update the
// timestamp cache.
leaseChangingHands := prevLease.Replica.StoreID != newLease.Replica.StoreID || prevLease.Sequence != newLease.Sequence

if iAmTheLeaseHolder {
// Always log lease acquisition for epoch-based leases which are
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
PrevLease: status.Lease,
}
} else {
minProposedTS := p.repl.mu.minLeaseProposedTS
leaseReq = &roachpb.RequestLeaseRequest{
Span: reqSpan,
Lease: reqLease,
PrevLease: status.Lease,
Span: reqSpan,
Lease: reqLease,
PrevLease: status.Lease,
MinProposedTS: &minProposedTS,
}
}

Expand Down
98 changes: 57 additions & 41 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1802,50 +1802,66 @@ func TestAcquireLease(t *testing.T) {
&pArgs,
} {
t.Run("", func(t *testing.T) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)
// This is a single-replica test; since we're automatically pushing back
// the start of a lease as far as possible, and since there is an auto-
// matic lease for us at the beginning, we'll basically create a lease from
// then on.
lease, _ := tc.repl.GetLease()
expStart := lease.Start
tc.manualClock.Set(leaseExpiry(tc.repl))

ts := tc.Clock().Now().Next()
if _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: ts}, test); pErr != nil {
t.Error(pErr)
}
if held, expired := hasLease(tc.repl, ts); !held || expired {
t.Errorf("expected lease acquisition")
}
lease, _ = tc.repl.GetLease()
if lease.Start != expStart {
t.Errorf("unexpected lease start: %s; expected %s", lease.Start, expStart)
}
testutils.RunTrueAndFalse(t, "withMinLeaseProposedTS", func(t *testing.T, withMinLeaseProposedTS bool) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

lease, _ := tc.repl.GetLease()

// This is a single-replica test; since we're automatically pushing back
// the start of a lease as far as possible, and since there is an auto-
// matic lease for us at the beginning, we'll basically create a lease
// from then on. That is, unless the minLeaseProposedTS which gets set
// automatically at server start forces us to get a new lease. We
// simulate both cases.
var expStart hlc.Timestamp

if *lease.DeprecatedStartStasis != *lease.Expiration {
t.Errorf("%s already in stasis (or beyond): %+v", ts, lease)
}
if !ts.Less(*lease.Expiration) {
t.Errorf("%s already expired: %+v", ts, lease)
}
tc.repl.mu.Lock()
if !withMinLeaseProposedTS {
tc.repl.mu.minLeaseProposedTS = hlc.Timestamp{}
expStart = lease.Start
} else {
expStart = tc.repl.mu.minLeaseProposedTS
}
tc.repl.mu.Unlock()

shouldRenewTS := lease.Expiration.Add(-1, 0)
tc.manualClock.Set(shouldRenewTS.WallTime + 1)
if _, pErr := tc.SendWrapped(test); pErr != nil {
t.Error(pErr)
}
// Since the command we sent above does not get blocked on the lease
// extension, we need to wait for it to go through.
testutils.SucceedsSoon(t, func() error {
newLease, _ := tc.repl.GetLease()
if !lease.Expiration.Less(*newLease.Expiration) {
return errors.Errorf("lease did not get extended: %+v to %+v", lease, newLease)
tc.manualClock.Set(leaseExpiry(tc.repl))

ts := tc.Clock().Now().Next()
if _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: ts}, test); pErr != nil {
t.Error(pErr)
}
return nil
if held, expired := hasLease(tc.repl, ts); !held || expired {
t.Errorf("expected lease acquisition")
}
lease, _ = tc.repl.GetLease()
if lease.Start != expStart {
t.Errorf("unexpected lease start: %s; expected %s", lease.Start, expStart)
}

if *lease.DeprecatedStartStasis != *lease.Expiration {
t.Errorf("%s already in stasis (or beyond): %+v", ts, lease)
}
if !ts.Less(*lease.Expiration) {
t.Errorf("%s already expired: %+v", ts, lease)
}

shouldRenewTS := lease.Expiration.Add(-1, 0)
tc.manualClock.Set(shouldRenewTS.WallTime + 1)
if _, pErr := tc.SendWrapped(test); pErr != nil {
t.Error(pErr)
}
// Since the command we sent above does not get blocked on the lease
// extension, we need to wait for it to go through.
testutils.SucceedsSoon(t, func() error {
newLease, _ := tc.repl.GetLease()
if !lease.Expiration.Less(*newLease.Expiration) {
return errors.Errorf("lease did not get extended: %+v to %+v", lease, newLease)
}
return nil
})
})
})
}
Expand Down

0 comments on commit e91c50b

Please sign in to comment.