Skip to content

Commit

Permalink
Merge pull request #13508 from serathius/checkpoints-fix
Browse files Browse the repository at this point in the history
Lease Checkpoints fix
  • Loading branch information
ptabor authored Dec 2, 2021
2 parents 3e391f4 + 48a7aab commit 170d9b9
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 77 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.1...v3.5.2) and

### etcd server
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13476).
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to enable checkpoint persisting.
- Fix [Lease checkpoints don't prevent to reset ttl on leader change](https://github.com/etcd-io/etcd/pull/13508), requires enabling checkpoint persisting.

<hr>

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

- Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.
- Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
- Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435)
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399)
- Fix [Lease checkpoints don't prevent to reset ttl on leader change](https://github.com/etcd-io/etcd/pull/13508).

### tools/benchmark

Expand Down
4 changes: 3 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ type ServerConfig struct {

ForceNewCluster bool

// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
EnableLeaseCheckpoint bool
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration
// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
LeaseCheckpointPersist bool

EnableGRPCGateway bool

Expand Down
17 changes: 15 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,14 @@ type Config struct {
// Deprecated in v3.5.
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
// Requires experimental-enable-lease-checkpoint to be enabled.
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
Expand Down Expand Up @@ -704,6 +709,14 @@ func (cfg *Config) Validate() error {
}
}

if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
}

if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions server/embed/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
}
}

func TestLeaseCheckpointValidate(t *testing.T) {
tcs := []struct {
name string
configFunc func() Config
expectError bool
}{
{
name: "Default config should pass",
configFunc: func() Config {
return *NewConfig()
},
},
{
name: "Enabling checkpoint leases should pass",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpoint = true
return cfg
},
},
{
name: "Enabling checkpoint leases and persist should pass",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpoint = true
cfg.ExperimentalEnableLeaseCheckpointPersist = true
return cfg
},
},
{
name: "Enabling checkpoint leases persist without checkpointing itself should fail",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpointPersist = true
return cfg
},
expectError: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
cfg := tc.configFunc()
err := cfg.Validate()
if (err != nil) != tc.expectError {
t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
}
})
}
}

func TestLogRotation(t *testing.T) {
tests := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
Expand Down
4 changes: 3 additions & 1 deletion server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
Expand Down
3 changes: 2 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})

Expand Down
6 changes: 3 additions & 3 deletions server/lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand All @@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down Expand Up @@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down
41 changes: 35 additions & 6 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/storage/backend"
Expand All @@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000

var v3_6 = semver.Version{Major: 3, Minor: 6}

var (
forever = time.Time{}

Expand Down Expand Up @@ -180,19 +183,29 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
// whether lessor should always persist remaining TTL (always enabled in v3.6).
checkpointPersist bool
// cluster is used to adapt lessor logic based on cluster version
cluster cluster
}

type cluster interface {
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
}

type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
CheckpointPersist bool
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
return newLessor(lg, b, cluster, cfg)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
Expand All @@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
checkpointPersist: cfg.CheckpointPersist,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
cluster: cluster,
}
l.initAndRecover()

Expand Down Expand Up @@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
if le.shouldPersistCheckpoints() {
l.persistTo(le.b)
}
if le.isPrimary() {
// schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l)
Expand All @@ -359,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
return nil
}

func (le *lessor) shouldPersistCheckpoints() bool {
cv := le.cluster.Version()
return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}

func greaterOrEqual(first, second semver.Version) bool {
return !first.LessThan(second)
}

// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
Expand Down Expand Up @@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}

if len(le.leaseMap) < leaseRevokeRate {
Expand Down Expand Up @@ -783,9 +811,10 @@ func (le *lessor) initAndRecover() {
ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
remainingTTL: lpb.RemainingTTL,
}
}
le.leaseExpiredNotifier.Init()
Expand Down
2 changes: 1 addition & 1 deletion server/lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
be, _ := betesting.NewDefaultTmpBackend(t)
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()
Expand Down
Loading

0 comments on commit 170d9b9

Please sign in to comment.