Skip to content

Commit

Permalink
Merge pull request #6045 from heyitsanthony/fix-version-race
Browse files Browse the repository at this point in the history
etcdserver, api, membership: don't race on setting version
  • Loading branch information
Anthony Romano authored Jul 27, 2016
2 parents a75688b + de2c3ec commit 13c2d32
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 39 deletions.
47 changes: 16 additions & 31 deletions etcdserver/api/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package api

import (
"sync"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
Expand All @@ -43,45 +41,32 @@ var (
"3.0.0": {AuthCapability: true, V3rpcCapability: true},
}

// capLoopOnce ensures we only create one capability monitor goroutine
capLoopOnce sync.Once

enableMapMu sync.RWMutex
// enabledMap points to a map in capabilityMaps
enabledMap map[Capability]bool

curVersion *semver.Version
)

func init() {
enabledMap = make(map[Capability]bool)
}

// RunCapabilityLoop starts, at most once per process, a background goroutine
// that checks the cluster version of s every 500ms and updates enabledMap
// when the cluster version increases. Calls after the first are no-ops and
// return immediately.
func RunCapabilityLoop(s *etcdserver.EtcdServer) {
	// Spawn the goroutine inside Do instead of wrapping Do in a goroutine:
	// sync.Once.Do blocks every later caller until the first function has
	// returned, so `go capLoopOnce.Do(...)` would park one extra goroutine
	// per repeated call for as long as the monitor loop keeps running.
	capLoopOnce.Do(func() { go runCapabilityLoop(s) })
}

func runCapabilityLoop(s *etcdserver.EtcdServer) {
stopped := s.StopNotify()

var pv *semver.Version
for {
if v := s.ClusterVersion(); v != pv {
if pv == nil || (v != nil && pv.LessThan(*v)) {
pv = v
enableMapMu.Lock()
enabledMap = capabilityMaps[pv.String()]
enableMapMu.Unlock()
plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
}
}

select {
case <-stopped:
return
case <-time.After(500 * time.Millisecond):
}
// UpdateCapability switches enabledMap to the capability set registered for
// v, but only when v is strictly newer than the version recorded by the
// previous successful update. A nil v is ignored.
func UpdateCapability(v *semver.Version) {
	// A nil version means the cluster recovered without ever having its
	// version set; there is nothing to enable yet.
	if v == nil {
		return
	}

	// Do the compare-and-swap under the lock, and report whether the
	// recorded version actually advanced.
	advanced := func() bool {
		enableMapMu.Lock()
		defer enableMapMu.Unlock()

		if curVersion != nil && !curVersion.LessThan(*v) {
			return false
		}
		curVersion = v
		enabledMap = capabilityMaps[v.String()]
		return true
	}()
	if !advanced {
		return
	}

	// Logging happens after the lock has been released.
	plog.Infof("enabled capabilities for version %s", version.Cluster(v.String()))
}

func IsCapabilityEnabled(c Capability) bool {
Expand Down
1 change: 0 additions & 1 deletion etcdserver/api/v2http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
mux.Handle(pprofPrefix+"/block", pprof.Handler("block"))
}

api.RunCapabilityLoop(server)
return requestLogger(mux)
}

Expand Down
2 changes: 0 additions & 2 deletions etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"crypto/tls"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/pkg/capnslog"
"google.golang.org/grpc"
Expand Down Expand Up @@ -47,6 +46,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

api.RunCapabilityLoop(s)
return grpcServer
}
3 changes: 2 additions & 1 deletion etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path"
"time"

"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/pbutil"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (a *applierV2store) Put(r *pb.Request) Response {
}
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
}
// return an empty response since there is no consumer.
return Response{}
Expand Down
6 changes: 4 additions & 2 deletions etcdserver/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,14 @@ func (c *RaftCluster) SetBackend(be backend.Backend) {
mustCreateBackendBuckets(c.be)
}

func (c *RaftCluster) Recover() {
func (c *RaftCluster) Recover(onSet func(*semver.Version)) {
c.Lock()
defer c.Unlock()

c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
mustDetectDowngrade(c.version)
onSet(c.version)

for _, m := range c.members {
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
Expand Down Expand Up @@ -356,7 +357,7 @@ func (c *RaftCluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String()))
}

func (c *RaftCluster) SetVersion(ver *semver.Version) {
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) {
c.Lock()
defer c.Unlock()
if c.version != nil {
Expand All @@ -372,6 +373,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) {
if c.be != nil {
mustSaveClusterVersionToBackend(c.be, ver)
}
onSet(ver)
}

func (c *RaftCluster) IsReadyToAddNewMember() bool {
Expand Down
5 changes: 3 additions & 2 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/compactor"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/api"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
Expand Down Expand Up @@ -342,7 +343,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
cl.SetStore(st)
cl.SetBackend(be)
cl.Recover()
cl.Recover(api.UpdateCapability)
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
Expand Down Expand Up @@ -705,7 +706,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {

s.cluster.SetBackend(s.be)
plog.Info("recovering cluster configuration...")
s.cluster.Recover()
s.cluster.Recover(api.UpdateCapability)
plog.Info("finished recovering cluster configuration")

plog.Info("removing old peers from network...")
Expand Down

0 comments on commit 13c2d32

Please sign in to comment.