Skip to content

Commit

Permalink
Merge pull request #10305 from cockroachdb/spencerkimball/epoch-range…
Browse files Browse the repository at this point in the history
…-leases

Epoch-based range leases implementation
  • Loading branch information
spencerkimball authored Dec 8, 2016
2 parents 2b74120 + 4c2b798 commit 2b9df0d
Show file tree
Hide file tree
Showing 46 changed files with 1,772 additions and 974 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func tryRaftLogEntry(kv engine.MVCCKeyValue) (string, error) {
return "", err
}
ent.Data = nil
return fmt.Sprintf("%s by %v\n%s\n%s\n", &ent, cmd.OriginReplica, cmd.BatchRequest, &cmd), nil
return fmt.Sprintf("%s by %s\n%s\n%s\n", &ent, cmd.OriginLease, cmd.BatchRequest, &cmd), nil
}
return fmt.Sprintf("%s: EMPTY\n", &ent), nil
} else if ent.Type == raftpb.EntryConfChange {
Expand Down
65 changes: 52 additions & 13 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,26 +949,65 @@ func (t Transaction) GetObservedTimestamp(nodeID NodeID) (hlc.Timestamp, bool) {
var _ fmt.Stringer = &Lease{}

func (l Lease) String() string {
start := time.Unix(0, l.Start.WallTime).UTC()
expiration := time.Unix(0, l.Expiration.WallTime).UTC()
return fmt.Sprintf("replica %s %s %s", l.Replica, start, expiration.Sub(start))
}

// Covers returns true if the given timestamp can be served by the Lease.
// This is the case if the timestamp precedes the Lease's stasis period.
// Note that the fact that a lease covers a timestamp is not enough for the
// holder of the lease to be able to serve a read with that timestamp;
// pendingLeaderLeaseRequest.TransferInProgress() should also be consulted to
// account for possible lease transfers.
func (l Lease) Covers(timestamp hlc.Timestamp) bool {
return timestamp.Less(l.StartStasis)
var proposedSuffix string
if l.ProposedTS != nil {
proposedSuffix = fmt.Sprintf(" pro=%s", l.ProposedTS)
}
if l.Type() == LeaseExpiration {
return fmt.Sprintf("repl=%s start=%s exp=%s%s", l.Replica, l.Start, l.Expiration, proposedSuffix)
}
return fmt.Sprintf("repl=%s start=%s epo=%d%s", l.Replica, l.Start, *l.Epoch, proposedSuffix)
}

// OwnedBy returns whether the given store is the lease owner.
func (l Lease) OwnedBy(storeID StoreID) bool {
return l.Replica.StoreID == storeID
}

// LeaseType describes the type of lease.
type LeaseType int

const (
// LeaseExpiration allows range operations while the wall clock is
// within the expiration timestamp.
LeaseExpiration LeaseType = iota
// LeaseEpoch allows range operations while the node liveness epoch
// is equal to the lease epoch.
LeaseEpoch
)

// Type returns the lease type.
func (l Lease) Type() LeaseType {
if l.Epoch == nil {
return LeaseExpiration
}
return LeaseEpoch
}

// Equivalent determines whether ol is considered the same lease
// for the purposes of matching leases when executing a command.
// For expiration-based leases, extensions are allowed.
// Ignore proposed timestamps for lease verification; for epoch-
// based leases, the start time of the lease is sufficient to
// avoid using an older lease with same epoch.
func (l Lease) Equivalent(ol Lease) error {
l.ProposedTS, ol.ProposedTS = nil, nil
// If both leases are epoch-based, we must dereference the epochs
// and then set to nil.
if l.Type() == LeaseEpoch && ol.Type() == LeaseEpoch && *l.Epoch == *ol.Epoch {
l.Epoch, ol.Epoch = nil, nil
}
// For expiration-based leases, extensions are considered equivalent.
if l.Type() == LeaseExpiration && ol.Type() == LeaseExpiration &&
l.Expiration.Less(ol.Expiration) {
l.Expiration = ol.Expiration
}
if l == ol {
return nil
}
return errors.Errorf("leases %+v and %+v are not equivalent", l, ol)
}

// AsIntents takes a slice of spans and returns it as a slice of intents for
// the given transaction.
func AsIntents(spans []Span, txn *Transaction) []Intent {
Expand Down
292 changes: 156 additions & 136 deletions pkg/roachpb/data.pb.go

Large diffs are not rendered by default.

31 changes: 8 additions & 23 deletions pkg/roachpb/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,7 @@ message Intent {
}

// Lease contains information about range leases including the
// expiration and lease holder. It defines the two intervals
// [start, start_stasis) and [start_stasis, expiration). The
// former encompasses those timestamps for which the lease is
// active, while the latter is a cooldown period which avoids
// inconsistencies during lease holder changes as explained below.
// expiration and lease holder.
message Lease {
option (gogoproto.goproto_stringer) = false;
option (gogoproto.populate) = true;
Expand All @@ -306,24 +302,6 @@ message Lease {
// must be greater than the last lease expiration or the lease request
// is considered invalid.
optional util.hlc.Timestamp start = 1 [(gogoproto.nullable) = false];
// Before the lease expires, it enters a "stasis period" the length of which
// is usually determined by the lease holder's maximum allowed clock offset.
// During this stasis period, the lease must not be used (but can be extended
// by the owner instead). This prevents a failure of linearizability on a
// single register during lease changes. Without that stasis period, the
// following could occur:
// * a range lease gets committed on the new lease holder (but not the old).
// * client proposes and commits a write on new lease holder (with a timestamp
// just greater than the expiration of the old lease).
// * client tries to read what it wrote, but hits a slow coordinator
// (which assigns a timestamp covered by the old lease).
// * the read is served by the old lease holder (which has not processed the
// change in lease holdership).
// * the client fails to read their own write.
//
// Instead, the old lease holder must refuse to serve the client's command on the
// basis that its timestamp falls within the stasis period.
optional util.hlc.Timestamp start_stasis = 4 [(gogoproto.nullable) = false];

// The expiration is a timestamp at which the lease expires. This means that
// a new lease can be granted for a later timestamp.
Expand All @@ -332,6 +310,9 @@ message Lease {
// The address of the would-be lease holder.
optional ReplicaDescriptor replica = 3 [(gogoproto.nullable) = false];

// The start of the lease stasis period. This field is deprecated.
optional util.hlc.Timestamp deprecated_start_stasis = 4 [(gogoproto.nullable) = false];

// The current timestamp when this lease has been proposed. Used after a
// transfer and after a node restart to enforce that a node only uses leases
// proposed after the time of the said transfer or restart. This is nullable
Expand All @@ -340,6 +321,10 @@ message Lease {
// TODO(andrei): Make this non-nullable after the rollout.
optional util.hlc.Timestamp proposed_ts = 5 [(gogoproto.nullable) = true,
(gogoproto.customname) = "ProposedTS"];

// The epoch of the lease holder's node liveness entry. If this value
// is non-zero, the start and expiration values are ignored.
optional int64 epoch = 6 [(gogoproto.nullable) = true];
}

// AbortCacheEntry contains information about a transaction which has
Expand Down
91 changes: 44 additions & 47 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/gogo/protobuf/proto"
)

func makeTS(walltime int64, logical int32) hlc.Timestamp {
Expand Down Expand Up @@ -578,6 +579,49 @@ func TestMakePriorityLimits(t *testing.T) {
}
}

func TestLeaseEquivalence(t *testing.T) {
r1 := ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
r2 := ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
ts1 := makeTS(1, 1)
ts2 := makeTS(2, 1)
ts3 := makeTS(3, 1)

epoch1 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(1)}
epoch2 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(2)}
expire1 := Lease{Replica: r1, Start: ts1, Expiration: ts2}
expire2 := Lease{Replica: r1, Start: ts1, Expiration: ts3}
epoch2TS2 := Lease{Replica: r2, Start: ts2, Epoch: proto.Int64(2)}
expire2TS2 := Lease{Replica: r2, Start: ts2, Expiration: ts3}

proposed1 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(1), ProposedTS: &ts1}
proposed2 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(2), ProposedTS: &ts1}
proposed3 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(1), ProposedTS: &ts2}

testCases := []struct {
l, ol Lease
expSuccess bool
}{
{epoch1, epoch1, true}, // same epoch lease
{expire1, expire1, true}, // same expiration lease
{epoch1, epoch2, false}, // different epoch leases
{epoch1, epoch2TS2, false}, // different epoch leases
{expire1, expire2TS2, false}, // different expiration leases
{expire1, expire2, true}, // same expiration lease, extended
{expire2, expire1, false}, // same expiration lease, extended but backwards
{epoch1, expire1, false}, // epoch and expiration leases
{expire1, epoch1, false}, // expiration and epoch leases
{proposed1, proposed1, true}, // exact leases with identical timestamps
{proposed1, proposed2, false}, // same proposed timestamps, but diff epochs
{proposed1, proposed3, true}, // different proposed timestamps, same lease
}

for i, tc := range testCases {
if err := tc.l.Equivalent(tc.ol); tc.expSuccess != (err == nil) {
t.Errorf("%d: expected success? %t; got %s", i, tc.expSuccess, err)
}
}
}

