Skip to content

Commit

Permalink
server: Require either cluster version v3.6 or --experimental-enable-…
Browse files Browse the repository at this point in the history
…lease-checkpoint-persist to persist lease remainingTTL

To avoid inconsistant behavior during cluster upgrade we are feature
gating persistance behind cluster version. This should ensure that
all cluster members are upgraded to v3.6 before changing behavior.

To allow backporting this fix to v3.5 we are also introducing flag
--experimental-enable-lease-checkpoint-persist that will allow for
smooth upgrade in v3.5 clusters with this feature enabled.

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jul 21, 2022
1 parent 8d83691 commit 7079484
Show file tree
Hide file tree
Showing 12 changed files with 1,045 additions and 33 deletions.
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (s *v3Manager) saveDB() error {
be := backend.NewDefaultBackend(dbpath)

// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
lessor := lease.NewLessor(s.lg, be, nil, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write(traceutil.TODO())
Expand Down
21 changes: 17 additions & 4 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,15 @@ type Config struct {
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
// ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
// Requires experimental-enable-lease-checkpoint to be enabled.
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
Expand Down Expand Up @@ -637,6 +642,14 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
}

if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
}

if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
Expand Down
4 changes: 3 additions & 1 deletion etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,12 @@ type ServerConfig struct {

ForceNewCluster bool

// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
EnableLeaseCheckpoint bool
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration
// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
LeaseCheckpointPersist bool

EnableGRPCGateway bool

Expand Down
2 changes: 2 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
srv.lessor = lease.NewLessor(
srv.getLogger(),
srv.be,
srv.cluster,
lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})

Expand Down
4 changes: 4 additions & 0 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type ClusterConfig struct {

EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration
LeaseCheckpointPersist bool

WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
Expand Down Expand Up @@ -298,6 +299,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
CorruptCheckTime: c.cfg.CorruptCheckTime,
Expand Down Expand Up @@ -590,6 +592,7 @@ type memberConfig struct {
useIP bool
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
leaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}
Expand Down Expand Up @@ -684,6 +687,7 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
m.useIP = mcfg.useIP
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist

m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

Expand Down
14 changes: 13 additions & 1 deletion integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func TestV3LeaseCheckpoint(t *testing.T) {
checkpointingEnabled bool
ttl time.Duration
checkpointingInterval time.Duration
checkpointingPersist bool
leaderChanges int
clusterSize int
expectTTLIsGT time.Duration
Expand All @@ -340,14 +341,24 @@ func TestV3LeaseCheckpoint(t *testing.T) {
expectTTLIsLT: 290 * time.Second,
},
{
name: "Checkpointing enabled 10s, lease TTL is preserved after cluster restart",
name: "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart",
ttl: 300 * time.Second,
checkpointingEnabled: true,
checkpointingInterval: 10 * time.Second,
checkpointingPersist: true,
leaderChanges: 1,
clusterSize: 1,
expectTTLIsLT: 290 * time.Second,
},
{
name: "Checkpointing enabled 10s, lease TTL is reset after restart",
ttl: 300 * time.Second,
checkpointingEnabled: true,
checkpointingInterval: 10 * time.Second,
leaderChanges: 1,
clusterSize: 1,
expectTTLIsGT: 298 * time.Second,
},
{
// Checking if checkpointing continues after the first leader change.
name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
Expand All @@ -365,6 +376,7 @@ func TestV3LeaseCheckpoint(t *testing.T) {
Size: tc.clusterSize,
EnableLeaseCheckpoint: tc.checkpointingEnabled,
LeaseCheckpointInterval: tc.checkpointingInterval,
LeaseCheckpointPersist: tc.checkpointingPersist,
}
clus := NewClusterV3(t, config)
defer clus.Terminate(t)
Expand Down
6 changes: 3 additions & 3 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand All @@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down Expand Up @@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down
34 changes: 30 additions & 4 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease/leasepb"
"go.etcd.io/etcd/mvcc/backend"
Expand All @@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000

var v3_6 = semver.Version{Major: 3, Minor: 6}

var (
forever = time.Time{}

Expand Down Expand Up @@ -182,19 +185,29 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
// whether lessor should always persist remaining TTL (always enabled in v3.6).
checkpointPersist bool
// cluster is used to adapt lessor logic based on cluster version
cluster cluster
}

type cluster interface {
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
}

type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
CheckpointPersist bool
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
return newLessor(lg, b, cluster, cfg)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
Expand All @@ -212,11 +225,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
checkpointPersist: cfg.CheckpointPersist,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
cluster: cluster,
}
l.initAndRecover()

Expand Down Expand Up @@ -353,7 +368,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
l.persistTo(le.b)
if le.shouldPersistCheckpoints() {
l.persistTo(le.b)
}
if le.isPrimary() {
// schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l)
Expand All @@ -362,6 +379,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
return nil
}

func (le *lessor) shouldPersistCheckpoints() bool {
cv := le.cluster.Version()
return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}

func greaterOrEqual(first, second semver.Version) bool {
return !first.LessThan(second)
}

// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
Expand Down
2 changes: 1 addition & 1 deletion lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) {
be, tmpPath := backend.NewDefaultTmpBackend()
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()
Expand Down
Loading

0 comments on commit 7079484

Please sign in to comment.