diff --git a/embed/config.go b/embed/config.go index 719d0680d0f7..5e51c314b1c9 100644 --- a/embed/config.go +++ b/embed/config.go @@ -153,6 +153,7 @@ func NewConfig() *Config { ACUrls: []url.URL{*acurl}, ClusterState: ClusterStateFlagNew, InitialClusterToken: "etcd-cluster", + StrictReconfigCheck: true, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg diff --git a/etcdserver/server.go b/etcdserver/server.go index c0732b883fcf..56477c161c60 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -65,6 +65,10 @@ const ( StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" + // HealthInterval is the minimum time the cluster should be healthy + // before accepting add member requests. + HealthInterval = 5 * time.Second + purgeFileInterval = 30 * time.Second // monitorVersionInterval should be smaller than the timeout // on the connection. Or we will not be able to reuse the connection @@ -814,9 +818,12 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error { - if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() { + if s.Cfg.StrictReconfigCheck && + (!s.cluster.IsReadyToAddNewMember() || + !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members())) { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case adding a new member is allowed unconditionally + plog.Warningf("not ready to reconfigure, rejecting member add %+v", memb) return ErrNotEnoughStartedMembers } diff --git a/etcdserver/util.go b/etcdserver/util.go index 32c161743038..5a3fd81ee207 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -40,3 +40,14 @@ func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote ty t := transport.ActiveSince(remote) return !t.IsZero() && t.Before(since) } + +// isConnectedFullySince checks whether the local member is connected to all +// members in the cluster since the given time. +func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { + for _, m := range members { + if m.ID != self && !isConnectedSince(transport, since, m.ID) { + return false + } + } + return true +} diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 1ecc3cd7959a..37ae8704992a 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/coreos/etcd/client" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/pkg/testutil" "golang.org/x/net/context" @@ -346,6 +347,44 @@ func TestIssue3699(t *testing.T) { cancel() } +// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members. +func TestRejectUnhealthyAdd(t *testing.T) { + defer testutil.AfterTest(t) + c := NewCluster(t, 3) + for _, m := range c.Members { + m.ServerConfig.StrictReconfigCheck = true + } + c.Launch(t) + defer c.Terminate(t) + + // make cluster unhealthy and wait for downed peer + c.Members[0].Stop(t) + c.WaitLeader(t) + + // all attempts to add member should fail + for i := 1; i < len(c.Members); i++ { + if err := c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil { + t.Fatalf("should have failed adding peer") + } + } + + // make cluster healthy + c.Members[0].Restart(t) + c.WaitLeader(t) + time.Sleep(etcdserver.HealthInterval) + + // add member should succeed now that it's healthy + var err error + for i := 1; i < len(c.Members); i++ { + if err = c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil { + break + } + } + if err != nil { + t.Fatalf("should have added peer to healthy cluster (%v)", err) + } +} + // clusterMustProgress ensures that cluster can make progress. It creates // a random key first, and check the new key could be got from all client urls // of the cluster.