func TestSpanOverlaps(t *testing.T) {
sA := Span{Key: []byte("a")}
sD := Span{Key: []byte("d")}
Expand Down Expand Up @@ -722,53 +766,6 @@ func TestRSpanIntersect(t *testing.T) {
}
}

func TestLeaseCovers(t *testing.T) {
mk := func(ds ...int64) (sl []hlc.Timestamp) {
for _, d := range ds {
sl = append(sl, hlc.ZeroTimestamp.Add(d, 0))
}
return sl
}

ts10 := mk(10)[0]
ts1K := mk(1000)[0]

for i, test := range []struct {
lease Lease
in, out []hlc.Timestamp
}{
{
lease: Lease{
StartStasis: mk(1)[0],
Expiration: ts1K,
},
in: mk(0),
out: mk(1, 100, 500, 999, 1000),
},
{
lease: Lease{
Start: ts10,
StartStasis: mk(500)[0],
Expiration: ts1K,
},
out: mk(500, 999, 1000, 1001, 2000),
// Note that the lease covers timestamps before its start timestamp.
in: mk(0, 9, 10, 300, 499),
},
} {
for _, ts := range test.in {
if !test.lease.Covers(ts) {
t.Errorf("%d: should contain %s", i, ts)
}
}
for _, ts := range test.out {
if test.lease.Covers(ts) {
t.Errorf("%d: must not contain %s", i, ts)
}
}
}
}

