Skip to content

Commit

Permalink
*: ensure grant/revoke won't be applied repeatedly after restarting etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
tangcong committed May 22, 2020
1 parent a4ada8c commit 62e6ab4
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 25 deletions.
7 changes: 4 additions & 3 deletions clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,12 @@ func (s *v3Manager) saveDB() error {
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)

// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(uint64(commit))

// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)

mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
Expand Down
4 changes: 3 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
},
srv.consistIndex,
)

tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
Expand Down
20 changes: 15 additions & 5 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"go.etcd.io/etcd/v3/etcdserver/cindex"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/lease/leasepb"
"go.etcd.io/etcd/v3/mvcc/backend"
Expand Down Expand Up @@ -181,6 +182,7 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
ci cindex.ConsistentIndexer
}

type LessorConfig struct {
Expand All @@ -189,11 +191,11 @@ type LessorConfig struct {
ExpiredLeasesRetryInterval time.Duration
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor {
return newLessor(lg, b, cfg, ci)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
Expand All @@ -216,6 +218,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
ci: ci,
}
l.initAndRecover()

Expand Down Expand Up @@ -291,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
}

le.leaseMap[id] = l
l.persistTo(le.b)
l.persistTo(le.b, le.ci)

leaseTotalTTLs.Observe(float64(l.ttl))
leaseGranted.Inc()
Expand Down Expand Up @@ -338,6 +341,10 @@ func (le *lessor) Revoke(id LeaseID) error {
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
// if len(keys) > 0, txn.End() will call ci.UnsafeSave function.
if le.ci != nil && len(keys) == 0 {
le.ci.UnsafeSave(le.b.BatchTx())
}

txn.End()

Expand Down Expand Up @@ -821,7 +828,7 @@ func (l *Lease) expired() bool {
return l.Remaining() <= 0
}

func (l *Lease) persistTo(b backend.Backend) {
func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
key := int64ToBytes(int64(l.ID))

lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
Expand All @@ -832,6 +839,9 @@ func (l *Lease) persistTo(b backend.Backend) {

b.BatchTx().Lock()
b.BatchTx().UnsafePut(leaseBucketName, key, val)
if ci != nil {
ci.UnsafeSave(b.BatchTx())
}
b.BatchTx().Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) {
be, tmpPath := backend.NewDefaultTmpBackend()
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil)
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()
Expand Down
31 changes: 16 additions & 15 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/mvcc/backend"
"go.etcd.io/etcd/v3/etcdserver/cindex"
"go.uber.org/zap"
)

Expand All @@ -45,7 +46,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.Promote(0)

Expand Down Expand Up @@ -107,7 +108,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

Expand Down Expand Up @@ -156,7 +157,7 @@ func TestLessorRevoke(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.Promote(0)

Expand Down Expand Up @@ -242,7 +243,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
Expand Down Expand Up @@ -291,7 +292,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
Expand All @@ -310,7 +311,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()

// extend after recovery should extend expiration on lease pile-up
Expand Down Expand Up @@ -340,7 +341,7 @@ func TestLessorDetach(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

Expand Down Expand Up @@ -381,7 +382,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
Expand All @@ -390,7 +391,7 @@ func TestLessorRecover(t *testing.T) {
}

// Create a new lessor with the same backend
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
Expand All @@ -411,7 +412,7 @@ func TestLessorExpire(t *testing.T) {

testMinTTL := int64(1)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
defer le.Stop()

le.Promote(1 * time.Second)
Expand Down Expand Up @@ -464,7 +465,7 @@ func TestLessorExpireAndDemote(t *testing.T) {

testMinTTL := int64(1)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
defer le.Stop()

le.Promote(1 * time.Second)
Expand Down Expand Up @@ -513,7 +514,7 @@ func TestLessorMaxTTL(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()

_, err := le.Grant(1, MaxLeaseTTL+1)
Expand All @@ -529,7 +530,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil)
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
Expand Down Expand Up @@ -564,7 +565,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
l, err := le.Grant(1, 10)
if err != nil {
Expand Down

0 comments on commit 62e6ab4

Please sign in to comment.