Skip to content

Commit

Permalink
storage: persist proposed lease transfers to disk
Browse files Browse the repository at this point in the history
... so we don't use those leases after a restart

fixes #7996
  • Loading branch information
andreimatei committed Oct 11, 2016
1 parent 4ae5899 commit 81eee1d
Show file tree
Hide file tree
Showing 12 changed files with 506 additions and 194 deletions.
12 changes: 7 additions & 5 deletions keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,20 @@ var (
// abort cache protects a transaction from re-reading its own intents
// after it's been aborted.
LocalAbortCacheSuffix = []byte("abc-")
// localRangeFrozenStatusSuffix is the suffix for a frozen status.
// LocalRangeFrozenStatusSuffix is the suffix for a frozen status.
LocalRangeFrozenStatusSuffix = []byte("fzn-")
// LocalRangeLastGCSuffix is the suffix for the last GC.
LocalRangeLastGCSuffix = []byte("lgc-")
// LocalRaftAppliedIndexSuffix is the suffix for the raft applied index.
// localRaftAppliedIndexSuffix is the suffix for the raft applied index.
LocalRaftAppliedIndexSuffix = []byte("rfta")
// localRaftTombstoneSuffix is the suffix for the raft tombstone.
// LocalRaftTombstoneSuffix is the suffix for the raft tombstone.
LocalRaftTombstoneSuffix = []byte("rftb")
// localRaftTruncatedStateSuffix is the suffix for the RaftTruncatedState.
// LocalRaftTruncatedStateSuffix is the suffix for the RaftTruncatedState.
LocalRaftTruncatedStateSuffix = []byte("rftt")
// localRangeLeaseSuffix is the suffix for a range lease.
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
// LocalNextLeaseSuffix is the suffix for a range's "next lease".
LocalNextLeaseSuffix = []byte("rnl-")
// LocalLeaseAppliedIndexSuffix is the suffix for the applied lease index.
LocalLeaseAppliedIndexSuffix = []byte("rlla")
// localRangeStatsSuffix is the suffix for range statistics.
Expand Down
6 changes: 6 additions & 0 deletions keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func RangeLeaseKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDReplicatedKey(rangeID, LocalRangeLeaseSuffix, nil)
}

// ReplicaNextLeaseKey returns the unreplicated, range-ID-local key under which
// the local replica keeps info on the last lease it proposed that has not yet
// been applied.
func ReplicaNextLeaseKey(rangeID roachpb.RangeID) roachpb.Key {
	nextLeaseKey := MakeRangeIDUnreplicatedKey(rangeID, LocalNextLeaseSuffix, nil)
	return nextLeaseKey
}

