From 707948420eb74a5a63dab7736f919c71f6afc73f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 1 Dec 2021 15:24:25 +0100 Subject: [PATCH] server: Require either cluster version v3.6 or --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL To avoid inconsistent behavior during cluster upgrades, we feature-gate persistence behind the cluster version. This ensures that all cluster members are upgraded to v3.6 before the behavior changes. To allow backporting this fix to v3.5, we also introduce the flag --experimental-enable-lease-checkpoint-persist, which allows a smooth upgrade of v3.5 clusters that already have this feature enabled. Signed-off-by: Marek Siarkowicz --- clientv3/snapshot/v3_snapshot.go | 2 +- embed/config.go | 21 +- etcdmain/config.go | 4 +- etcdserver/config.go | 4 +- etcdserver/server.go | 2 + integration/cluster.go | 4 + integration/v3_lease_test.go | 14 +- lease/leasehttp/http_test.go | 6 +- lease/lessor.go | 34 +- lease/lessor_bench_test.go | 2 +- lease/lessor_test.go | 125 ++++- server/embed/etcd.go | 860 +++++++++++++++++++++++++++++++ 12 files changed, 1045 insertions(+), 33 deletions(-) create mode 100644 server/embed/etcd.go diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index b87976188757..25be6fb4aa3d 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -391,7 +391,7 @@ func (s *v3Manager) saveDB() error { be := backend.NewDefaultBackend(dbpath) // a lessor never timeouts leases - lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) + lessor := lease.NewLessor(s.lg, be, nil, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) txn := mvs.Write(traceutil.TODO()) diff --git a/embed/config.go b/embed/config.go index 70a311c6cb13..247a4c1bd334 100644 --- a/embed/config.go +++ b/embed/config.go @@ -288,10 +288,15 @@ type Config struct { ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` // ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types). ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"` - // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. - ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` - ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` + // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. + ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` + // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. + // Requires experimental-enable-lease-checkpoint to be enabled. + // Deprecated in v3.6.
+ // TODO: Delete in v3.7 + ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // takes more time than this value. ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"` @@ -637,6 +642,14 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode) } + if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint { + cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist") + } + + if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint { + return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") + } + return nil } diff --git a/etcdmain/config.go b/etcdmain/config.go index 751257c349be..209ec89663db 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -257,7 +257,9 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.") fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)") - fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") + fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") + // TODO: delete in v3.7 + fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. 
Requires experimental-enable-lease-checkpoint to be enabled.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") diff --git a/etcdserver/config.go b/etcdserver/config.go index 70208d2f2236..e3084864dc0a 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -158,10 +158,12 @@ type ServerConfig struct { ForceNewCluster bool - // EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. + // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. EnableLeaseCheckpoint bool // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints. LeaseCheckpointInterval time.Duration + // LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. + LeaseCheckpointPersist bool EnableGRPCGateway bool diff --git a/etcdserver/server.go b/etcdserver/server.go index 0733fd285473..590eb95d3a4b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -543,9 +543,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { srv.lessor = lease.NewLessor( srv.getLogger(), srv.be, + srv.cluster, lease.LessorConfig{ MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, + CheckpointPersist: cfg.LeaseCheckpointPersist, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), }) diff --git a/integration/cluster.go b/integration/cluster.go index 121414e14370..11d44c965417 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -152,6 +152,7 @@ type ClusterConfig struct { EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration + LeaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration CorruptCheckTime time.Duration @@ -298,6 +299,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member { clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, useIP: c.cfg.UseIP, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, + leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, CorruptCheckTime: c.cfg.CorruptCheckTime, @@ -590,6 +592,7 @@ type memberConfig struct { useIP bool enableLeaseCheckpoint bool leaseCheckpointInterval time.Duration + leaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration CorruptCheckTime time.Duration } @@ -684,6 +687,7 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member { m.useIP = mcfg.useIP m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval + m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval diff --git a/integration/v3_lease_test.go 
b/integration/v3_lease_test.go index 2c2ba4958fd5..cffb56e66ed9 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -318,6 +318,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { checkpointingEnabled bool ttl time.Duration checkpointingInterval time.Duration + checkpointingPersist bool leaderChanges int clusterSize int expectTTLIsGT time.Duration @@ -340,14 +341,24 @@ func TestV3LeaseCheckpoint(t *testing.T) { expectTTLIsLT: 290 * time.Second, }, { - name: "Checkpointing enabled 10s, lease TTL is preserved after cluster restart", + name: "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart", ttl: 300 * time.Second, checkpointingEnabled: true, checkpointingInterval: 10 * time.Second, + checkpointingPersist: true, leaderChanges: 1, clusterSize: 1, expectTTLIsLT: 290 * time.Second, }, + { + name: "Checkpointing enabled 10s, lease TTL is reset after restart", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 1, + clusterSize: 1, + expectTTLIsGT: 298 * time.Second, + }, { // Checking if checkpointing continues after the first leader change. name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes", @@ -365,6 +376,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { Size: tc.clusterSize, EnableLeaseCheckpoint: tc.checkpointingEnabled, LeaseCheckpointInterval: tc.checkpointingInterval, + LeaseCheckpointPersist: tc.checkpointingPersist, } clus := NewClusterV3(t, config) defer clus.Terminate(t) diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go index 0802515d5103..dc9ee0c8ef83 100644 --- a/lease/leasehttp/http_test.go +++ b/lease/leasehttp/http_test.go @@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/lease/lessor.go b/lease/lessor.go index a1ff9a9a0386..9ad8f137d53b 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease/leasepb" "go.etcd.io/etcd/mvcc/backend" @@ -37,6 +38,8 @@ const NoLease = LeaseID(0) // MaxLeaseTTL is the maximum lease TTL value const MaxLeaseTTL = 9000000000 +var v3_6 = semver.Version{Major: 3, Minor: 6} + var ( forever = time.Time{} @@ -182,19 +185,29 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration + // whether lessor should always persist remaining TTL (always enabled in v3.6). 
+ checkpointPersist bool + // cluster is used to adapt lessor logic based on cluster version + cluster cluster +} + +type cluster interface { + // Version is the cluster-wide minimum major.minor version. + Version() *semver.Version } type LessorConfig struct { MinLeaseTTL int64 CheckpointInterval time.Duration ExpiredLeasesRetryInterval time.Duration + CheckpointPersist bool } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { - return newLessor(lg, b, cfg) +func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor { + return newLessor(lg, b, cluster, cfg) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -212,11 +225,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { minLeaseTTL: cfg.MinLeaseTTL, checkpointInterval: checkpointInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval, + checkpointPersist: cfg.CheckpointPersist, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, + cluster: cluster, } l.initAndRecover() @@ -353,7 +368,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { if l, ok := le.leaseMap[id]; ok { // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry l.remainingTTL = remainingTTL - l.persistTo(le.b) + if le.shouldPersistCheckpoints() { + l.persistTo(le.b) + } if le.isPrimary() { // schedule the next checkpoint as needed le.scheduleCheckpointIfNeeded(l) @@ -362,6 +379,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil } +func (le *lessor) shouldPersistCheckpoints() bool { + cv := le.cluster.Version() + return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6)) +} + +func greaterOrEqual(first, second semver.Version) bool { + return !first.LessThan(second) +} + // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index cf51aee70ae3..da115c093884 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) { be, tmpPath := backend.NewDefaultTmpBackend() // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. 
- le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) + le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 225e7f4419bf..264a1aa18199 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -27,10 +27,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease/leasepb" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/version" "go.uber.org/zap" ) @@ -48,7 +50,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -110,7 +112,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -159,7 +161,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -212,7 +214,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -245,7 +247,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -294,7 +296,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -313,7 +315,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -343,7 +345,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -384,7 +386,7 @@ func 
TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -393,7 +395,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -414,7 +416,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -467,7 +469,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -516,7 +518,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -532,7 +534,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) @@ -566,7 +568,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l, err := le.Grant(1, 10) if err != nil { @@ -652,7 +654,76 @@ func TestLeaseBackend(t *testing.T) { defer be2.Close() leases := unsafeGetAllLeases(be2.ReadTx()) - assert.Equal(t, tc.want, leases) + testutil.AssertEqual(t, tc.want, leases) + }) + } +} + +func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) { + const ttl int64 = 10 + const checkpointTTL int64 = 5 + + tcs := []struct { + name string + cluster cluster + checkpointPersist bool + expectRemainingTTL int64 + }{ + { + name: "Etcd v3.6 and newer persist remainingTTL on checkpoint", + cluster: clusterV3_6(), + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set", + cluster: clusterLatest(), + checkpointPersist: true, + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set", + cluster: clusterNil(), + checkpointPersist: true, + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd v3.5 and older reset remainingTTL on checkpoint", + cluster: clusterLatest(), + expectRemainingTTL: ttl, + }, + { + name: "Etcd with version unknown falls back to v3.5 behavior", + cluster: clusterNil(), + expectRemainingTTL: ttl, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zap.NewNop() + dir, be := NewTestBackend(t) + defer
os.RemoveAll(dir) + defer be.Close() + + cfg := LessorConfig{MinLeaseTTL: minLeaseTTL} + cfg.CheckpointPersist = tc.checkpointPersist + le := newLessor(lg, be, tc.cluster, cfg) + l, err := le.Grant(2, ttl) + if err != nil { + t.Fatal(err) + } + if l.RemainingTTL() != ttl { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl) + } + le.Checkpoint(2, checkpointTTL) + if l.RemainingTTL() != checkpointTTL { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL) + } + le.Stop() + le2 := newLessor(lg, be, clusterV3_6(), cfg) + l = le2.Lookup(2) + if l.RemainingTTL() != tc.expectRemainingTTL { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL) + } }) } } @@ -694,3 +765,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) { bcfg.Path = filepath.Join(tmpPath, "be") return tmpPath, backend.New(bcfg) } + +func clusterV3_6() cluster { + return fakeCluster{semver.New("3.6.0")} +} + +func clusterLatest() cluster { + return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")} +} + +func clusterNil() cluster { + return fakeCluster{} +} + +type fakeCluster struct { + version *semver.Version +} + +func (c fakeCluster) Version() *semver.Version { + return c.version +} diff --git a/server/embed/etcd.go b/server/embed/etcd.go new file mode 100644 index 000000000000..db095edef7fe --- /dev/null +++ b/server/embed/etcd.go @@ -0,0 +1,860 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "context" + "crypto/tls" + "fmt" + "io/ioutil" + defaultLog "log" + "net" + "net/http" + "net/url" + "runtime" + "sort" + "strconv" + "sync" + "time" + + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/client/pkg/v3/transport" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/pkg/v3/debugutil" + runtimeutil "go.etcd.io/etcd/pkg/v3/runtime" + "go.etcd.io/etcd/server/v3/config" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" + "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2http" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2v3" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/verify" + + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/soheilhy/cmux" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/semconv" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +const ( + // internal fd usage includes disk usage and transport usage. + // To read/write snapshot, snap pkg needs 1. 
In the normal case, wal pkg needs + // at most 2 to read/lock/write WALs. One case where it needs 2 is to + // read all logs after some snapshot index, which are located at the end of + // the second-to-last WAL and the head of the last. For purging, it needs to read + // directory, so it needs 1. For fd monitor, it needs 1. + // For transport, rafthttp builds two long-polling connections and at most + // four temporary connections with each member. There are at most 9 members + // in a cluster, so it should reserve 96. + // For safety, we set the total reserved number to 150. + reservedInternalFDNum = 150 +) + +// Etcd contains a running etcd server and its listeners. +type Etcd struct { + Peers []*peerListener + Clients []net.Listener + // a map of contexts for the servers that serve client requests. + sctxs map[string]*serveCtx + metricsListeners []net.Listener + + tracingExporterShutdown func() + + Server *etcdserver.EtcdServer + + cfg Config + stopc chan struct{} + errc chan error + + closeOnce sync.Once +} + +type peerListener struct { + net.Listener + serve func() error + close func(context.Context) error +} + +// StartEtcd launches the etcd server and HTTP handlers for client/server communication. +// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait +// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use. +func StartEtcd(inCfg *Config) (e *Etcd, err error) { + if err = inCfg.Validate(); err != nil { + return nil, err + } + serving := false + e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} + cfg := &e.cfg + defer func() { + if e == nil || err == nil { + return + } + if !serving { + // errored before starting gRPC server for serveCtx.serversC + for _, sctx := range e.sctxs { + close(sctx.serversC) + } + } + e.Close() + e = nil + }() + + if !cfg.SocketOpts.Empty() { + cfg.logger.Info( + "configuring socket options", + zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress), + zap.Bool("reuse-port", cfg.SocketOpts.ReusePort), + ) + } + e.cfg.logger.Info( + "configuring peer listeners", + zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), + ) + if e.Peers, err = configurePeerListeners(cfg); err != nil { + return e, err + } + + e.cfg.logger.Info( + "configuring client listeners", + zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + ) + if e.sctxs, err = configureClientListeners(cfg); err != nil { + return e, err + } + + for _, sctx := range e.sctxs { + e.Clients = append(e.Clients, sctx.l) + } + + var ( + urlsmap types.URLsMap + token string + ) + memberInitialized := true + if !isMemberInitialized(cfg) { + memberInitialized = false + urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd") + if err != nil { + return e, fmt.Errorf("error setting up initial cluster: %v", err) + } + } + + // AutoCompactionRetention defaults to "0" if not set.
+ if len(cfg.AutoCompactionRetention) == 0 { + cfg.AutoCompactionRetention = "0" + } + autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention) + if err != nil { + return e, err + } + + backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) + + srvcfg := config.ServerConfig{ + Name: cfg.Name, + ClientURLs: cfg.ACUrls, + PeerURLs: cfg.APUrls, + DataDir: cfg.Dir, + DedicatedWALDir: cfg.WalDir, + SnapshotCount: cfg.SnapshotCount, + SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, + MaxSnapFiles: cfg.MaxSnapFiles, + MaxWALFiles: cfg.MaxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.Durl, + DiscoveryProxy: cfg.Dproxy, + NewCluster: cfg.IsNewCluster(), + PeerTLSInfo: cfg.PeerTLSInfo, + TickMs: cfg.TickMs, + ElectionTicks: cfg.ElectionTicks(), + InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, + AutoCompactionRetention: autoCompactionRetention, + AutoCompactionMode: cfg.AutoCompactionMode, + QuotaBackendBytes: cfg.QuotaBackendBytes, + BackendBatchLimit: cfg.BackendBatchLimit, + BackendFreelistType: backendFreelistType, + BackendBatchInterval: cfg.BackendBatchInterval, + MaxTxnOps: cfg.MaxTxnOps, + MaxRequestBytes: cfg.MaxRequestBytes, + SocketOpts: cfg.SocketOpts, + StrictReconfigCheck: cfg.StrictReconfigCheck, + ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, + AuthToken: cfg.AuthToken, + BcryptCost: cfg.BcryptCost, + TokenTTL: cfg.AuthTokenTTL, + CORS: cfg.CORS, + HostWhitelist: cfg.HostWhitelist, + InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, + CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + PreVote: cfg.PreVote, + Logger: cfg.logger, + ForceNewCluster: cfg.ForceNewCluster, + EnableGRPCGateway: cfg.EnableGRPCGateway, + ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing, + UnsafeNoFsync: cfg.UnsafeNoFsync, + EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, + LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist, + CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, + DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, + WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, + ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock, + ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer, + ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes, + V2Deprecation: cfg.V2DeprecationEffective(), + } + + if srvcfg.ExperimentalEnableDistributedTracing { + tctx := context.Background() + tracingExporter, opts, err := e.setupTracing(tctx) + if err != nil { + return e, err + } + if tracingExporter == nil || len(opts) == 0 { + return e, fmt.Errorf("error setting up distributed tracing") + } + e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) } + srvcfg.ExperimentalTracerOptions = opts + } + + print(e.cfg.logger, *cfg, srvcfg, memberInitialized) + + if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { + return e, err + } + + // buffer channel so goroutines on closed connections won't wait forever + e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) + + // newly started member ("memberInitialized==false") + // does not need corruption check + if memberInitialized { + if err = e.Server.CheckInitialHashKV(); err != nil { + // set "EtcdServer" to nil, so that it does not block on 
"EtcdServer.Close()" + // (nothing to close since rafthttp transports have not been started) + + e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err)) + e.Server.Cleanup() + e.Server = nil + return e, err + } + } + e.Server.Start() + + if err = e.servePeers(); err != nil { + return e, err + } + if err = e.serveClients(); err != nil { + return e, err + } + if err = e.serveMetrics(); err != nil { + return e, err + } + + e.cfg.logger.Info( + "now serving peer/client/metrics", + zap.String("local-member-id", e.Server.ID().String()), + zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()), + zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), + zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()), + ) + serving = true + return e, nil +} + +func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) { + cors := make([]string, 0, len(ec.CORS)) + for v := range ec.CORS { + cors = append(cors, v) + } + sort.Strings(cors) + + hss := make([]string, 0, len(ec.HostWhitelist)) + for v := range ec.HostWhitelist { + hss = append(hss, v) + } + sort.Strings(hss) + + quota := ec.QuotaBackendBytes + if quota == 0 { + quota = etcdserver.DefaultQuotaBytes + } + + lg.Info( + "starting an etcd server", + zap.String("etcd-version", version.Version), + zap.String("git-sha", version.GitSHA), + zap.String("go-version", runtime.Version()), + zap.String("go-os", runtime.GOOS), + zap.String("go-arch", runtime.GOARCH), + zap.Int("max-cpu-set", runtime.GOMAXPROCS(0)), + zap.Int("max-cpu-available", runtime.NumCPU()), + zap.Bool("member-initialized", memberInitialized), + zap.String("name", sc.Name), + zap.String("data-dir", sc.DataDir), + zap.String("wal-dir", ec.WalDir), + zap.String("wal-dir-dedicated", sc.DedicatedWALDir), + zap.String("member-dir", sc.MemberDir()), + zap.Bool("force-new-cluster", sc.ForceNewCluster), + zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)), + zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)), + zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance), + zap.Uint64("snapshot-count", sc.SnapshotCount), + zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries), + zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()), + zap.Strings("listen-peer-urls", ec.getLPURLs()), + zap.Strings("advertise-client-urls", ec.getACURLs()), + zap.Strings("listen-client-urls", ec.getLCURLs()), + zap.Strings("listen-metrics-urls", ec.getMetricsURLs()), + zap.Strings("cors", cors), + zap.Strings("host-whitelist", hss), + zap.String("initial-cluster", sc.InitialPeerURLsMap.String()), + zap.String("initial-cluster-state", ec.ClusterState), + zap.String("initial-cluster-token", sc.InitialClusterToken), + zap.Int64("quota-size-bytes", quota), + zap.Bool("pre-vote", sc.PreVote), + zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), + zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), + zap.String("auto-compaction-mode", sc.AutoCompactionMode), + zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention), + zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), + zap.String("discovery-url", sc.DiscoveryURL), + zap.String("discovery-proxy", sc.DiscoveryProxy), + zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()), + ) +} + +// Config returns 
the current configuration. +func (e *Etcd) Config() Config { + return e.cfg +} + +// Close gracefully shuts down all servers/listeners. +// Client requests will be terminated with request timeout. +// After the timeout, remaining requests are forcibly closed. +func (e *Etcd) Close() { + fields := []zap.Field{ + zap.String("name", e.cfg.Name), + zap.String("data-dir", e.cfg.Dir), + zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), + zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + } + lg := e.GetLogger() + lg.Info("closing etcd server", fields...) + defer func() { + lg.Info("closed etcd server", fields...) + verify.MustVerifyIfEnabled(verify.Config{ + Logger: lg, + DataDir: e.cfg.Dir, + ExactIndex: false, + }) + lg.Sync() + }() + + e.closeOnce.Do(func() { + close(e.stopc) + }) + + // close client requests with request timeout + timeout := 2 * time.Second + if e.Server != nil { + timeout = e.Server.Cfg.ReqTimeout() + } + for _, sctx := range e.sctxs { + for ss := range sctx.serversC { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + stopServers(ctx, ss) + cancel() + } + } + + for _, sctx := range e.sctxs { + sctx.cancel() + } + + for i := range e.Clients { + if e.Clients[i] != nil { + e.Clients[i].Close() + } + } + + for i := range e.metricsListeners { + e.metricsListeners[i].Close() + } + + // shutdown tracing exporter + if e.tracingExporterShutdown != nil { + e.tracingExporterShutdown() + } + + // close rafthttp transports + if e.Server != nil { + e.Server.Stop() + } + + // close all idle connections in peer handler (wait up to 1 second) + for i := range e.Peers { + if e.Peers[i] != nil && e.Peers[i].close != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + e.Peers[i].close(ctx) + cancel() + } + } + if e.errc != nil { + close(e.errc) + } +} + +func stopServers(ctx context.Context, ss *servers) { + // first, close the http.Server + ss.http.Shutdown(ctx) + // do not grpc.Server.GracefulStop with a TLS-enabled etcd server + // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 + // and https://github.com/etcd-io/etcd/issues/8916 + if ss.secure { + ss.grpc.Stop() + return + } + + ch := make(chan struct{}) + go func() { + defer close(ch) + // close listeners to stop accepting new connections, + // will block on any existing transports + ss.grpc.GracefulStop() + }() + + // wait until all pending RPCs are finished + select { + case <-ch: + case <-ctx.Done(): + // took too long, manually close open transports + // e.g. watch streams + ss.grpc.Stop() + + // concurrent GracefulStop should be interrupted + <-ch + } +} + +// Err returns a channel used to report errors during etcd run/shutdown. +// Since etcd 3.5, the channel is closed when etcd shuts down.
+func (e *Etcd) Err() <-chan error { + return e.errc +} + +func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { + if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil { + return nil, err + } + if err = cfg.PeerSelfCert(); err != nil { + cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) + } + if !cfg.PeerTLSInfo.Empty() { + cfg.logger.Info( + "starting with peer TLS", + zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), + zap.Strings("cipher-suites", cfg.CipherSuites), + ) + } + + peers = make([]*peerListener, len(cfg.LPUrls)) + defer func() { + if err == nil { + return + } + for i := range peers { + if peers[i] != nil && peers[i].close != nil { + cfg.logger.Warn( + "closing peer listener", + zap.String("address", cfg.LPUrls[i].String()), + zap.Error(err), + ) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + peers[i].close(ctx) + cancel() + } + } + }() + + for i, u := range cfg.LPUrls { + if u.Scheme == "http" { + if !cfg.PeerTLSInfo.Empty() { + cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) + } + if cfg.PeerTLSInfo.ClientCertAuth { + cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) + } + } + peers[i] = &peerListener{close: func(context.Context) error { return nil }} + peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, + transport.WithTLSInfo(&cfg.PeerTLSInfo), + transport.WithSocketOpts(&cfg.SocketOpts), + transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout), + ) + if err != nil { + return nil, err + } + // once serve, overwrite with 'http.Server.Shutdown' + peers[i].close = func(context.Context) error { + return peers[i].Listener.Close() + } + } + return peers, nil +} + +// configure peer handlers after rafthttp.Transport started +func (e *Etcd) servePeers() (err error) { + ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server) + var peerTLScfg *tls.Config + if !e.cfg.PeerTLSInfo.Empty() { + if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil { + return err + } + } + + for _, p := range e.Peers { + u := p.Listener.Addr().String() + gs := v3rpc.Server(e.Server, peerTLScfg, nil) + m := cmux.New(p.Listener) + go gs.Serve(m.Match(cmux.HTTP2())) + srv := &http.Server{ + Handler: grpcHandlerFunc(gs, ph), + ReadTimeout: 5 * time.Minute, + ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error + } + go srv.Serve(m.Match(cmux.Any())) + p.serve = func() error { + e.cfg.logger.Info( + "cmux::serve", + zap.String("address", u), + ) + return m.Serve() + } + p.close = func(ctx context.Context) error { + // gracefully shutdown http.Server + // close open listeners, idle connections + // until context cancel or time-out + e.cfg.logger.Info( + "stopping serving peer traffic", + zap.String("address", u), + ) + stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv}) + e.cfg.logger.Info( + "stopped serving peer traffic", + zap.String("address", u), + ) + m.Close() + return nil + } + } + + // start peer servers in a goroutine + for _, pl := range e.Peers { + go func(l *peerListener) { + u := l.Addr().String() + e.cfg.logger.Info( + "serving peer traffic", + zap.String("address", u), + ) + e.errHandler(l.serve()) + }(pl) + } + return nil +} + +func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) 
{ + if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil { + return nil, err + } + if err = cfg.ClientSelfCert(); err != nil { + cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err)) + } + if cfg.EnablePprof { + cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf)) + } + + sctxs = make(map[string]*serveCtx) + for _, u := range cfg.LCUrls { + sctx := newServeCtx(cfg.logger) + if u.Scheme == "http" || u.Scheme == "unix" { + if !cfg.ClientTLSInfo.Empty() { + cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) + } + if cfg.ClientTLSInfo.ClientCertAuth { + cfg.logger.Warn("scheme is HTTP while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String())) + } + } + if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { + return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String()) + } + + network := "tcp" + addr := u.Host + if u.Scheme == "unix" || u.Scheme == "unixs" { + network = "unix" + addr = u.Host + u.Path + } + sctx.network = network + + sctx.secure = u.Scheme == "https" || u.Scheme == "unixs" + sctx.insecure = !sctx.secure + if oldctx := sctxs[addr]; oldctx != nil { + oldctx.secure = oldctx.secure || sctx.secure + oldctx.insecure = oldctx.insecure || sctx.insecure + continue + } + + if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme, + transport.WithSocketOpts(&cfg.SocketOpts), + transport.WithSkipTLSInfoCheck(true), + ); err != nil { + return nil, err + } + // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking + // hosts that disable ipv6. So, use the address given by the user. 
+ sctx.addr = addr + + if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { + if fdLimit <= reservedInternalFDNum { + cfg.logger.Fatal( + "file descriptor limit of etcd process is too low; please set higher", + zap.Uint64("limit", fdLimit), + zap.Int("recommended-limit", reservedInternalFDNum), + ) + } + sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) + } + + if network == "tcp" { + if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil { + return nil, err + } + } + + defer func(u url.URL) { + if err == nil { + return + } + sctx.l.Close() + cfg.logger.Warn( + "closing peer listener", + zap.String("address", u.Host), + zap.Error(err), + ) + }(u) + for k := range cfg.UserHandlers { + sctx.userHandlers[k] = cfg.UserHandlers[k] + } + sctx.serviceRegister = cfg.ServiceRegister + if cfg.EnablePprof || cfg.LogLevel == "debug" { + sctx.registerPprof() + } + if cfg.LogLevel == "debug" { + sctx.registerTrace() + } + sctxs[addr] = sctx + } + return sctxs, nil +} + +func (e *Etcd) serveClients() (err error) { + if !e.cfg.ClientTLSInfo.Empty() { + e.cfg.logger.Info( + "starting with client TLS", + zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)), + zap.Strings("cipher-suites", e.cfg.CipherSuites), + ) + } + + // Start a client server goroutine for each listen address + var h http.Handler + if e.Config().EnableV2 { + if e.Config().V2DeprecationEffective().IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) { + return fmt.Errorf("--enable-v2 and --v2-deprecation=%s are mutually exclusive", e.Config().V2DeprecationEffective()) + } + e.cfg.logger.Warn("Flag `enable-v2` is deprecated and will get removed in etcd 3.6.") + if len(e.Config().ExperimentalEnableV2V3) > 0 { + e.cfg.logger.Warn("Flag `experimental-enable-v2v3` is deprecated and will get removed in etcd 3.6.") + srv := v2v3.NewServer(e.cfg.logger, v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3) + h = v2http.NewClientHandler(e.GetLogger(), srv, e.Server.Cfg.ReqTimeout()) + } else { + h = v2http.NewClientHandler(e.GetLogger(), e.Server, e.Server.Cfg.ReqTimeout()) + } + } else { + mux := http.NewServeMux() + etcdhttp.HandleBasic(e.cfg.logger, mux, e.Server) + etcdhttp.HandleMetricsHealthForV3(e.cfg.logger, mux, e.Server) + h = mux + } + + gopts := []grpc.ServerOption{} + if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) { + gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: e.cfg.GRPCKeepAliveMinTime, + PermitWithoutStream: false, + })) + } + if e.cfg.GRPCKeepAliveInterval > time.Duration(0) && + e.cfg.GRPCKeepAliveTimeout > time.Duration(0) { + gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: e.cfg.GRPCKeepAliveInterval, + Timeout: e.cfg.GRPCKeepAliveTimeout, + })) + } + + // start client servers in each goroutine + for _, sctx := range e.sctxs { + go func(s *serveCtx) { + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) + }(sctx) + } + return nil +} + +func (e *Etcd) serveMetrics() (err error) { + if e.cfg.Metrics == "extensive" { + grpc_prometheus.EnableHandlingTimeHistogram() + } + + if len(e.cfg.ListenMetricsUrls) > 0 { + metricsMux := http.NewServeMux() + etcdhttp.HandleMetricsHealthForV3(e.cfg.logger, metricsMux, e.Server) + + for _, murl := range e.cfg.ListenMetricsUrls { + tlsInfo := &e.cfg.ClientTLSInfo + if murl.Scheme == "http" { + tlsInfo = nil + } + ml, err := transport.NewListenerWithOpts(murl.Host, murl.Scheme, + transport.WithTLSInfo(tlsInfo), + 
transport.WithSocketOpts(&e.cfg.SocketOpts), + ) + if err != nil { + return err + } + e.metricsListeners = append(e.metricsListeners, ml) + go func(u url.URL, ln net.Listener) { + e.cfg.logger.Info( + "serving metrics", + zap.String("address", u.String()), + ) + e.errHandler(http.Serve(ln, metricsMux)) + }(murl, ml) + } + } + return nil +} + +func (e *Etcd) errHandler(err error) { + select { + case <-e.stopc: + return + default: + } + select { + case <-e.stopc: + case e.errc <- err: + } +} + +// GetLogger returns the logger. +func (e *Etcd) GetLogger() *zap.Logger { + e.cfg.loggerMu.RLock() + l := e.cfg.logger + e.cfg.loggerMu.RUnlock() + return l +} + +func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) { + h, err := strconv.Atoi(retention) + if err == nil && h >= 0 { + switch mode { + case CompactorModeRevision: + ret = time.Duration(int64(h)) + case CompactorModePeriodic: + ret = time.Duration(int64(h)) * time.Hour + } + } else { + // periodic compaction + ret, err = time.ParseDuration(retention) + if err != nil { + return 0, fmt.Errorf("error parsing CompactionRetention: %v", err) + } + } + return ret, nil +} + +func (e *Etcd) setupTracing(ctx context.Context) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) { + exporter, err = otlp.NewExporter(ctx, + otlpgrpc.NewDriver( + otlpgrpc.WithEndpoint(e.cfg.ExperimentalDistributedTracingAddress), + otlpgrpc.WithInsecure(), + )) + if err != nil { + return nil, nil, err + } + res := resource.NewWithAttributes( + semconv.ServiceNameKey.String(e.cfg.ExperimentalDistributedTracingServiceName), + ) + // As the tracing service instance ID must be unique, it should + // never use the empty default string value, so we only set it + // if it's a non-empty string. + if e.cfg.ExperimentalDistributedTracingServiceInstanceID != "" { + resWithIDKey := resource.NewWithAttributes( + (semconv.ServiceInstanceIDKey.String(e.cfg.ExperimentalDistributedTracingServiceInstanceID)), + ) + // Merge resources to combine into a new + // resource in case of duplicates. + res = resource.Merge(res, resWithIDKey) + } + + options = append(options, + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ), + otelgrpc.WithTracerProvider( + tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exporter), + tracesdk.WithResource(res), + ), + ), + ) + + e.cfg.logger.Info( + "distributed tracing enabled", + zap.String("distributed-tracing-address", e.cfg.ExperimentalDistributedTracingAddress), + zap.String("distributed-tracing-service-name", e.cfg.ExperimentalDistributedTracingServiceName), + zap.String("distributed-tracing-service-instance-id", e.cfg.ExperimentalDistributedTracingServiceInstanceID), + ) + + return exporter, options, err +}
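
Note on the mechanism: the crux of this patch is the gate in (*lessor).shouldPersistCheckpoints — the checkpointed remainingTTL is written to the backend when the member was started with CheckpointPersist, or when the cluster-wide minimum version is known to be v3.6 or newer. Below is a minimal, self-contained Go sketch of that rule, using the same go-semver dependency the patch imports; fakeCluster and shouldPersist here are illustrative stand-ins for the patch's cluster interface and lessor method, not identifiers added by the patch.

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// v3_6 mirrors the gate version defined in lease/lessor.go.
var v3_6 = semver.Version{Major: 3, Minor: 6}

// cluster mirrors the minimal interface the patched lessor depends on:
// the cluster-wide minimum major.minor version, or nil when unknown.
type cluster interface {
	Version() *semver.Version
}

type fakeCluster struct{ v *semver.Version }

func (c fakeCluster) Version() *semver.Version { return c.v }

// shouldPersist restates the patched rule: persist the checkpointed
// remainingTTL when the persist flag is set, or when every member is
// known to run v3.6 or newer; otherwise keep the v3.5 behavior of
// resetting remainingTTL after a restart.
func shouldPersist(c cluster, checkpointPersist bool) bool {
	cv := c.Version()
	return checkpointPersist || (cv != nil && !cv.LessThan(v3_6))
}

func main() {
	fmt.Println(shouldPersist(fakeCluster{semver.New("3.6.0")}, false)) // true: whole cluster is on v3.6+
	fmt.Println(shouldPersist(fakeCluster{semver.New("3.5.0")}, true))  // true: flag opts in on v3.5
	fmt.Println(shouldPersist(fakeCluster{nil}, false))                 // false: version unknown and no flag
}

Per the Validate() rule added in embed/config.go, the persist flag is only accepted together with checkpointing itself, so a v3.5 cluster that already runs with --experimental-enable-lease-checkpoint would presumably enable persistence member by member with: etcd --experimental-enable-lease-checkpoint --experimental-enable-lease-checkpoint-persist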