diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index f9ec40db53d..397570e0971 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -147,6 +147,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change. - Fix [memory leak in follower nodes](https://github.com/etcd-io/etcd/pull/11731). - https://github.com/etcd-io/etcd/issues/11495 - https://github.com/etcd-io/etcd/issues/11730 +- Make sure [grant/revoke won't be applied repeatedly after restarting etcd](https://github.com/etcd-io/etcd/pull/11935). ### Package `wal` diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index f3a3c19f9e5..0e883e7727b 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -406,11 +406,12 @@ func (s *v3Manager) saveDB() error { // having a new raft instance be := backend.NewDefaultBackend(dbpath) - // a lessor never timeouts leases - lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) - ci := cindex.NewConsistentIndex(be.BatchTx()) ci.SetConsistentIndex(uint64(commit)) + + // a lessor never timeouts leases + lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci) + mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) txn := mvs.Write(traceutil.TODO()) btx := be.BatchTx() diff --git a/etcdserver/server.go b/etcdserver/server.go index 5e1925ce090..94d9fc0d79b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -523,7 +523,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), - }) + }, + srv.consistIndex, + ) tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, func(index uint64) <-chan struct{} { diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go index b57409541aa..41d162f526e 100644 --- a/lease/leasehttp/http_test.go +++ b/lease/leasehttp/http_test.go @@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/lease/lessor.go b/lease/lessor.go index bcc8eb0fccc..45ef01bd8d2 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/v3/etcdserver/cindex" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/lease/leasepb" "go.etcd.io/etcd/v3/mvcc/backend" @@ -181,6 +182,7 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration + ci cindex.ConsistentIndexer } type LessorConfig struct { @@ -189,11 +191,11 @@ type LessorConfig struct { ExpiredLeasesRetryInterval time.Duration } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { - return newLessor(lg, b, cfg) +func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor { + return newLessor(lg, b, cfg, ci) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -216,6 +218,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, + ci: ci, } l.initAndRecover() @@ -291,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l - l.persistTo(le.b) + l.persistTo(le.b, le.ci) leaseTotalTTLs.Observe(float64(l.ttl)) leaseGranted.Inc() @@ -338,6 +341,10 @@ func (le *lessor) Revoke(id LeaseID) error { // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) + // if len(keys) > 0, txn.End() will call ci.UnsafeSave function. + if le.ci != nil && len(keys) == 0 { + le.ci.UnsafeSave(le.b.BatchTx()) + } txn.End() @@ -821,7 +828,7 @@ func (l *Lease) expired() bool { return l.Remaining() <= 0 } -func (l *Lease) persistTo(b backend.Backend) { +func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { key := int64ToBytes(int64(l.ID)) lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} @@ -832,6 +839,9 @@ func (l *Lease) persistTo(b backend.Backend) { b.BatchTx().Lock() b.BatchTx().UnsafePut(leaseBucketName, key, val) + if ci != nil { + ci.UnsafeSave(b.BatchTx()) + } b.BatchTx().Unlock() } diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index cb8f3e0d7b2..4858aad09fa 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) { be, tmpPath := backend.NewDefaultTmpBackend() // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/lease/lessor_test.go b/lease/lessor_test.go index defe4951781..200171eb6c5 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() le.Promote(0) @@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() le.Promote(0) @@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) defer le.Stop() le.Promote(1 * time.Second) @@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) defer le.Stop() le.Promote(1 * time.Second) @@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil) le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { @@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) defer le.Stop() l, err := le.Grant(1, 10) if err != nil {