storage: persist proposed lease transfers to disk
... so we don't use those leases after a restart

fixes cockroachdb#7996
andreimatei committed Sep 24, 2016
1 parent f11d132 commit 91d83b3
Showing 8 changed files with 208 additions and 16 deletions.
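At a high level, the change works as follows: when a replica initiates a lease transfer, the store persists the outgoing lease's stasis start under a store-local key, keeping only the maximum value seen so far; on startup, the server reads that key from every store and refuses to serve until its clock has passed the highest persisted timestamp. The sketch below illustrates the idea in isolation, using plain wall-clock nanoseconds instead of hlc timestamps and an in-memory variable instead of the engine; it is not code from this commit.

package main

import (
	"fmt"
	"time"
)

// safeStart stands in for the store-local "safe start" key: the highest
// stasis start of any lease this store has transferred away.
var safeStart int64

// recordTransfer ratchets safeStart forward, mirroring what
// MaybeUpdateSafeStart does against the engine.
func recordTransfer(leaseStartStasis int64) {
	if safeStart < leaseStartStasis {
		safeStart = leaseStartStasis
	}
}

// waitForSafeStart mirrors the startup path added to server.go: block until
// the clock has passed the persisted timestamp (plus 1ns, since the logical
// component of the timestamp is ignored).
func waitForSafeStart() {
	if now := time.Now().UnixNano(); now < safeStart {
		time.Sleep(time.Duration(safeStart-now+1) * time.Nanosecond)
	}
}

func main() {
	// Pretend a lease whose stasis starts 50ms from now was transferred away,
	// then the node "restarted": serving must wait until that point.
	recordTransfer(time.Now().Add(50 * time.Millisecond).UnixNano())
	waitForSafeStart()
	fmt.Println("safe to serve")
}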
3 changes: 3 additions & 0 deletions keys/constants.go
@@ -80,6 +80,9 @@ var (
// localStoreGossipSuffix stores gossip bootstrap metadata for this
// store, updated any time new gossip hosts are encountered.
localStoreGossipSuffix = []byte("goss")
// localStoreSafeStartSuffix stores the minimum timestamp when it's safe for
// the store to start serving after a restart.
localStoreSafeStartSuffix = []byte("safe-start")

// LocalRangeIDPrefix is the prefix identifying per-range data
// indexed by Range ID. The Range ID is appended to this prefix,
6 changes: 6 additions & 0 deletions keys/keys.go
@@ -68,6 +68,12 @@ func NodeLastUsageReportKey(nodeID int32) roachpb.Key {
return encoding.EncodeUvarintAscending(prefix, uint64(nodeID))
}

// StoreSafeStartKey returns a store-local key containing the minimum timestamp
// when it's safe for a store to start serving commands after a restart.
func StoreSafeStartKey() roachpb.Key {
return MakeStoreKey(localStoreSafeStartSuffix, nil)
}

func makePrefixWithRangeID(prefix []byte, rangeID roachpb.RangeID, infix roachpb.RKey) roachpb.Key {
// Size the key buffer so that it is large enough for most callers.
key := make(roachpb.Key, 0, 32)
25 changes: 25 additions & 0 deletions server/server.go
@@ -492,6 +492,31 @@ func (s *Server) Start(ctx context.Context) error {
log.Infof(s.Ctx(), "starting postgres server at unix:%s", s.ctx.SocketFile)
}

// We might have to wait a bit: before restarting, we might have proposed
// lease transfers and stopped before applying them. See
// Store.MaybeUpdateSafeStart() for details.
var safeStart hlc.Timestamp
err = s.node.stores.VisitStores(func(store *storage.Store) error {
ts, err := store.GetSafeStartTimestamp(ctx)
if err != nil {
return err
}
if safeStart.Less(ts) {
safeStart = ts
}
return nil
})
if err != nil {
return errors.Wrap(err, "error reading stores' safe start")
}
if s.clock.Now().Less(safeStart) {
log.Infof(ctx, "waiting for engines' safe start timestamp...")
time.Sleep(
// Add 1 nanosecond because we're ignoring the logical part.
time.Duration(safeStart.WallTime-s.clock.PhysicalNow()+1) * time.Nanosecond)
log.Infof(ctx, "waiting for engines' safe start timestamp... done")
}

s.stopper.RunWorker(func() {
netutil.FatalIfUnexpected(m.Serve())
})
104 changes: 100 additions & 4 deletions storage/client_replica_test.go
@@ -525,7 +525,7 @@ func TestRangeTransferLease(t *testing.T) {
// Transferring the lease to ourself should be a no-op.
origLeasePtr, _ := replica0.GetLease()
origLease := *origLeasePtr
if err := replica0.AdminTransferLease(replica0Desc.StoreID); err != nil {
if err := replica0.AdminTransferLease(context.Background(), replica0Desc.StoreID); err != nil {
t.Fatal(err)
}
newLeasePtr, _ := replica0.GetLease()
@@ -537,7 +537,8 @@
{
// An invalid target should result in an error.
const expected = "unable to find store .* in range"
if err := replica0.AdminTransferLease(1000); !testutils.IsError(err, expected) {
if err := replica0.AdminTransferLease(
context.Background(), 1000); !testutils.IsError(err, expected) {
t.Fatalf("expected %s, but found %v", expected, err)
}
}
@@ -550,7 +551,7 @@
return err
})

if err := replica0.AdminTransferLease(newHolderDesc.StoreID); err != nil {
if err := replica0.AdminTransferLease(context.Background(), newHolderDesc.StoreID); err != nil {
t.Fatal(err)
}

@@ -635,7 +636,7 @@ func TestRangeTransferLease(t *testing.T) {
go func() {
defer wg.Done()
// Transfer back from replica1 to replica0.
if err := replica1.AdminTransferLease(replica0Desc.StoreID); err != nil {
if err := replica1.AdminTransferLease(context.Background(), replica0Desc.StoreID); err != nil {
panic(err)
}
}()
@@ -658,6 +659,101 @@ func TestRangeTransferLease(t *testing.T) {
wg.Wait()
}

// Test that, upon a lease transfer, the start of the current lease's stasis
// period is persisted to the store's engine. See Store.MaybeUpdateSafeStart()
// for details about why this is done.
func TestLeaseTransferIsPersisted(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop()

key := []byte("a")
rangeDesc := new(roachpb.RangeDescriptor)
var err error
*rangeDesc, err = tc.LookupRange(key)
if err != nil {
t.Fatal(err)
}

rangeDesc, err = tc.AddReplicas(
rangeDesc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2),
)
if err != nil {
t.Fatal(err)
}
if len(rangeDesc.Replicas) != 3 {
t.Fatalf("expected 3 replicas, got %+v", rangeDesc.Replicas)
}
replicas := make([]roachpb.ReplicaDescriptor, 3)
for i := 0; i < 3; i++ {
var ok bool
replicas[i], ok = rangeDesc.GetReplicaDescriptor(tc.Servers[i].GetFirstStoreID())
if !ok {
t.Fatalf("expected to find replica on server %d", i)
}
}

// Run a LeaseInfoRequest to get the current lease's stasis start, so we can
// compare it against the value read from the engine below.
leaseReq := roachpb.LeaseInfoRequest{
Span: roachpb.Span{
Key: rangeDesc.StartKey.AsRawKey(),
},
}
leaseResp, pErr := client.SendWrappedWith(
tc.Servers[0].DB().GetSender(), nil,
roachpb.Header{
ReadConsistency: roachpb.INCONSISTENT,
},
&leaseReq)
if pErr != nil {
t.Fatal(pErr)
}
lease := leaseResp.(*roachpb.LeaseInfoResponse).Lease
if lease == nil || !lease.Covers(tc.Servers[0].Clock().Now()) {
t.Fatalf("no active lease")
}

args := roachpb.AdminTransferLeaseRequest{
Span: roachpb.Span{
Key: key,
},
Target: tc.Servers[1].GetFirstStoreID(),
}
store0ID := tc.Servers[0].GetFirstStoreID()
repDesc, ok := rangeDesc.GetReplicaDescriptor(store0ID)
if !ok {
t.Fatalf("failed to find replica on server 0")
}
hdr := roachpb.Header{Replica: repDesc}
// Send directly through the store (as opposed to going through a DistSender)
// so we get an error if this store is not the lease holder. We need to assert
// in this way that server 0 is the lease holder, since we'll read directly
// from its engine below.
_, pErr = client.SendWrappedWith(
tc.Servers[0].Stores(), context.Background(), hdr, &args)
if pErr != nil {
t.Fatal(pErr)
}

store0, err := tc.Servers[0].Stores().GetStore(store0ID)
if err != nil {
t.Fatal(err)
}
ts, err := store0.GetSafeStartTimestamp(context.Background())
if err != nil {
t.Fatal(err)
}

if ts != lease.StartStasis {
t.Fatalf("expected store to have persisted ts: %s, found %s",
lease.StartStasis, ts)
}
}

// Test that a lease extension (a RequestLeaseRequest that doesn't change the
// lease holder) is not blocked by ongoing reads.
// The test relies on two things:
12 changes: 6 additions & 6 deletions storage/engine/mvcc.go
@@ -431,12 +431,12 @@ func MVCCSetRangeStats(ctx context.Context,
return MVCCPutProto(ctx, engine, nil, keys.RangeStatsKey(rangeID), hlc.ZeroTimestamp, nil, ms)
}

// MVCCGetProto fetches the value at the specified key and unmarshals
// it using a protobuf decoder. Returns true on success or false if
// the key was not found. In the event of a WriteIntentError when
// consistent=false, we return the error and the decoded result; for
// all other errors (or when consistent=true) the decoded value is
// invalid.
// MVCCGetProto fetches the value at the specified key and unmarshals it using a
// protobuf decoder. Returns true on success or false if the key was not found
// (in which case `msg` is not modified).
// In the event of a WriteIntentError when consistent=false, we return the error
// and the decoded result; for all other errors (or when consistent=true) the
// decoded value is invalid.
func MVCCGetProto(
ctx context.Context,
engine Reader,
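The clarified contract above (the destination proto is left untouched when the key is missing) is what lets callers treat a zero-valued destination as "nothing persisted". A minimal caller-side sketch in that spirit; safeStartOrZero is a hypothetical helper, and ctx plus an engine.Reader are assumed to be supplied by the caller:

// safeStartOrZero illustrates the contract: if the safe-start key was never
// written, MVCCGetProto reports found == false and leaves val untouched, so
// the zero Timestamp doubles as "no restriction on startup".
func safeStartOrZero(ctx context.Context, eng engine.Reader) (hlc.Timestamp, error) {
	var val hlc.Timestamp
	if _, err := engine.MVCCGetProto(
		ctx, eng, keys.StoreSafeStartKey(), hlc.ZeroTimestamp,
		true /* consistent */, nil /* txn */, &val,
	); err != nil {
		return hlc.Timestamp{}, err
	}
	return val, nil
}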
2 changes: 1 addition & 1 deletion storage/replica.go
@@ -1395,7 +1395,7 @@ func (r *Replica) addAdminCmd(ctx context.Context, ba roachpb.BatchRequest) (*ro
reply, pErr = r.AdminMerge(ctx, *tArgs, r.Desc())
resp = &reply
case *roachpb.AdminTransferLeaseRequest:
pErr = roachpb.NewError(r.AdminTransferLease(tArgs.Target))
pErr = roachpb.NewError(r.AdminTransferLease(ctx, tArgs.Target))
resp = &roachpb.AdminTransferLeaseResponse{}
case *roachpb.CheckConsistencyRequest:
var reply roachpb.CheckConsistencyResponse
16 changes: 11 additions & 5 deletions storage/replica_range_lease.go
@@ -231,16 +231,17 @@ func (r *Replica) requestLeaseLocked(timestamp hlc.Timestamp) <-chan *roachpb.Er
// even serve reads or propose commands with timestamps lower than the start of
// the new lease because it could lead to read your own write violations (see
// comments on the stasis period in the Lease proto). We could, in principle,
// serve reads more than the maximum clock offset in the past.
// serve reads more than the maximum clock offset in the past. After a transfer
// is initiated, Replica.mu.pendingLeaseRequest.TransferInProgress() will start
// returning true.
//
// The method waits for any in-progress lease extension to be done, and it also
// blocks until the transfer is done. If a transfer is already in progress,
// this method joins in waiting for it to complete if it's transferring to the
// same replica. Otherwise, a NotLeaderError is returned.
//
// TODO(andrei): figure out how to persist the "not serving" state across node
// restarts.
func (r *Replica) AdminTransferLease(target roachpb.StoreID) error {
func (r *Replica) AdminTransferLease(
ctx context.Context, target roachpb.StoreID,
) error {
// initTransferHelper inits a transfer if no extension is in progress.
// It returns a channel for waiting for the result of a pending
// extension (if any is in progress) and a channel for waiting for the
@@ -286,6 +287,11 @@ func (r *Replica) AdminTransferLease(target roachpb.StoreID) error {
// served higher timestamps before the restart.
nextLeaseBegin.Forward(
hlc.ZeroTimestamp.Add(r.store.startedAt+int64(r.store.Clock().MaxOffset()), 0))

if err := r.store.MaybeUpdateSafeStart(ctx, lease); err != nil {
return nil, nil, errors.Wrap(err, "error persisting lease transfer")
}

transfer := r.mu.pendingLeaseRequest.InitOrJoinRequest(
r, nextLeaseHolder, nextLeaseBegin,
desc.StartKey.AsRawKey(), true /* transfer */)
56 changes: 56 additions & 0 deletions storage/store.go
@@ -448,6 +448,9 @@ type Store struct {
// raft.
droppedPlaceholders int32
}

// safeStartMu serializes access to the engine's "safe start" key.
safeStartMu syncutil.Mutex
}

var _ client.Sender = &Store{}
@@ -3332,6 +3335,59 @@ func (s *Store) Reserve(ctx context.Context, req ReservationRequest) Reservation
return s.bookie.Reserve(ctx, req, s.deadReplicas().Replicas)
}

// GetSafeStartTimestamp returns the timestamp at or after which it's safe for
// this store to start serving. If no restrictions exist, the zero value is
// returned.
func (s *Store) GetSafeStartTimestamp(ctx context.Context) (hlc.Timestamp, error) {
s.safeStartMu.Lock()
defer s.safeStartMu.Unlock()

var val hlc.Timestamp
_, err := engine.MVCCGetProto(
ctx, s.Engine(), keys.StoreSafeStartKey(), hlc.ZeroTimestamp,
true /* consistent */, nil /* transaction */, &val)
return val, err
}

// MaybeUpdateSafeStart updates the store's "safe start" timestamp (the minimum
// timestamp at which it's safe to start serving after a restart) if the new
// value is higher than the existing one.
// In particular, on a lease transfer we need to persist to disk the fact that
// the current lease (`lease`) has been transferred, so that the lease is not
// used for serving if the node restarts (before a restart, the in-memory
// transfer-in-progress state already prevents the transferred lease from
// being used for serving).
//
// Instead of keeping track of specifically which leases have been transferred,
// we keep track of the highest expiration of a transferred lease in a local key
// and, upon restart, we don't serve anything until this timestamp (technically,
// we keep track of the start of the stasis periods, since leases are not used
// during their stasis).
// If writing to this shared (per-store) key becomes contentious, we can improve
// the situation by either splitting the single key into k keys, or using the
// engine's merge (max) operator instead of read-modify-write operations.
func (s *Store) MaybeUpdateSafeStart(ctx context.Context, lease *roachpb.Lease) error {
if !lease.OwnedBy(s.StoreID()) {
return errors.Errorf("trying to persist transfer state for a lease the store "+
"doesn't own. Lease: %s. Store: %s.", lease, s)
}
s.safeStartMu.Lock()
defer s.safeStartMu.Unlock()

var oldVal hlc.Timestamp
_, err := engine.MVCCGetProto(
ctx, s.Engine(), keys.StoreSafeStartKey(), hlc.ZeroTimestamp,
true /* consistent */, nil /* transaction */, &oldVal)
if err != nil {
return err
}
if oldVal.Less(lease.StartStasis) {
// Return the put's error instead of dropping it.
return engine.MVCCPutProto(
ctx, s.engine, nil, keys.StoreSafeStartKey(),
hlc.ZeroTimestamp, nil, &lease.StartStasis)
}
return nil
}

// The methods below can be used to control a store's queues. Stopping a queue
// is only meant to happen in tests.

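A sanity-check sketch of the property the two methods above aim for: the persisted safe-start only ratchets forward, to the maximum StartStasis of any lease transferred away. This is not a test from the commit; checkSafeStartRatchets is hypothetical, and it assumes a freshly started store whose safe-start key has not been written yet, plus two leases (leaseA, leaseB) owned by that store.

func checkSafeStartRatchets(
	ctx context.Context, store *Store, leaseA, leaseB *roachpb.Lease,
) error {
	if err := store.MaybeUpdateSafeStart(ctx, leaseA); err != nil {
		return err
	}
	if err := store.MaybeUpdateSafeStart(ctx, leaseB); err != nil {
		return err
	}
	ts, err := store.GetSafeStartTimestamp(ctx)
	if err != nil {
		return err
	}
	// Regardless of update order, the persisted value should be the maximum
	// of the two stasis starts.
	want := leaseA.StartStasis
	want.Forward(leaseB.StartStasis)
	if ts != want {
		return errors.Errorf("expected safe start %s, got %s", want, ts)
	}
	return nil
}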