// RangeStatsKey returns the key for accessing the MVCCStats struct
// for the specified Range ID.
func RangeStatsKey(rangeID roachpb.RangeID) roachpb.Key {
Expand Down
39 changes: 10 additions & 29 deletions storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestRangeTransferLease(t *testing.T) {
// Transferring the lease to ourself should be a no-op.
origLeasePtr, _ := replica0.GetLease()
origLease := *origLeasePtr
if err := replica0.AdminTransferLease(replica0Desc.StoreID); err != nil {
if err := replica0.AdminTransferLease(context.Background(), replica0Desc.StoreID); err != nil {
t.Fatal(err)
}
newLeasePtr, _ := replica0.GetLease()
Expand All @@ -537,7 +537,8 @@ func TestRangeTransferLease(t *testing.T) {
{
// An invalid target should result in an error.
const expected = "unable to find store .* in range"
if err := replica0.AdminTransferLease(1000); !testutils.IsError(err, expected) {
if err := replica0.AdminTransferLease(
context.Background(), 1000); !testutils.IsError(err, expected) {
t.Fatalf("expected %s, but found %v", expected, err)
}
}
Expand All @@ -550,7 +551,7 @@ func TestRangeTransferLease(t *testing.T) {
return err
})

if err := replica0.AdminTransferLease(newHolderDesc.StoreID); err != nil {
if err := replica0.AdminTransferLease(context.Background(), newHolderDesc.StoreID); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -635,7 +636,7 @@ func TestRangeTransferLease(t *testing.T) {
go func() {
defer wg.Done()
// Transfer back from replica1 to replica0.
if err := replica1.AdminTransferLease(replica0Desc.StoreID); err != nil {
if err := replica1.AdminTransferLease(context.Background(), replica0Desc.StoreID); err != nil {
panic(err)
}
}()
Expand Down Expand Up @@ -736,27 +737,6 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
}
}

// LeaseInfo sends a LeaseInfoRequest keyed at the start key of rangeDesc
// through db's sender, using the requested read consistency, and returns the
// resulting response by value. If the request yields an error, the test is
// failed immediately via t.Fatal.
func LeaseInfo(
	t *testing.T,
	db *client.DB,
	rangeDesc roachpb.RangeDescriptor,
	readConsistency roachpb.ReadConsistencyType,
) roachpb.LeaseInfoResponse {
	// The request only needs a start key to be routed to the range.
	req := &roachpb.LeaseInfoRequest{
		Span: roachpb.Span{Key: rangeDesc.StartKey.AsRawKey()},
	}
	header := roachpb.Header{ReadConsistency: readConsistency}

	resp, pErr := client.SendWrappedWith(db.GetSender(), nil, header, req)
	if pErr != nil {
		t.Fatal(pErr)
	}
	leaseInfo := resp.(*roachpb.LeaseInfoResponse)
	return *leaseInfo
}

func TestLeaseInfoRequest(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testcluster.StartTestCluster(t, 3,
Expand Down Expand Up @@ -795,7 +775,8 @@ func TestLeaseInfoRequest(t *testing.T) {
}

// Lease should start on Server 0, since nobody told it to move.
leaseHolderReplica := LeaseInfo(t, kvDB0, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
leaseHolderReplica := storage.LeaseInfo(
t, kvDB0, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
if leaseHolderReplica != replicas[0] {
t.Fatalf("lease holder should be replica %+v, but is: %+v", replicas[0], leaseHolderReplica)
}
Expand All @@ -809,7 +790,7 @@ func TestLeaseInfoRequest(t *testing.T) {
// An inconsistent LeaseInfoRequest on the old lease holder should give us the
// right answer immediately, since the old holder has definitely applied the
// transfer before TransferRangeLease returned.
leaseHolderReplica = LeaseInfo(t, kvDB0, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
leaseHolderReplica = storage.LeaseInfo(t, kvDB0, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
if leaseHolderReplica != replicas[1] {
t.Fatalf("lease holder should be replica %+v, but is: %+v",
replicas[1], leaseHolderReplica)
Expand All @@ -822,7 +803,7 @@ func TestLeaseInfoRequest(t *testing.T) {
// from the supposed lease holder, because this node might initially be
// unaware of the new lease and so the request might bounce around for a
// while (see #8816).
leaseHolderReplica = LeaseInfo(t, kvDB1, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
leaseHolderReplica = storage.LeaseInfo(t, kvDB1, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
if leaseHolderReplica != replicas[1] {
return errors.Errorf("lease holder should be replica %+v, but is: %+v",
replicas[1], leaseHolderReplica)
Expand All @@ -836,7 +817,7 @@ func TestLeaseInfoRequest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
leaseHolderReplica = LeaseInfo(t, kvDB1, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
leaseHolderReplica = storage.LeaseInfo(t, kvDB1, *rangeDesc, roachpb.INCONSISTENT).Lease.Replica
if leaseHolderReplica != replicas[2] {
t.Fatalf("lease holder should be replica %+v, but is: %+v", replicas[2], leaseHolderReplica)
}
Expand Down
9 changes: 5 additions & 4 deletions storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,10 +875,11 @@ func TestStoreRangeSystemSplits(t *testing.T) {

// runSetupSplitSnapshotRace engineers a situation in which a range has
// been split but node 3 hasn't processed it yet. There is a race
// depending on whether node 3 learns of the split from its left or
// right side. When this function returns most of the nodes will be
// stopped, and depending on the order in which they are restarted, we
// can arrange for both possible outcomes of the race.
// on this follower node between learning about the split through the regular
// way (applying the split command on the lhs range) and receiving a snapshot
// from the rhs. When this function returns most of the nodes will be stopped,
// and depending on the order in which they are restarted, we can arrange for
// both possible outcomes of the race.
//
// Range 1 is the system keyspace, located on node 0.
//
Expand Down
11 changes: 6 additions & 5 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/util/netutil"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/syncutil"
"github.com/cockroachdb/cockroach/util/timeutil"
"github.com/cockroachdb/cockroach/util/tracing"
)

Expand Down Expand Up @@ -312,8 +313,8 @@ func (m *multiTestContext) Stop() {
// deadlocks.
var wg sync.WaitGroup
wg.Add(len(m.stoppers))
for _, s := range m.stoppers {
go func(s *stop.Stopper) {
for i, s := range m.stoppers {
go func(i int, s *stop.Stopper) {
defer wg.Done()
// Some Stoppers may be nil if stopStore has been called
// without restartStore.
Expand All @@ -325,7 +326,7 @@ func (m *multiTestContext) Stop() {
// getting stuck in addWriteCommand.
s.Quiesce()
}
}(s)
}(i, s)
}
wg.Wait()

Expand Down Expand Up @@ -353,7 +354,7 @@ func (m *multiTestContext) Stop() {
if m.t.Failed() {
m.t.Error("timed out during shutdown")
} else {
panic("timed out during shutdown")
panic(fmt.Sprintf("timed out during shutdown. current time: %s.", timeutil.Now()))
}
}
}
Expand Down Expand Up @@ -1049,7 +1050,7 @@ func (m *multiTestContext) transferLease(rangeID roachpb.RangeID, destStore *sto
return err
}

return origRepl.AdminTransferLease(destStore.Ident.StoreID)
return origRepl.AdminTransferLease(context.Background(), destStore.Ident.StoreID)
}

// getArgs returns a GetRequest and GetResponse pair addressed to
Expand Down
14 changes: 7 additions & 7 deletions storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,12 @@ func MVCCSetRangeStats(
return MVCCPutProto(ctx, engine, nil, keys.RangeStatsKey(rangeID), hlc.ZeroTimestamp, nil, ms)
}

// MVCCGetProto fetches the value at the specified key and unmarshals
// it using a protobuf decoder. Returns true on success or false if
// the key was not found. In the event of a WriteIntentError when
// consistent=false, we return the error and the decoded result; for
// all other errors (or when consistent=true) the decoded value is
// invalid.
// MVCCGetProto fetches the value at the specified key and unmarshals it using a
// protobuf decoder. Returns true on success or false if the key was not found
// (in which case `msg` is not modified).
// In the event of a WriteIntentError when consistent=false, we return the error
// and the decoded result; for all other errors (or when consistent=true) the
// decoded value is invalid.
func MVCCGetProto(
ctx context.Context,
engine Reader,
Expand Down Expand Up @@ -476,7 +476,7 @@ func MVCCGetProto(
func MVCCPutProto(
ctx context.Context,
engine ReadWriter,
ms *enginepb.MVCCStats,
ms *enginepb.MVCCStats, // can be nil if the key is unreplicated and doesn't affect stats
key roachpb.Key,
timestamp hlc.Timestamp,
txn *roachpb.Transaction,
Expand Down
27 changes: 16 additions & 11 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type Replica struct {
// process have been truncated.
raftLogSize int64
// pendingLeaseRequest is used to coalesce RequestLease requests.
pendingLeaseRequest pendingLeaseRequest
pendingLeaseRequest *pendingLeaseRequest
// Max bytes before split.
maxBytes int64
// pendingCmds stores the Raft in-flight commands which
Expand Down Expand Up @@ -579,6 +579,11 @@ func (r *Replica) initLocked(
}
r.rangeStr.store(r.mu.state.Desc)

r.mu.pendingLeaseRequest, err = newPendingLeaseRequest(r.ctx, r)
if err != nil {
return err
}

r.mu.lastIndex, err = loadLastIndex(r.ctx, r.store.Engine(), r.RangeID)
if err != nil {
return err
Expand Down Expand Up @@ -764,7 +769,7 @@ func (r *Replica) redirectOnOrAcquireLease(ctx context.Context) *roachpb.Error {
// lease holder. Returns also on context.Done() (timeout or cancellation).
for attempt := 1; ; attempt++ {
timestamp := r.store.Clock().Now()
llChan, pErr := func() (<-chan *roachpb.Error, *roachpb.Error) {
loeChan, pErr := func() (<-chan leaseOrErr, *roachpb.Error) {
r.mu.Lock()
defer r.mu.Unlock()
lease := r.mu.state.Lease
Expand Down Expand Up @@ -800,41 +805,41 @@ func (r *Replica) redirectOnOrAcquireLease(ctx context.Context) *roachpb.Error {
// a lease extension. We don't need to wait for that extension
// to go through and simply ignore the returned channel (which
// is buffered).
_ = r.requestLeaseLocked(timestamp)
_ = r.requestLeaseLocked(ctx, timestamp)
}
// Return a nil chan to signal that we have a valid lease.
return nil, nil
}
log.Eventf(ctx, "request range lease (attempt #%d)", attempt)

// No active lease: Request renewal if a renewal is not already pending.
return r.requestLeaseLocked(timestamp), nil
return r.requestLeaseLocked(ctx, timestamp), nil
}()
if pErr != nil {
return pErr
}
if llChan == nil {
if loeChan == nil {
// We own a covering lease.
return nil
}

// Wait for the range lease to finish, or the context to expire.
select {
case pErr := <-llChan:
if pErr != nil {
case loe := <-loeChan:
if loe.pErr != nil {
// Getting a LeaseRejectedError back means someone else got there
// first, or the lease request was somehow invalid due to a
// concurrent change. Convert the error to a NotLeaseHolderError.
if _, ok := pErr.GetDetail().(*roachpb.LeaseRejectedError); ok {
if _, ok := loe.pErr.GetDetail().(*roachpb.LeaseRejectedError); ok {
lease, _ := r.getLease()
if !lease.Covers(r.store.Clock().Now()) {
lease = nil
}
return roachpb.NewError(newNotLeaseHolderError(lease, r.store.StoreID(), r.Desc()))
}
return pErr
return loe.pErr
}
log.Event(ctx, "lease acquisition succeeded")
log.Eventf(ctx, "a new lease has been applied: %s", loe.lease)
continue
case <-ctx.Done():
log.ErrEventf(ctx, "lease acquisition failed: %s", ctx.Err())
Expand Down Expand Up @@ -1391,7 +1396,7 @@ func (r *Replica) addAdminCmd(
reply, pErr = r.AdminMerge(ctx, *tArgs, r.Desc())
resp = &reply
case *roachpb.AdminTransferLeaseRequest:
pErr = roachpb.NewError(r.AdminTransferLease(tArgs.Target))
pErr = roachpb.NewError(r.AdminTransferLease(ctx, tArgs.Target))
resp = &roachpb.AdminTransferLeaseResponse{}
case *roachpb.CheckConsistencyRequest:
var reply roachpb.CheckConsistencyResponse
Expand Down
Loading

0 comments on commit 81eee1d

Please sign in to comment.