Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lease: ensure grant/revoke won't be applied repeatedly after restarting etcd #11935

Merged
merged 2 commits into from
May 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Fix [memory leak in follower nodes](https://github.com/etcd-io/etcd/pull/11731).
- https://github.com/etcd-io/etcd/issues/11495
- https://github.com/etcd-io/etcd/issues/11730
- Make sure [grant/revoke won't be applied repeatedly after restarting etcd](https://github.com/etcd-io/etcd/pull/11935).

### Package `wal`

Expand Down
7 changes: 4 additions & 3 deletions clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,12 @@ func (s *v3Manager) saveDB() error {
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)

// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(uint64(commit))

// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)

mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
Expand Down
4 changes: 3 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
},
srv.consistIndex,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is an existing field, but naming it consistentIndex seems more intuitive.

)

tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
Expand Down
6 changes: 3 additions & 3 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand All @@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down Expand Up @@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"go.etcd.io/etcd/v3/etcdserver/cindex"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/lease/leasepb"
"go.etcd.io/etcd/v3/mvcc/backend"
Expand Down Expand Up @@ -181,6 +182,7 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
ci cindex.ConsistentIndexer
}

type LessorConfig struct {
Expand All @@ -189,11 +191,11 @@ type LessorConfig struct {
ExpiredLeasesRetryInterval time.Duration
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor {
return newLessor(lg, b, cfg, ci)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
Expand All @@ -216,6 +218,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
ci: ci,
}
l.initAndRecover()

Expand Down Expand Up @@ -291,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
}

le.leaseMap[id] = l
l.persistTo(le.b)
l.persistTo(le.b, le.ci)

leaseTotalTTLs.Observe(float64(l.ttl))
leaseGranted.Inc()
Expand Down Expand Up @@ -338,6 +341,10 @@ func (le *lessor) Revoke(id LeaseID) error {
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
// If len(keys) > 0, txn.End() will call ci.UnsafeSave itself, so only save the consistent index here when there are no keys.
if le.ci != nil && len(keys) == 0 {
le.ci.UnsafeSave(le.b.BatchTx())
}

txn.End()

Expand Down Expand Up @@ -821,7 +828,7 @@ func (l *Lease) expired() bool {
return l.Remaining() <= 0
}

func (l *Lease) persistTo(b backend.Backend) {
func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
key := int64ToBytes(int64(l.ID))

lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
Expand All @@ -832,6 +839,9 @@ func (l *Lease) persistTo(b backend.Backend) {

b.BatchTx().Lock()
b.BatchTx().UnsafePut(leaseBucketName, key, val)
if ci != nil {
ci.UnsafeSave(b.BatchTx())
}
b.BatchTx().Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) {
be, tmpPath := backend.NewDefaultTmpBackend()
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should be small, so the findExpired benchmark will recheck expired leases.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil)
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()
Expand Down
30 changes: 15 additions & 15 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.Promote(0)

Expand Down Expand Up @@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

Expand Down Expand Up @@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.Promote(0)

Expand Down Expand Up @@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
Expand All @@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()

// extend after recovery should extend expiration on lease pile-up
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

Expand Down Expand Up @@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
Expand All @@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) {
}

// Create a new lessor with the same backend
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
Expand All @@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) {

testMinTTL := int64(1)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
defer le.Stop()

le.Promote(1 * time.Second)
Expand Down Expand Up @@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) {

testMinTTL := int64(1)

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
defer le.Stop()

le.Promote(1 * time.Second)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()

_, err := le.Grant(1, MaxLeaseTTL+1)
Expand All @@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil)
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
defer le.Stop()
l, err := le.Grant(1, 10)
if err != nil {
Expand Down