func BenchmarkValueSetBytes(b *testing.B) {
v := Value{}
bytes := make([]byte, 16)
Expand Down
7 changes: 6 additions & 1 deletion pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ func (e *NotLeaseHolderError) Error() string {
}

func (e *NotLeaseHolderError) message(_ *Error) string {
return fmt.Sprintf("range %d: replica %s not lease holder; %s is", e.RangeID, e.Replica, e.LeaseHolder)
if e.LeaseHolder == nil {
return fmt.Sprintf("range %d: replica %s not lease holder; lease holder unknown", e.RangeID, e.Replica)
} else if e.Lease != nil {
return fmt.Sprintf("range %d: replica %s not lease holder; current lease is %s", e.RangeID, e.Replica, e.Lease)
}
return fmt.Sprintf("range %d: replica %s not lease holder; replica %s is", e.RangeID, e.Replica, *e.LeaseHolder)
}

var _ ErrorDetailInterface = &NotLeaseHolderError{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
RangeLeaseActiveDuration: active,
RangeLeaseRenewalDuration: renewal,
TimeSeriesDataStore: s.tsDB,

EnableEpochRangeLeases: envutil.EnvOrDefaultBool(
"COCKROACH_ENABLE_EPOCH_RANGE_LEASES", false),
}
if s.cfg.TestingKnobs.Store != nil {
storeCfg.TestingKnobs = *s.cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs)
Expand Down Expand Up @@ -726,6 +729,7 @@ func (s *Server) doDrain(modes []serverpb.DrainMode, setTo bool) ([]serverpb.Dra
case mode == serverpb.DrainMode_CLIENT:
err = s.pgServer.SetDraining(setTo)
case mode == serverpb.DrainMode_LEASES:
s.nodeLiveness.PauseHeartbeat(setTo)
err = s.node.SetDraining(setTo)
default:
err = errors.Errorf("unknown drain mode: %v (%d)", mode, mode)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
reflect.TypeOf(&roachpb.Lease{}): {
populatedConstructor: func(r *rand.Rand) proto.Message { return roachpb.NewPopulatedLease(r, false) },
emptySum: 10006158318270644799,
populatedSum: 17421216026521129287,
populatedSum: 1304511461063751549,
},
reflect.TypeOf(&roachpb.RaftTruncatedState{}): {
populatedConstructor: func(r *rand.Rand) proto.Message { return roachpb.NewPopulatedRaftTruncatedState(r, false) },
Expand Down
Loading

0 comments on commit 2b9df0d

Please sign in to comment.