storage: don't use leases not acquired by the current process
Before this patch, leases held before a restart could be used after the
restart. This is incorrect, since the command queue has been wiped in
the restart and so reads and writes are not properly sequenced with
possible in-flight commands.
Another problem is that a lease that was in the process of being
transferred away before the restart could still be used after it; that's
no good because it breaks the lease holder's promise not to use that
lease.

This patch adds a bit of in-memory state to a replica - whether the
lease has changed since the process started. If it hasn't, the lease is
not used _at propose time_; instead, the holder pretends it doesn't have
a valid lease and tries to acquire one. Nodes other than the holder do
not modify their behavior.
No similar check is done at apply time (nor could it be), since there we
naturally rely on Raft to order writes; so, if the proposer is still the
lease holder, there's no reason not to apply a command.

fixes cockroachdb#7996
andreimatei committed Oct 25, 2016
1 parent ca89f45 commit 5287df6
Showing 4 changed files with 31 additions and 10 deletions.
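
For orientation, here is a minimal sketch of the mechanism the commit message describes - not CockroachDB's actual types, just the lifecycle of the flag (a restart leaves it false, applying any new lease sets it, and propose-time lease use is gated on it). All names below are illustrative:

package main

import (
	"fmt"
	"sync"
)

// replica models only the state relevant to this commit: the identity of
// the current lease holder and the per-process leaseResetAfterStart flag.
// Raft, the command queue, and timestamps are all elided.
type replica struct {
	mu                   sync.Mutex
	leaseHolder          string // store that owns the persisted lease
	leaseResetAfterStart bool   // false until a lease change applies in this process
}

// applyNewLease runs at apply time. Applying a lease (with any owner)
// proves the lease change was sequenced through Raft in this process,
// so the flag can be set.
func (r *replica) applyNewLease(holder string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.leaseHolder = holder
	r.leaseResetAfterStart = true
}

// canUseLease is the propose-time check: even if this store looks like
// the owner of a covering lease, the lease is unusable until a lease
// change has applied after the process started.
func (r *replica) canUseLease(store string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.leaseHolder == store && r.leaseResetAfterStart
}

func main() {
	// A freshly restarted process: the persisted lease still names "s1",
	// but the flag starts false, so s1 must request a new lease.
	r := &replica{leaseHolder: "s1"}
	fmt.Println(r.canUseLease("s1")) // false: pretend we have no valid lease

	// Once a lease request goes through Raft and applies, the flag flips.
	r.applyNewLease("s1")
	fmt.Println(r.canUseLease("s1")) // true
}
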
pkg/storage/replica.go (26 additions, 5 deletions)

@@ -289,6 +289,13 @@ type Replica struct {
 	raftLogSize int64
 	// pendingLeaseRequest is used to coalesce RequestLease requests.
 	pendingLeaseRequest pendingLeaseRequest
+	// leaseResetAfterStart starts up false and is set whenever a new lease
+	// (with any owner) has been applied. It is used to prevent a node from
+	// using leases it thinks it owns immediately after a restart. Such leases
+	// can't be used because the command queue has been wiped in the restart, so
+	// reads might return incorrect results, as they don't synchronize with
+	// in-flight writes.
+	leaseResetAfterStart bool
 	// Max bytes before split.
 	maxBytes int64
 	// proposals stores the Raft in-flight commands which
@@ -787,7 +794,7 @@ func (r *Replica) redirectOnOrAcquireLease(ctx context.Context) *roachpb.Error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	lease := r.mu.state.Lease
-	if lease.Covers(timestamp) {
+	if lease.Covers(timestamp) && r.mu.leaseResetAfterStart {
 		if !lease.OwnedBy(r.store.StoreID()) {
 			// If lease is currently held by another, redirect to holder.
 			return nil, roachpb.NewError(
@@ -2838,6 +2845,10 @@ func (r *Replica) acquireSplitLock(split *roachpb.SplitTrigger) func(pErr *roachpb.Error) {
 	if err != nil {
 		return nil
 	}
+	// We've just created this replica, so its lease is usable.
+	rightRng.mu.Lock()
+	rightRng.mu.leaseResetAfterStart = true
+	rightRng.mu.Unlock()

 	// It would be nice to assert that rightRng is not initialized
 	// here. Unfortunately, due to reproposals and retries we might be executing
@@ -3558,13 +3569,18 @@ func (r *Replica) maybeGossipSystemConfig() {
 		return
 	}

-	ctx := r.AnnotateCtx(context.TODO())
-
-	if lease, _ := r.getLease(); !lease.OwnedBy(r.store.StoreID()) || !lease.Covers(r.store.Clock().Now()) {
+	r.mu.Lock()
+	lease := r.mu.state.Lease
+	leaseReset := r.mu.leaseResetAfterStart
+	r.mu.Unlock()
+	if !lease.OwnedBy(r.store.StoreID()) || !lease.Covers(r.store.Clock().Now()) ||
+		!leaseReset {
 		// Do not gossip when a range lease is not held.
 		return
 	}

+	ctx := r.AnnotateCtx(context.TODO())
+
 	// TODO(marc): check for bad split in the middle of the SystemConfig span.
 	kvs, hash, err := r.loadSystemConfigSpan()
 	if err != nil {
@@ -3605,7 +3621,12 @@ func (r *Replica) maybeGossipNodeLiveness(span roachpb.Span) {
 		return
 	}

-	if lease, _ := r.getLease(); !lease.OwnedBy(r.store.StoreID()) || !lease.Covers(r.store.Clock().Now()) {
+	r.mu.Lock()
+	lease := r.mu.state.Lease
+	leaseReset := r.mu.leaseResetAfterStart
+	r.mu.Unlock()
+	if !lease.OwnedBy(r.store.StoreID()) || !lease.Covers(r.store.Clock().Now()) ||
+		!leaseReset {
 		// Do not gossip when a range lease is not held.
 		return
 	}
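
Both gossip methods above now repeat the same pattern: snapshot the lease and the flag under the replica mutex, then check ownership, coverage, and the flag outside the lock. A hypothetical helper (not part of this commit) consolidating that pattern, using only identifiers from the diff, could look like:

// leaseUsableForGossip is hypothetical; it mirrors the checks the two
// gossip methods perform after this patch.
func (r *Replica) leaseUsableForGossip() bool {
	r.mu.Lock()
	lease := r.mu.state.Lease
	leaseReset := r.mu.leaseResetAfterStart
	r.mu.Unlock()
	return lease.OwnedBy(r.store.StoreID()) &&
		lease.Covers(r.store.Clock().Now()) &&
		leaseReset
}
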
pkg/storage/replica_command.go (1 addition, 0 deletions)

@@ -1762,6 +1762,7 @@ func (r *Replica) applyNewLeaseLocked(
 	if err := setLease(ctx, batch, ms, r.RangeID, &lease); err != nil {
 		return reply, newFailedLeaseTrigger(), err
 	}
+	r.mu.leaseResetAfterStart = true

 	var pd ProposalData
 	// If we didn't block concurrent reads here, there'd be a chance that
pkg/storage/replica_range_lease.go (0 additions, 3 deletions)

@@ -240,9 +240,6 @@ func (r *Replica) requestLeaseLocked(timestamp hlc.Timestamp) <-chan *roachpb.Error {
 // blocks until the transfer is done. If a transfer is already in progress,
 // this method joins in waiting for it to complete if it's transferring to the
 // same replica. Otherwise, a NotLeaderError is returned.
-//
-// TODO(andrei): figure out how to persist the "not serving" state across node
-// restarts.
 func (r *Replica) AdminTransferLease(target roachpb.StoreID) error {
 	// initTransferHelper inits a transfer if no extension is in progress.
 	// It returns a channel for waiting for the result of a pending
pkg/storage/replica_test.go (4 additions, 2 deletions)

@@ -564,8 +564,10 @@ func TestReplicaRangeBoundsChecking(t *testing.T) {
 // hasLease returns whether the most recent range lease was held by the given
 // range replica and whether it's expired for the given timestamp.
 func hasLease(rng *Replica, timestamp hlc.Timestamp) (owned bool, expired bool) {
-	l, _ := rng.getLease()
-	return l.OwnedBy(rng.store.StoreID()), !l.Covers(timestamp)
+	rng.mu.Lock()
+	defer rng.mu.Unlock()
+	l := rng.mu.state.Lease
+	return l.OwnedBy(rng.store.StoreID()) && rng.mu.leaseResetAfterStart, !l.Covers(timestamp)
 }

 func TestReplicaLease(t *testing.T) {
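
A hedged usage sketch for the updated helper: with the new flag folded into hasLease, a freshly (re)started replica reads as not owning the lease even if the persisted lease names its store. Here rng, clock, and t are assumed to come from the surrounding test harness; this call site is illustrative, not part of the commit.

// Before any lease change has applied in this process,
// leaseResetAfterStart is false, so owned comes back false.
if owned, expired := hasLease(rng, clock.Now()); owned && !expired {
	t.Fatal("expected pre-restart lease to be unusable")
}
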
