diff --git a/etcdserver/api/capability.go b/etcdserver/api/capability.go index d9175b4783f..768111eafc6 100644 --- a/etcdserver/api/capability.go +++ b/etcdserver/api/capability.go @@ -16,9 +16,7 @@ package api import ( "sync" - "time" - "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" "github.com/coreos/pkg/capnslog" @@ -43,45 +41,32 @@ var ( "3.0.0": {AuthCapability: true, V3rpcCapability: true}, } - // capLoopOnce ensures we only create one capability monitor goroutine - capLoopOnce sync.Once - enableMapMu sync.RWMutex // enabledMap points to a map in capabilityMaps enabledMap map[Capability]bool + + curVersion *semver.Version ) func init() { enabledMap = make(map[Capability]bool) } -// RunCapabilityLoop checks the cluster version every 500ms and updates -// the enabledMap when the cluster version increased. -func RunCapabilityLoop(s *etcdserver.EtcdServer) { - go capLoopOnce.Do(func() { runCapabilityLoop(s) }) -} - -func runCapabilityLoop(s *etcdserver.EtcdServer) { - stopped := s.StopNotify() - - var pv *semver.Version - for { - if v := s.ClusterVersion(); v != pv { - if pv == nil || (v != nil && pv.LessThan(*v)) { - pv = v - enableMapMu.Lock() - enabledMap = capabilityMaps[pv.String()] - enableMapMu.Unlock() - plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String())) - } - } - - select { - case <-stopped: - return - case <-time.After(500 * time.Millisecond): - } +// UpdateCapability updates the enabledMap when the cluster version increases. +func UpdateCapability(v *semver.Version) { + if v == nil { + // if recovered but version was never set by cluster + return + } + enableMapMu.Lock() + if curVersion != nil && !curVersion.LessThan(*v) { + enableMapMu.Unlock() + return } + curVersion = v + enabledMap = capabilityMaps[curVersion.String()] + enableMapMu.Unlock() + plog.Infof("enabled capabilities for version %s", version.Cluster(v.String())) } func IsCapabilityEnabled(c Capability) bool { diff --git a/etcdserver/api/v2http/client.go b/etcdserver/api/v2http/client.go index af69b480997..492b8b78c57 100644 --- a/etcdserver/api/v2http/client.go +++ b/etcdserver/api/v2http/client.go @@ -130,7 +130,6 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http mux.Handle(pprofPrefix+"/block", pprof.Handler("block")) } - api.RunCapabilityLoop(server) return requestLogger(mux) } diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 1f020cc06e6..07c65320bd0 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -18,7 +18,6 @@ import ( "crypto/tls" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/api" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/pkg/capnslog" "google.golang.org/grpc" @@ -47,6 +46,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) - api.RunCapabilityLoop(s) return grpcServer } diff --git a/etcdserver/apply_v2.go b/etcdserver/apply_v2.go index 28b1d5a27ca..f278efca88e 100644 --- a/etcdserver/apply_v2.go +++ b/etcdserver/apply_v2.go @@ -19,6 +19,7 @@ import ( "path" "time" + "github.com/coreos/etcd/etcdserver/api" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/pbutil" @@ -86,7 +87,7 @@ func (a *applierV2store) Put(r *pb.Request) Response { } if r.Path == membership.StoreClusterVersionKey() { if a.cluster != nil { - a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability) } // return an empty response since there is no consumer. return Response{} diff --git a/etcdserver/membership/cluster.go b/etcdserver/membership/cluster.go index fb19c3e60c0..4e757725dc3 100644 --- a/etcdserver/membership/cluster.go +++ b/etcdserver/membership/cluster.go @@ -200,13 +200,14 @@ func (c *RaftCluster) SetBackend(be backend.Backend) { mustCreateBackendBuckets(c.be) } -func (c *RaftCluster) Recover() { +func (c *RaftCluster) Recover(onSet func(*semver.Version)) { c.Lock() defer c.Unlock() c.members, c.removed = membersFromStore(c.store) c.version = clusterVersionFromStore(c.store) mustDetectDowngrade(c.version) + onSet(c.version) for _, m := range c.members { plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id) @@ -356,7 +357,7 @@ func (c *RaftCluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *RaftCluster) SetVersion(ver *semver.Version) { +func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) { c.Lock() defer c.Unlock() if c.version != nil { @@ -372,6 +373,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) { if c.be != nil { mustSaveClusterVersionToBackend(c.be, ver) } + onSet(ver) } func (c *RaftCluster) IsReadyToAddNewMember() bool { diff --git a/etcdserver/server.go b/etcdserver/server.go index d2713020dda..ceb0641a1cb 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/auth" "github.com/coreos/etcd/compactor" "github.com/coreos/etcd/discovery" + "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" @@ -342,7 +343,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { } cl.SetStore(st) cl.SetBackend(be) - cl.Recover() + cl.Recover(api.UpdateCapability) if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) @@ -705,7 +706,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.cluster.SetBackend(s.be) plog.Info("recovering cluster configuration...") - s.cluster.Recover() + s.cluster.Recover(api.UpdateCapability) plog.Info("finished recovering cluster configuration") plog.Info("removing old peers from network...")