remove unnecessary triggerDiscovery calls. So we call it only in case… #4688

Closed · wants to merge 1 commit
26 changes: 11 additions & 15 deletions client/client.go
@@ -129,7 +129,7 @@ type Client struct {
 	lastHeartbeat   time.Time
 	heartbeatTTL    time.Duration
 	haveHeartbeated bool
-	heartbeatLock   sync.Mutex
+	heartbeatLock   sync.RWMutex
 
 	// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
 	triggerDiscoveryCh chan struct{}
@@ -235,6 +235,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 
 	// Initialize the server manager
 	c.servers = servers.New(c.logger, c.shutdownCh, c)
+	go c.servers.Start()
 
 	// Initialize the client
 	if err := c.init(); err != nil {
@@ -546,8 +547,8 @@ func (c *Client) Shutdown() error {
 // Stats is used to return statistics for debugging and insight
 // for various sub-systems
 func (c *Client) Stats() map[string]map[string]string {
-	c.heartbeatLock.Lock()
-	defer c.heartbeatLock.Unlock()
+	c.heartbeatLock.RLock()
+	defer c.heartbeatLock.RUnlock()
 	stats := map[string]map[string]string{
 		"client": {
 			"node_id": c.NodeID(),
@@ -1231,13 +1232,16 @@ func (c *Client) registerAndHeartbeat() {
 				c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
 				heartbeat = time.After(intv)
 
-				// If heartbeating fails, trigger Consul discovery
-				c.triggerDiscovery()
+				// If heartbeating fails, trigger Consul discovery, but only if every known server has failed
+				if c.servers.IsAllServersFail() {
+					c.logger.Printf("[WARN] client: all known servers have failed; triggering discovery")
+					c.triggerDiscovery()
+				}
 			}
 		} else {
-			c.heartbeatLock.Lock()
+			c.heartbeatLock.RLock()
 			heartbeat = time.After(c.heartbeatTTL)
-			c.heartbeatLock.Unlock()
+			c.heartbeatLock.RUnlock()
 		}
 	}
 }
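
This hunk carries the PR's main idea: a failed heartbeat no longer triggers Consul discovery unconditionally; discovery fires only once every known server is marked failed. A minimal standalone model of the new gate (the IsAllServersFail name comes from this diff, the rest is illustrative):

package main

import "fmt"

type server struct{ failed bool }

// allFailed mirrors Manager.IsAllServersFail below: discovery is
// warranted only when no server is healthy. Note it also reports
// true for an empty list, as the diff's version does.
func allFailed(servers []*server) bool {
	for _, s := range servers {
		if !s.failed {
			return false
		}
	}
	return true
}

func main() {
	servers := []*server{{failed: true}, {failed: false}}

	// Old behavior: any heartbeat failure -> trigger discovery.
	// New behavior: trigger discovery only when allFailed is true.
	fmt.Println(allFailed(servers)) // false: one server is still healthy
	servers[1].failed = true
	fmt.Println(allFailed(servers)) // true: time to rediscover servers
}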
@@ -1463,7 +1467,6 @@ func (c *Client) updateNodeStatus() error {
 	}
 	var resp structs.NodeUpdateResponse
 	if err := c.RPC("Node.UpdateStatus", &req, &resp); err != nil {
-		c.triggerDiscovery()
 		return fmt.Errorf("failed to update status: %v", err)
 	}
 	end := time.Now()
@@ -2104,13 +2107,6 @@ func (c *Client) consulDiscovery() {
 }
 
 func (c *Client) consulDiscoveryImpl() error {
-	// Acquire heartbeat lock to prevent heartbeat from running
-	// concurrently with discovery. Concurrent execution is safe, however
-	// discovery is usually triggered when heartbeating has failed so
-	// there's no point in allowing it.
-	c.heartbeatLock.Lock()
-	defer c.heartbeatLock.Unlock()
-
 	dcs, err := c.consulCatalog.Datacenters()
 	if err != nil {
 		return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
6 changes: 5 additions & 1 deletion client/rpc.go
@@ -68,12 +68,16 @@ TRY:
 	rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
 	if rpcErr == nil {
 		c.fireRpcRetryWatcher()
+		c.servers.NotifySuccessServer(server)
 		return nil
 	}
 
 	// Move off to another server, and see if we can retry.
 	c.logger.Printf("[ERR] nomad: %q RPC failed to server %s: %v", method, server.Addr, rpcErr)
-	c.servers.NotifyFailedServer(server)
+	if !strings.Contains(rpcErr.Error(), "rpc error: rpc error:") {
+		c.servers.NotifyFailedServer(server)
+	}
 
 	if retry := canRetry(args, rpcErr); !retry {
 		return rpcErr
 	}
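
The strings.Contains check above is, as I read the intent, a transport-versus-application distinction: each RPC layer prefixes errors with "rpc error: ", so a doubled prefix means some server actually handled the request and returned an error, and marking that server failed would be wrong. A standalone sketch of that classification (an assumption about the error format, not a documented contract):

package main

import (
	"errors"
	"fmt"
	"strings"
)

// serverHandledRequest reports whether the error looks like it came from
// a server-side handler rather than a broken connection. Assumption taken
// from the diff: a nested "rpc error: rpc error:" prefix implies the
// request reached a server, so the server should not be marked failed.
func serverHandledRequest(err error) bool {
	return strings.Contains(err.Error(), "rpc error: rpc error:")
}

func main() {
	transportErr := errors.New("rpc error: EOF")
	handlerErr := errors.New("rpc error: rpc error: permission denied")

	fmt.Println(serverHandledRequest(transportErr)) // false -> NotifyFailedServer
	fmt.Println(serverHandledRequest(handlerErr))   // true  -> server stays healthy
}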
109 changes: 77 additions & 32 deletions client/servers/manager.go
@@ -47,16 +47,15 @@ type Server struct {
 	// Addr is the resolved address of the server
 	Addr net.Addr
 	addr string
-	sync.Mutex
 
 	// DC is the datacenter of the server
 	DC string
+	failed bool
+
+	sync.RWMutex
 }
 
 func (s *Server) Copy() *Server {
-	s.Lock()
-	defer s.Unlock()
-
 	return &Server{
 		Addr: s.Addr,
 		addr: s.addr,
@@ -65,16 +64,14 @@ func (s *Server) Copy() *Server {
 }
 
 func (s *Server) String() string {
-	s.Lock()
-	defer s.Unlock()
-
 	if s.addr == "" {
 		s.addr = s.Addr.String()
 	}
 
 	return s.addr
 }
 
+
 func (s *Server) Equal(o *Server) bool {
 	if s == nil && o == nil {
 		return true
@@ -85,12 +82,26 @@ func (s *Server) Equal(o *Server) bool {
 	return s.Addr.String() == o.Addr.String() && s.DC == o.DC
 }
 
+func (s *Server) Fail(failed bool) {
+	s.Lock()
+	defer s.Unlock()
+
+	s.failed = failed
+}
+
+func (s *Server) Failed() bool {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.failed
+}
+
 type Servers []*Server
 
 func (s Servers) String() string {
 	addrs := make([]string, 0, len(s))
 	for _, srv := range s {
-		addrs = append(addrs, srv.String())
+		addrs = append(addrs, srv.String() + "(DC: " + srv.DC + ")")
 	}
 	return strings.Join(addrs, ",")
 }
@@ -111,10 +122,9 @@ func (s Servers) cycle() {
 
 // shuffle shuffles the server list in place
 func (s Servers) shuffle() {
-	for i := len(s) - 1; i > 0; i-- {
-		j := rand.Int31n(int32(i + 1))
+	rand.Shuffle(len(s), func(i, j int) {
 		s[i], s[j] = s[j], s[i]
-	}
+	})
 }
 
 func (s Servers) Sort() {
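
Swapping the hand-rolled loop for rand.Shuffle keeps the same Fisher-Yates semantics but delegates the index handling to the standard library; note that rand.Shuffle requires Go 1.10 or newer, which may matter for the project's build matrix. A quick standalone check:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	servers := []string{"10.0.0.1:4647", "10.0.0.2:4647", "10.0.0.3:4647"}

	// The same in-place swap the removed loop performed, driven by the
	// standard library (Go 1.10+).
	rand.Shuffle(len(servers), func(i, j int) {
		servers[i], servers[j] = servers[j], servers[i]
	})
	fmt.Println(servers)
}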
@@ -134,8 +144,8 @@ func (s Servers) Equal(o Servers) bool {
 		return false
 	}
 
-	for i, v := range s {
-		if !v.Equal(o[i]) {
+	for i := range s {
+		if !s[i].Equal(o[i]) {
 			return false
 		}
 	}
@@ -164,7 +174,7 @@ type Manager struct {
 	// pool. Pinger is an interface that wraps client.ConnPool.
 	connPoolPinger Pinger
 
-	sync.Mutex
+	sync.RWMutex
 }
 
 // New is the only way to safely create a new Manager struct.
@@ -202,23 +212,27 @@ func (m *Manager) SetServers(servers Servers) bool {
 
 	// Sort both the existing and incoming servers
 	servers.Sort()
-	m.servers.Sort()
+	l_servers := m.getServers()
+	l_servers.Sort()
 
 	// Determine if they are equal
-	equal := servers.Equal(m.servers)
+	equal := servers.Equal(l_servers)
 
-	// Randomize the incoming servers
-	servers.shuffle()
-	m.servers = servers
+	if !equal {
+		m.logger.Printf("[INFO] manager: setting new servers list: %s", servers.String())
+		// Randomize the incoming servers
+		servers.shuffle()
+		m.servers = servers
+	}
 
 	return !equal
 }

 // FindServer returns a server to send an RPC too. If there are no servers, nil
 // is returned.
 func (m *Manager) FindServer() *Server {
-	m.Lock()
-	defer m.Unlock()
+	m.RLock()
+	defer m.RUnlock()
 
 	if len(m.servers) == 0 {
 		m.logger.Printf("[WARN] manager: No servers available")
@@ -234,8 +248,8 @@ func (m *Manager) FindServer() *Server {
 
 // NumNodes returns the number of approximate nodes in the cluster.
 func (m *Manager) NumNodes() int32 {
-	m.Lock()
-	defer m.Unlock()
+	m.RLock()
+	defer m.RUnlock()
 	return m.numNodes
 }
 
@@ -256,22 +270,45 @@ func (m *Manager) NotifyFailedServer(s *Server) {
 	// this is a noop. If, however, the server is failed and first on
 	// the list, move the server to the end of the list.
 	if len(m.servers) > 1 && m.servers[0].Equal(s) {
+		m.servers[0].Fail(true)
 		m.servers.cycle()
 	}
 }
 
+func (m *Manager) NotifySuccessServer(s *Server) {
+	m.RLock()
+	defer m.RUnlock()
+
+	if len(m.servers) > 1 && m.servers[0].Equal(s) {
+		if m.servers[0].Failed() {
+			m.servers[0].Fail(false)
+		}
+	}
+}
+
+
 // NumServers returns the total number of known servers whether healthy or not.
 func (m *Manager) NumServers() int {
-	m.Lock()
-	defer m.Unlock()
+	m.RLock()
+	defer m.RUnlock()
 	return len(m.servers)
 }
 
-// GetServers returns a copy of the current list of servers.
-func (m *Manager) GetServers() Servers {
-	m.Lock()
-	defer m.Unlock()
+func (m *Manager) IsAllServersFail() bool {
+	m.RLock()
+	defer m.RUnlock()
+
+	for l_i := range m.servers {
+		if !m.servers[l_i].Failed() {
+			return false
+		}
+	}
+
+	return true
+}
+
+// GetServers returns a copy of the current list of servers.
+func (m *Manager) getServers() Servers {
 	copy := make([]*Server, 0, len(m.servers))
 	for _, s := range m.servers {
 		copy = append(copy, s.Copy())
@@ -280,6 +317,13 @@ func (m *Manager) GetServers() Servers {
 	return copy
 }
 
+func (m *Manager) GetServers() Servers {
+	m.RLock()
+	defer m.RUnlock()
+
+	return m.getServers()
+}
+
 // RebalanceServers shuffles the order in which Servers will be contacted. The
 // function will shuffle the set of potential servers to contact and then attempt
 // to contact each server. If a server successfully responds it is used, otherwise
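
The getServers/GetServers split follows a common Go convention: the unexported method assumes the caller already holds the manager lock, while the exported wrapper takes an RLock itself. Since sync.RWMutex is not reentrant, a write-lock holder such as SetServers can call getServers without deadlocking; that is my reading of why the split exists. A generic standalone illustration of the pattern under that reading:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.RWMutex
	items []string
}

// copyLocked assumes r.mu is already held, like the unexported getServers.
func (r *registry) copyLocked() []string {
	return append([]string(nil), r.items...)
}

// Copy is the exported variant that manages its own locking, like GetServers.
func (r *registry) Copy() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.copyLocked()
}

// Replace holds the write lock and still reuses copyLocked; calling the
// exported Copy here instead would deadlock, which is the point of the split.
func (r *registry) Replace(items []string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	old := r.copyLocked()
	r.items = items
	return old
}

func main() {
	r := &registry{items: []string{"a"}}
	fmt.Println(r.Replace([]string{"b", "c"})) // [a]
	fmt.Println(r.Copy())                      // [b c]
}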
@@ -293,23 +337,24 @@ func (m *Manager) RebalanceServers() {
 	// healthy server. NOTE: Do not iterate on the list directly because
 	// this loop mutates the server list in-place.
 	var foundHealthyServer bool
-	for i := 0; i < len(m.servers); i++ {
+	for i := 0; i < len(servers); i++ {
 		// Always test the first server. Failed servers are cycled
 		// while Serf detects the node has failed.
 		srv := servers[0]
 
 		err := m.connPoolPinger.Ping(srv.Addr)
 		if err == nil {
+			srv.Fail(false)
 			foundHealthyServer = true
 			break
 		}
-		m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, srv, err)
+		m.logger.Printf(`[WARN] manager: pinging server "%s" failed: %s`, srv, err)
 
 		servers.cycle()
 	}
 
 	if !foundHealthyServer {
-		m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance")
+		m.logger.Printf("[ERR] manager: No healthy servers during rebalance")
 		return
 	}
 
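
Finally, the rebalance loop: the iteration bound now reads len(servers), the local copy the loop actually mutates, instead of len(m.servers), matching the NOTE above it, and the first server to answer a ping gets its failed flag cleared. A standalone model of the ping-cycle-until-healthy behavior (hypothetical names; ping stands in for connPoolPinger.Ping):

package main

import "fmt"

type srv struct {
	name string
	down bool
}

// cycle moves the first server to the end, like Servers.cycle.
func cycle(s []*srv) {
	if len(s) < 2 {
		return
	}
	first := s[0]
	copy(s, s[1:])
	s[len(s)-1] = first
}

func main() {
	servers := []*srv{{"a", true}, {"b", true}, {"c", false}}
	ping := func(s *srv) error { // stand-in for connPoolPinger.Ping
		if s.down {
			return fmt.Errorf("ping %s: unreachable", s.name)
		}
		return nil
	}

	// Mirror of the loop above: bounded by the copy's length, always probe
	// servers[0], cycle on failure, stop at the first healthy server.
	foundHealthyServer := false
	for i := 0; i < len(servers); i++ {
		if err := ping(servers[0]); err == nil {
			foundHealthyServer = true // srv.Fail(false) in the diff
			break
		}
		cycle(servers)
	}
	fmt.Println(foundHealthyServer, servers[0].name) // true c
}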