From 64ac9b9359baae0cc9d6e2495487d5700c508a0a Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 21 Sep 2016 17:06:52 -0700 Subject: [PATCH 01/14] Retry all servers on RPC call failure rpcproxy is refactored into serverlist which prioritizes good servers over servers in a remote DC or who have had a failure. Registration, heartbeating, and alloc status updating will retry faster when new servers are discovered. Consul discovery will be retried more quickly when no servers are available (eg on startup or an outage). --- client/client.go | 324 ++++++---- client/rpcproxy/rpcproxy.go | 779 ---------------------- client/rpcproxy/rpcproxy_test.go | 818 ------------------------ client/rpcproxy/server_endpoint.go | 84 --- client/rpcproxy/server_endpoint_test.go | 77 --- client/serverlist.go | 111 ++++ client/serverlist_test.go | 117 ++++ command/agent/agent_endpoint.go | 13 +- nomad/pool.go | 5 +- 9 files changed, 428 insertions(+), 1900 deletions(-) delete mode 100644 client/rpcproxy/rpcproxy.go delete mode 100644 client/rpcproxy/rpcproxy_test.go delete mode 100644 client/rpcproxy/server_endpoint.go delete mode 100644 client/rpcproxy/server_endpoint_test.go create mode 100644 client/serverlist.go create mode 100644 client/serverlist_test.go diff --git a/client/client.go b/client/client.go index 58b1e8e3426..918aa1ec147 100644 --- a/client/client.go +++ b/client/client.go @@ -1,6 +1,7 @@ package client import ( + "errors" "fmt" "io/ioutil" "log" @@ -10,7 +11,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/armon/go-metrics" @@ -21,7 +21,6 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/fingerprint" - "github.com/hashicorp/nomad/client/rpcproxy" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" @@ -105,26 +104,23 @@ type Client struct { logger *log.Logger - rpcProxy *rpcproxy.RPCProxy - connPool *nomad.ConnPool - // lastHeartbeatFromQuorum is an atomic int32 acting as a bool. When - // true, the last heartbeat message had a leader. When false (0), - // the last heartbeat did not include the RPC address of the leader, - // indicating that the server is in the minority or middle of an - // election. - lastHeartbeatFromQuorum int32 - - // consulPullHeartbeatDeadline is the deadline at which this Nomad - // Agent will begin polling Consul for a list of Nomad Servers. When - // Nomad Clients are heartbeating successfully with Nomad Servers, - // Nomad Clients do not poll Consul to populate their backup server - // list. - consulPullHeartbeatDeadline time.Time - lastHeartbeat time.Time - heartbeatTTL time.Duration - heartbeatLock sync.Mutex + // servers is the (optionally prioritized) list of nomad servers + servers *serverlist + + // consulDiscoverNext is the deadline at which this Nomad Agent will + // poll Consul for a list of Nomad Servers. When Nomad Clients are + // heartbeating successfully with Nomad Servers, Nomad Clients do not + // poll Consul to populate their server list. 
+ consulDiscoverNext time.Time + lastHeartbeat time.Time + heartbeatTTL time.Duration + heartbeatLock sync.Mutex + + // discovered will be ticked whenever consul discovery completes + // succesfully + discovered chan struct{} // allocs is the current set of allocations allocs map[string]*AllocRunner @@ -154,6 +150,8 @@ type Client struct { vaultClient vaultclient.VaultClient } +var noServers = errors.New("no servers") + // NewClient is used to create a new client from the given configuration func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) { // Create the client @@ -162,6 +160,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg consulSyncer: consulSyncer, start: time.Now(), connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), + servers: newServerList(), + discovered: make(chan struct{}), logger: logger, hostStatsCollector: stats.NewHostStatsCollector(), allocs: make(map[string]*AllocRunner), @@ -202,9 +202,10 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg // Create the RPC Proxy and bootstrap with the preconfigured list of // static servers c.configLock.RLock() - c.rpcProxy = rpcproxy.NewRPCProxy(c.logger, c.shutdownCh, c, c.connPool) - for _, serverAddr := range c.configCopy.Servers { - c.rpcProxy.AddPrimaryServer(serverAddr) + if len(c.configCopy.Servers) > 0 { + if err := c.SetServers(c.configCopy.Servers); err != nil { + logger.Printf("[WARN] None of the configured servers are valid: %v", err) + } } c.configLock.RUnlock() @@ -238,16 +239,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg // Start collecting stats go c.collectHostStats() - // Start the RPCProxy maintenance task. This task periodically - // shuffles the list of Nomad Server Endpoints this Client will use - // when communicating with Nomad Servers via RPC. This is done in - // order to prevent server fixation in stable Nomad clusters. This - // task actively populates the active list of Nomad Server Endpoints - // from information from the Nomad Client heartbeats. If a heartbeat - // times out and there are no Nomad servers available, this data is - // populated by periodically polling Consul, if available. - go c.rpcProxy.Run() - return c, nil } @@ -350,33 +341,39 @@ func (c *Client) Shutdown() error { return c.saveState() } -// RPC is used to forward an RPC call to a nomad server, or fail if no servers +// RPC is used to forward an RPC call to a nomad server, or fail if no servers. 
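The retry-all-servers behaviour below leans on the new serverlist type. client/serverlist.go is created by this commit, but its hunk is not part of this excerpt, so the following sketch is inferred from the call sites visible in this diff (newServerList, set, all, failed, good, and the endpoint fields name, addr, priority); the real file may differ in detail.

```go
// Hypothetical sketch of client/serverlist.go, inferred from its call
// sites in this diff; names and behaviour are assumptions, not the
// actual file contents.
package client

import (
	"net"
	"sort"
	"sync"
)

// endpoint is a single Nomad server RPC target. priority is assigned when
// the list is set: 0 for servers in the client's datacenter, 1 for servers
// in a remote datacenter (see updateNodeStatus below).
type endpoint struct {
	name     string
	addr     net.Addr
	priority int
}

type endpoints []*endpoint

func (e endpoints) Len() int           { return len(e) }
func (e endpoints) Less(i, j int) bool { return e[i].priority < e[j].priority }
func (e endpoints) Swap(i, j int)      { e[i], e[j] = e[j], e[i] }

// serverlist is a concurrency-safe, priority-ordered list of endpoints.
type serverlist struct {
	mu sync.RWMutex
	e  endpoints
}

func newServerList() *serverlist { return &serverlist{} }

// set replaces the stored endpoints, ordered best-first by priority.
func (s *serverlist) set(in endpoints) {
	sort.Stable(in)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.e = in
}

// all returns a copy of the list in its current order. Callers iterate
// over this copy and report outcomes via failed/good.
func (s *serverlist) all() endpoints {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(endpoints, len(s.e))
	copy(out, s.e)
	return out
}

// failed moves a server that just failed an RPC to the back of the list
// so the remaining servers are preferred on the next call.
func (s *serverlist) failed(e *endpoint) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, cur := range s.e {
		if cur == e {
			s.e = append(append(s.e[:i:i], s.e[i+1:]...), cur)
			return
		}
	}
}

// good restores priority ordering after a successful RPC, so a recovered
// local-datacenter server is preferred again over remote ones.
func (s *serverlist) good(e *endpoint) {
	s.mu.Lock()
	defer s.mu.Unlock()
	sort.Stable(s.e)
}
```

The client's RPC method, which iterates over this list, follows.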
func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Invoke the RPCHandler if it exists if c.config.RPCHandler != nil { return c.config.RPCHandler.RPC(method, args, reply) } - // Pick a server to request from - server := c.rpcProxy.FindServer() - if server == nil { - return fmt.Errorf("no known servers") + servers := c.servers.all() + if len(servers) == 0 { + return noServers } - // Make the RPC request - if err := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply); err != nil { - c.rpcProxy.NotifyFailedServer(server) - return fmt.Errorf("RPC failed to server %s: %v", server.Addr, err) + var mErr multierror.Error + for _, s := range servers { + // Make the RPC request + if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil { + errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err) + mErr.Errors = append(mErr.Errors, errmsg) + c.logger.Printf("[DEBUG] client: %v", errmsg) + c.servers.failed(s) + continue + } + c.servers.good(s) + return nil } - return nil + + // Force consul discovery ASAP since we have no healthy servers + return mErr.ErrorOrNil() } // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { - toString := func(v uint64) string { - return strconv.FormatUint(v, 10) - } c.allocLock.RLock() numAllocs := len(c.allocs) c.allocLock.RUnlock() @@ -386,8 +383,8 @@ func (c *Client) Stats() map[string]map[string]string { stats := map[string]map[string]string{ "client": map[string]string{ "node_id": c.Node().ID, - "known_servers": toString(uint64(c.rpcProxy.NumServers())), - "num_allocations": toString(uint64(numAllocs)), + "known_servers": strconv.Itoa(len(c.servers.all())), + "num_allocations": strconv.Itoa(numAllocs), "last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)), "heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL), }, @@ -438,10 +435,44 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { return ar.ctx.AllocDir, nil } -// AddPrimaryServerToRPCProxy adds serverAddr to the RPC Proxy's primary -// server list. -func (c *Client) AddPrimaryServerToRPCProxy(serverAddr string) *rpcproxy.ServerEndpoint { - return c.rpcProxy.AddPrimaryServer(serverAddr) +// GetServers returns the list of nomad servers this client is aware of. +func (c *Client) GetServers() []string { + endpoints := c.servers.all() + res := make([]string, len(endpoints)) + for i := range endpoints { + res[i] = endpoints[i].addr.String() + } + return res +} + +// SetServers sets a new list of nomad servers to connect to. As long as one +// server is resolvable no error is returned. 
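The diffstat shows command/agent/agent_endpoint.go changing as well, presumably to expose these two accessors over the agent API; that hunk is not included in this excerpt. A hedged illustration of how a caller might drive the pair (the handler below is invented for the example), ahead of the SetServers implementation that follows:

```go
package agent

import "github.com/hashicorp/nomad/client"

// updateClientServers is a hypothetical caller of the new accessors; it is
// not taken from the agent_endpoint.go hunk.
func updateClientServers(c *client.Client, addrs []string) ([]string, error) {
	// SetServers returns an error only when none of the supplied
	// addresses resolve; individually bad addresses are logged and
	// skipped.
	if err := c.SetServers(addrs); err != nil {
		return nil, err
	}
	// GetServers reports the resolved host:port list the client will
	// now try, in its current order.
	return c.GetServers(), nil
}
```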
+func (c *Client) SetServers(servers []string) error { + endpoints := make([]*endpoint, 0, len(servers)) + var merr multierror.Error + for _, s := range servers { + addr, err := resolveServer(s) + if err != nil { + c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", s, err) + merr.Errors = append(merr.Errors, err) + continue + } + + // Valid endpoint, append it without a priority as this API + // doesn't support different priorities for different servers + endpoints = append(endpoints, &endpoint{name: s, addr: addr}) + } + + // Only return errors if no servers are valid + if len(endpoints) == 0 { + if len(merr.Errors) > 0 { + return merr.ErrorOrNil() + } + return noServers + } + + c.servers.set(endpoints) + return nil } // restoreState is used to restore our state from the data dir @@ -868,14 +899,18 @@ func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, u // retryRegisterNode is used to register the node or update the registration and // retry in case of failure. func (c *Client) retryRegisterNode() { - // Register the client for { - if err := c.registerNode(); err == nil { - break + err := c.registerNode() + if err == nil { + return + } + if err == noServers { + c.logger.Print("[DEBUG] client: registration waiting on servers") } else { - c.logger.Printf("[ERR] client: %v", err) + c.logger.Printf("[ERR] client: registration failure: %v", err) } select { + case <-c.discovered: case <-time.After(c.retryIntv(registerRetryIntv)): case <-c.shutdownCh: return @@ -903,7 +938,7 @@ func (c *Client) registerNode() error { node.Status = structs.NodeStatusReady c.configLock.Unlock() - c.logger.Printf("[DEBUG] client: node registration complete") + c.logger.Printf("[INFO] client: node registration complete") if len(resp.EvalIDs) != 0 { c.logger.Printf("[DEBUG] client: %d evaluations triggered by node registration", len(resp.EvalIDs)) } @@ -917,6 +952,13 @@ func (c *Client) registerNode() error { // updateNodeStatus is used to heartbeat and update the status of the node func (c *Client) updateNodeStatus() error { + c.heartbeatLock.Lock() + defer c.heartbeatLock.Unlock() + + // If anything goes wrong we want consul discovery to happen ASAP. The + // heartbeat lock keeps it from running concurrent with node updating. + c.consulDiscoverNext = time.Time{} + node := c.Node() req := structs.NodeUpdateStatusRequest{ NodeID: node.ID, @@ -934,14 +976,29 @@ func (c *Client) updateNodeStatus() error { c.logger.Printf("[DEBUG] client: state updated to %s", req.Status) } - c.heartbeatLock.Lock() - defer c.heartbeatLock.Unlock() + // Update heartbeat time and ttl c.lastHeartbeat = time.Now() c.heartbeatTTL = resp.HeartbeatTTL - if err := c.rpcProxy.RefreshServerLists(resp.Servers, resp.NumNodes, resp.LeaderRPCAddr); err != nil { - return err + // Convert []*NodeServerInfo to []*endpoints + localdc := c.Datacenter() + servers := make(endpoints, 0, len(resp.Servers)) + for _, s := range resp.Servers { + addr, err := resolveServer(s.RPCAdvertiseAddr) + if err != nil { + continue + } + e := endpoint{name: s.RPCAdvertiseAddr, addr: addr} + if s.Datacenter != localdc { + // server is non-local; de-prioritize + e.priority = 1 + } + servers = append(servers, &e) } + if len(servers) == 0 { + return fmt.Errorf("server returned no valid servers") + } + c.servers.set(servers) // Begin polling Consul if there is no Nomad leader. 
We could be // heartbeating to a Nomad server that is in the minority of a @@ -949,13 +1006,11 @@ func (c *Client) updateNodeStatus() error { // has connectivity to the existing majority of Nomad Servers, but // only if it queries Consul. if resp.LeaderRPCAddr == "" { - atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 1, 0) return nil } const heartbeatFallbackFactor = 3 - atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 0, 1) - c.consulPullHeartbeatDeadline = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL) + c.consulDiscoverNext = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL) return nil } @@ -1079,9 +1134,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { resp = structs.NodeClientAllocsResponse{} err := c.RPC("Node.GetClientAllocs", &req, &resp) if err != nil { - c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) + if err != noServers { + c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) + } retry := c.retryIntv(getAllocRetryIntv) select { + case <-c.discovered: + continue case <-time.After(retry): continue case <-c.shutdownCh: @@ -1402,23 +1461,15 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli // TODO(sean@): this could eventually be moved to a priority queue and give // each task an interval, but that is not necessary at this time. func (c *Client) setupConsulSyncer() error { - // The bootstrapFn callback handler is used to periodically poll - // Consul to look up the Nomad Servers in Consul. In the event the - // heartbeat deadline has been exceeded and this Client is orphaned - // from its servers, periodically poll Consul to reattach this Client - // to its cluster and automatically recover from a detached state. - bootstrapFn := func() error { - now := time.Now() + disco := func() error { c.heartbeatLock.Lock() + defer c.heartbeatLock.Unlock() - // If the last heartbeat didn't contain a leader, give the - // Nomad server this Agent is talking to one more attempt at - // providing a heartbeat that does contain a leader. - if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) { - c.heartbeatLock.Unlock() + // Don't run before the deadline. When bootstrapping is done + // and heartbeats are working this is the common path. 
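retryRegisterNode and watchAllocations above now share the same wake-up shape: block on the usual backoff timer, but return early as soon as Consul discovery signals new servers or the client shuts down. Distilled into a stand-alone helper (waitForRetry is an illustrative name; the client inlines the select):

```go
package client

import "time"

// waitForRetry blocks for the backoff duration but wakes early when
// discovery completes or shutdown begins. It returns false on shutdown.
// This is an illustrative distillation, not a helper from the patch.
func waitForRetry(retry time.Duration, discovered, shutdown <-chan struct{}) bool {
	select {
	case <-discovered:
		// Consul discovery found servers; retry immediately.
	case <-time.After(retry):
		// Normal backoff expired.
	case <-shutdown:
		return false
	}
	return true
}
```

The deadline check described by the comment above continues next.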
+ if time.Now().Before(c.consulDiscoverNext) { return nil } - c.heartbeatLock.Unlock() consulCatalog := c.consulSyncer.ConsulClient().Catalog() dcs, err := consulCatalog.Datacenters() @@ -1437,18 +1488,19 @@ func (c *Client) setupConsulSyncer() error { dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] } - // Forward RPCs to our region - nomadRPCArgs := structs.GenericRequest{ + // Query for servers in this client's region only + region := c.Region() + rpcargs := structs.GenericRequest{ QueryOptions: structs.QueryOptions{ - Region: c.Region(), + Region: region, }, } - nomadServerServiceName := c.config.ConsulConfig.ServerServiceName + serviceName := c.configCopy.ConsulConfig.ServerServiceName var mErr multierror.Error - const defaultMaxNumNomadServers = 8 - nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) + var servers endpoints c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs) + DISCOLOOP: for _, dc := range dcs { consulOpts := &consulapi.QueryOptions{ AllowStale: true, @@ -1456,79 +1508,76 @@ func (c *Client) setupConsulSyncer() error { Near: "_agent", WaitTime: consul.DefaultQueryWaitDuration, } - consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagRPC, consulOpts) + consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts) if err != nil { - mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err)) + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err)) continue } for _, s := range consulServices { - port := strconv.FormatInt(int64(s.ServicePort), 10) - addr := s.ServiceAddress - if addr == "" { - addr = s.Address + port := strconv.Itoa(s.ServicePort) + addrstr := s.ServiceAddress + if addrstr == "" { + addrstr = s.Address } - serverAddr := net.JoinHostPort(addr, port) - serverEndpoint, err := rpcproxy.NewServerEndpoint(serverAddr) + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port)) if err != nil { mErr.Errors = append(mErr.Errors, err) continue } var peers []string - if err := c.connPool.RPC(c.Region(), serverEndpoint.Addr, c.RPCMajorVersion(), "Status.Peers", nomadRPCArgs, &peers); err != nil { + if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil { mErr.Errors = append(mErr.Errors, err) continue } + // Successfully received the Server peers list of the correct // region - if len(peers) != 0 { - nomadServerServices = append(nomadServerServices, peers...) 
- break + for _, p := range peers { + addr, err := net.ResolveTCPAddr("tcp", p) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + } + servers = append(servers, &endpoint{name: p, addr: addr}) + } + if len(servers) > 0 { + break DISCOLOOP } - } - // Break if at least one Nomad Server was successfully pinged - if len(nomadServerServices) > 0 { - break } } - if len(nomadServerServices) == 0 { + if len(servers) == 0 { if len(mErr.Errors) > 0 { return mErr.ErrorOrNil() } - - return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %q", nomadServerServiceName, dcs) + return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs) } - // Log the servers we are adding - c.logger.Printf("[INFO] client.consul: bootstrap adding following Servers: %q", nomadServerServices) + c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers) + c.servers.set(servers) - c.heartbeatLock.Lock() - if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) { - c.heartbeatLock.Unlock() - // Common, healthy path - if err := c.rpcProxy.SetBackupServers(nomadServerServices); err != nil { - return fmt.Errorf("client.consul: unable to set backup servers: %v", err) - } - } else { - c.heartbeatLock.Unlock() - // If this Client is talking with a Server that - // doesn't have a leader, and we have exceeded the - // consulPullHeartbeatDeadline, change the call from - // SetBackupServers() to calling AddPrimaryServer() - // in order to allow the Clients to randomly begin - // considering all known Nomad servers and - // eventually, hopefully, find their way to a Nomad - // Server that has quorum (assuming Consul has a - // server list that is in the majority). - for _, s := range nomadServerServices { - c.rpcProxy.AddPrimaryServer(s) + // Make sure registration is given ample opportunity to run + // before trying consul discovery again. + c.consulDiscoverNext = time.Now().Add(2 * registerRetryIntv) + + // Notify waiting rpc calls. Wait briefly in case initial rpc + // just failed but the calling goroutine isn't selecting on + // discovered yet. + const dur = 50 * time.Millisecond + timeout := time.NewTimer(dur) + for { + select { + case c.discovered <- struct{}{}: + if !timeout.Stop() { + <-timeout.C + } + timeout.Reset(dur) + case <-timeout.C: + return nil } } - - return nil } - if c.config.ConsulConfig.ClientAutoJoin { - c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn) + if c.configCopy.ConsulConfig.ClientAutoJoin { + c.consulSyncer.AddPeriodicHandler("Nomad Client Consul Server Discovery", disco) } consulServicesReaperFn := func() error { @@ -1617,7 +1666,18 @@ func (c *Client) emitStats(hStats *stats.HostStats) { } } -// RPCProxy returns the Client's RPCProxy instance -func (c *Client) RPCProxy() *rpcproxy.RPCProxy { - return c.rpcProxy +// resolveServer given a sever's address as a string, return it's resolved +// net.Addr or an error. 
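The notification loop at the end of the discovery function above is worth calling out: after the discovered servers are stored, it keeps handing out wake-ups on c.discovered until no waiter accepts one within a 50ms grace window, so an RPC caller whose request has only just failed is still woken even if it has not reached its select yet. The same pattern in stand-alone form (notifyWaiters is an illustrative name):

```go
package client

import "time"

// notifyWaiters signals every goroutine currently (or about to be) blocked
// on ch and stops once no waiter accepts a signal within the grace window.
// Illustrative rendering of the inlined loop in the discovery function.
func notifyWaiters(ch chan<- struct{}, grace time.Duration) {
	timeout := time.NewTimer(grace)
	defer timeout.Stop()
	for {
		select {
		case ch <- struct{}{}:
			// A waiter woke up; give the next one a fresh window.
			if !timeout.Stop() {
				<-timeout.C
			}
			timeout.Reset(grace)
		case <-timeout.C:
			return
		}
	}
}
```

The resolveServer helper, whose doc comment appears just above, is defined next.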
+func resolveServer(s string) (net.Addr, error) { + const defaultClientPort = "4647" // default client RPC port + host, port, err := net.SplitHostPort(s) + if err != nil { + if strings.Contains(err.Error(), "missing port") { + host = s + port = defaultClientPort + } else { + return nil, err + } + } + return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)) } diff --git a/client/rpcproxy/rpcproxy.go b/client/rpcproxy/rpcproxy.go deleted file mode 100644 index 0e8c7604127..00000000000 --- a/client/rpcproxy/rpcproxy.go +++ /dev/null @@ -1,779 +0,0 @@ -// Package rpcproxy provides a proxy interface to Nomad Servers. The -// RPCProxy periodically shuffles which server a Nomad Client communicates -// with in order to redistribute load across Nomad Servers. Nomad Servers -// that fail an RPC request are automatically cycled to the end of the list -// until the server list is reshuffled. -// -// The rpcproxy package does not provide any external API guarantees and -// should be called only by `hashicorp/nomad`. -package rpcproxy - -import ( - "fmt" - "log" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/nomad/nomad/structs" -) - -const ( - // clientRPCJitterFraction determines the amount of jitter added to - // clientRPCMinReuseDuration before a connection is expired and a new - // connection is established in order to rebalance load across Nomad - // servers. The cluster-wide number of connections per second from - // rebalancing is applied after this jitter to ensure the CPU impact - // is always finite. See newRebalanceConnsPerSecPerServer's comment - // for additional commentary. - // - // For example, in a 10K Nomad cluster with 5x servers, this default - // averages out to ~13 new connections from rebalancing per server - // per second. - clientRPCJitterFraction = 2 - - // clientRPCMinReuseDuration controls the minimum amount of time RPC - // queries are sent over an established connection to a single server - clientRPCMinReuseDuration = 600 * time.Second - - // Limit the number of new connections a server receives per second - // for connection rebalancing. This limit caps the load caused by - // continual rebalancing efforts when a cluster is in equilibrium. A - // lower value comes at the cost of increased recovery time after a - // partition. This parameter begins to take effect when there are - // more than ~48K clients querying 5x servers or at lower server - // counts when there is a partition. - // - // For example, in a 100K Nomad cluster with 5x servers, it will take - // ~5min for all servers to rebalance their connections. If 99,995 - // agents are in the minority talking to only one server, it will - // take ~26min for all servers to rebalance. A 10K cluster in the - // same scenario will take ~2.6min to rebalance. - newRebalanceConnsPerSecPerServer = 64 - - // rpcAPIMismatchLogRate determines the rate at which log entries are - // emitted when the client and server's API versions are mismatched. - rpcAPIMismatchLogRate = 3 * time.Hour -) - -// NomadConfigInfo is an interface wrapper around this Nomad Agent's -// configuration to prevents a cyclic import dependency. 
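Stepping back to client.go for a moment: the resolveServer helper added above fills in the default RPC port when an address has none. Expected behaviour, with addresses invented for the example:

```go
package client

import "fmt"

// exampleResolve illustrates resolveServer; the addresses are made up.
func exampleResolve() {
	a, _ := resolveServer("10.0.0.1")      // no port given: default 4647 is appended
	b, _ := resolveServer("10.0.0.1:4747") // an explicit port is kept as given
	fmt.Println(a, b)                      // 10.0.0.1:4647 10.0.0.1:4747
}
```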
-type NomadConfigInfo interface { - Datacenter() string - RPCMajorVersion() int - RPCMinorVersion() int - Region() string -} - -// Pinger is an interface wrapping client.ConnPool to prevent a -// cyclic import dependency -type Pinger interface { - PingNomadServer(region string, apiMajorVersion int, s *ServerEndpoint) (bool, error) -} - -// serverList is an array of Nomad Servers. The first server in the list is -// the active server. -// -// NOTE(sean@): We are explicitly relying on the fact that serverList will be -// copied onto the stack by atomic.Value. Please keep this structure light. -type serverList struct { - L []*ServerEndpoint -} - -// RPCProxy is the manager type responsible for returning and managing Nomad -// addresses. -type RPCProxy struct { - // activatedList manages the list of Nomad Servers that are eligible - // to be queried by the Client agent. - activatedList atomic.Value - activatedListLock sync.Mutex - - // primaryServers is a list of servers found in the last heartbeat. - // primaryServers are periodically reshuffled. Covered by - // serverListLock. - primaryServers serverList - - // backupServers is a list of fallback servers. These servers are - // appended to the RPCProxy's serverList, but are never shuffled with - // the list of servers discovered via the Nomad heartbeat. Covered - // by serverListLock. - backupServers serverList - - // serverListLock covers both backupServers and primaryServers. If - // it is necessary to hold serverListLock and listLock, obtain an - // exclusive lock on serverListLock before listLock. - serverListLock sync.RWMutex - - leaderAddr string - numNodes int - - // rebalanceTimer controls the duration of the rebalance interval - rebalanceTimer *time.Timer - - // shutdownCh is a copy of the channel in nomad.Client - shutdownCh chan struct{} - - logger *log.Logger - - configInfo NomadConfigInfo - - // rpcAPIMismatchThrottle regulates the rate at which warning - // messages are emitted in the event of an API mismatch between the - // clients and servers. - rpcAPIMismatchThrottle map[string]time.Time - - // connPoolPinger is used to test the health of a server in the - // connection pool. Pinger is an interface that wraps - // client.ConnPool. - connPoolPinger Pinger -} - -// NewRPCProxy is the only way to safely create a new RPCProxy. -func NewRPCProxy(logger *log.Logger, shutdownCh chan struct{}, configInfo NomadConfigInfo, connPoolPinger Pinger) *RPCProxy { - p := &RPCProxy{ - logger: logger, - configInfo: configInfo, // can't pass *nomad.Client: import cycle - connPoolPinger: connPoolPinger, // can't pass *nomad.ConnPool: import cycle - rebalanceTimer: time.NewTimer(clientRPCMinReuseDuration), - shutdownCh: shutdownCh, - } - - l := serverList{} - l.L = make([]*ServerEndpoint, 0) - p.saveServerList(l) - return p -} - -// activateEndpoint adds an endpoint to the RPCProxy's active serverList. -// Returns true if the server was added, returns false if the server already -// existed in the RPCProxy's serverList. -func (p *RPCProxy) activateEndpoint(s *ServerEndpoint) bool { - l := p.getServerList() - - // Check if this server is known - found := false - for idx, existing := range l.L { - if existing.Name == s.Name { - newServers := make([]*ServerEndpoint, len(l.L)) - copy(newServers, l.L) - - // Overwrite the existing server details in order to - // possibly update metadata (e.g. 
server version) - newServers[idx] = s - - l.L = newServers - found = true - break - } - } - - // Add to the list if not known - if !found { - newServers := make([]*ServerEndpoint, len(l.L), len(l.L)+1) - copy(newServers, l.L) - newServers = append(newServers, s) - l.L = newServers - } - - p.saveServerList(l) - - return !found -} - -// SetBackupServers sets a list of Nomad Servers to be used in the event that -// the Nomad Agent lost contact with the list of Nomad Servers provided via -// the Nomad Agent's heartbeat. If available, the backup servers are -// populated via Consul. -func (p *RPCProxy) SetBackupServers(addrs []string) error { - l := make([]*ServerEndpoint, 0, len(addrs)) - for _, s := range addrs { - s, err := NewServerEndpoint(s) - if err != nil { - p.logger.Printf("[WARN] client.rpcproxy: unable to create backup server %+q: %v", s, err) - return fmt.Errorf("unable to create new backup server from %+q: %v", s, err) - } - l = append(l, s) - } - - p.serverListLock.Lock() - p.backupServers.L = l - p.serverListLock.Unlock() - - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - for _, s := range l { - p.activateEndpoint(s) - } - - return nil -} - -// AddPrimaryServer takes the RPC address of a Nomad server, creates a new -// endpoint, and adds it to both the primaryServers list and the active -// serverList used in the RPC Proxy. If the endpoint is not known by the -// RPCProxy, appends the endpoint to the list. The new endpoint will begin -// seeing use after the rebalance timer fires (or enough servers fail -// organically). Any values in the primary server list are overridden by the -// next successful heartbeat. -func (p *RPCProxy) AddPrimaryServer(rpcAddr string) *ServerEndpoint { - s, err := NewServerEndpoint(rpcAddr) - if err != nil { - p.logger.Printf("[WARN] client.rpcproxy: unable to create new primary server from endpoint %+q: %v", rpcAddr, err) - return nil - } - - k := s.Key() - p.serverListLock.Lock() - if serverExists := p.primaryServers.serverExistByKey(k); serverExists { - p.serverListLock.Unlock() - return s - } - p.primaryServers.L = append(p.primaryServers.L, s) - p.serverListLock.Unlock() - - p.activatedListLock.Lock() - p.activateEndpoint(s) - p.activatedListLock.Unlock() - - return s -} - -// cycleServers returns a new list of servers that has dequeued the first -// server and enqueued it at the end of the list. cycleServers assumes the -// caller is holding the listLock. cycleServer does not test or ping -// the next server inline. cycleServer may be called when the environment -// has just entered an unhealthy situation and blocking on a server test is -// less desirable than just returning the next server in the firing line. If -// the next server fails, it will fail fast enough and cycleServer will be -// called again. -func (l *serverList) cycleServer() (servers []*ServerEndpoint) { - numServers := len(l.L) - if numServers < 2 { - return servers // No action required - } - - newServers := make([]*ServerEndpoint, 0, numServers) - newServers = append(newServers, l.L[1:]...) - newServers = append(newServers, l.L[0]) - - return newServers -} - -// serverExistByKey performs a search to see if a server exists in the -// serverList. Assumes the caller is holding at least a read lock. 
-func (l *serverList) serverExistByKey(targetKey *EndpointKey) bool { - var found bool - for _, server := range l.L { - if targetKey.Equal(server.Key()) { - found = true - } - } - return found -} - -// removeServerByKey performs an inline removal of the first matching server -func (l *serverList) removeServerByKey(targetKey *EndpointKey) { - for i, s := range l.L { - if targetKey.Equal(s.Key()) { - copy(l.L[i:], l.L[i+1:]) - l.L[len(l.L)-1] = nil - l.L = l.L[:len(l.L)-1] - return - } - } -} - -// shuffleServers shuffles the server list in place -func (l *serverList) shuffleServers() { - for i := len(l.L) - 1; i > 0; i-- { - j := rand.Int31n(int32(i + 1)) - l.L[i], l.L[j] = l.L[j], l.L[i] - } -} - -// String returns a string representation of serverList -func (l *serverList) String() string { - if len(l.L) == 0 { - return fmt.Sprintf("empty server list") - } - - serverStrs := make([]string, 0, len(l.L)) - for _, server := range l.L { - serverStrs = append(serverStrs, server.String()) - } - - return fmt.Sprintf("[%s]", strings.Join(serverStrs, ", ")) -} - -// FindServer takes out an internal "read lock" and searches through the list -// of servers to find a "healthy" server. If the server is actually -// unhealthy, we rely on heartbeats to detect this and remove the node from -// the server list. If the server at the front of the list has failed or -// fails during an RPC call, it is rotated to the end of the list. If there -// are no servers available, return nil. -func (p *RPCProxy) FindServer() *ServerEndpoint { - l := p.getServerList() - numServers := len(l.L) - if numServers == 0 { - p.logger.Printf("[WARN] client.rpcproxy: No servers available") - return nil - } - - // Return whatever is at the front of the list because it is - // assumed to be the oldest in the server list (unless - - // hypothetically - the server list was rotated right after a - // server was added). - return l.L[0] -} - -// getServerList is a convenience method which hides the locking semantics -// of atomic.Value from the caller. -func (p *RPCProxy) getServerList() serverList { - return p.activatedList.Load().(serverList) -} - -// saveServerList is a convenience method which hides the locking semantics -// of atomic.Value from the caller. -func (p *RPCProxy) saveServerList(l serverList) { - p.activatedList.Store(l) -} - -// LeaderAddr returns the current leader address. If an empty string, then -// the Nomad Server for this Nomad Agent is in the minority or the Nomad -// Servers are in the middle of an election. -func (p *RPCProxy) LeaderAddr() string { - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - return p.leaderAddr -} - -// NotifyFailedServer marks the passed in server as "failed" by rotating it -// to the end of the server list. -func (p *RPCProxy) NotifyFailedServer(s *ServerEndpoint) { - l := p.getServerList() - - // If the server being failed is not the first server on the list, - // this is a noop. If, however, the server is failed and first on - // the list, acquire the lock, retest, and take the penalty of moving - // the server to the end of the list. - - // Only rotate the server list when there is more than one server - if len(l.L) > 1 && l.L[0] == s { - // Grab a lock, retest, and take the hit of cycling the first - // server to the end. 
- p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - l = p.getServerList() - - if len(l.L) > 1 && l.L[0] == s { - l.L = l.cycleServer() - p.saveServerList(l) - } - } -} - -// NumNodes returns the estimated number of nodes according to the last Nomad -// Heartbeat. -func (p *RPCProxy) NumNodes() int { - return p.numNodes -} - -// NumServers takes out an internal "read lock" and returns the number of -// servers. numServers includes both healthy and unhealthy servers. -func (p *RPCProxy) NumServers() int { - l := p.getServerList() - return len(l.L) -} - -// RebalanceServers shuffles the list of servers on this agent. The server -// at the front of the list is selected for the next RPC. RPC calls that -// fail for a particular server are rotated to the end of the list. This -// method reshuffles the list periodically in order to redistribute work -// across all known Nomad servers (i.e. guarantee that the order of servers -// in the server list is not positively correlated with the age of a server -// in the Nomad cluster). Periodically shuffling the server list prevents -// long-lived clients from fixating on long-lived servers. -// -// Unhealthy servers are removed from the server list during the next client -// heartbeat. Before the newly shuffled server list is saved, the new remote -// endpoint is tested to ensure its responsive. -func (p *RPCProxy) RebalanceServers() { - var serverListLocked bool - p.serverListLock.Lock() - serverListLocked = true - defer func() { - if serverListLocked { - p.serverListLock.Unlock() - } - }() - - // Early abort if there is nothing to shuffle - if (len(p.primaryServers.L) + len(p.backupServers.L)) < 2 { - return - } - - // Shuffle server lists independently - p.primaryServers.shuffleServers() - p.backupServers.shuffleServers() - - // Create a new merged serverList - type targetServer struct { - server *ServerEndpoint - // 'p' == Primary Server - // 's' == Secondary/Backup Server - // 'b' == Both - state byte - } - mergedList := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(p.backupServers.L)) - for _, s := range p.primaryServers.L { - mergedList[*s.Key()] = &targetServer{server: s, state: 'p'} - } - for _, s := range p.backupServers.L { - k := s.Key() - _, found := mergedList[*k] - if found { - mergedList[*k].state = 'b' - } else { - mergedList[*k] = &targetServer{server: s, state: 's'} - } - } - - l := &serverList{L: make([]*ServerEndpoint, 0, len(mergedList))} - for _, s := range p.primaryServers.L { - l.L = append(l.L, s) - } - for _, v := range mergedList { - if v.state != 's' { - continue - } - l.L = append(l.L, v.server) - } - - // Release the lock before we begin transition to operations on the - // network timescale and attempt to ping servers. A copy of the - // servers has been made at this point. - p.serverListLock.Unlock() - serverListLocked = false - - // Iterate through the shuffled server list to find an assumed - // healthy server. NOTE: Do not iterate on the list directly because - // this loop mutates the server list in-place. - var foundHealthyServer bool - for i := 0; i < len(l.L); i++ { - // Always test the first server. Failed servers are cycled - // and eventually removed from the list when Nomad heartbeats - // detect the failed node. 
- selectedServer := l.L[0] - - ok, err := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCMajorVersion(), selectedServer) - if ok { - foundHealthyServer = true - break - } - p.logger.Printf(`[DEBUG] client.rpcproxy: pinging server "%s" failed: %s`, selectedServer.String(), err) - - l.cycleServer() - } - - // If no healthy servers were found, sleep and wait for the admin to - // join this node to a server and begin receiving heartbeats with an - // updated list of Nomad servers. Or Consul will begin advertising a - // new server in the nomad service (Nomad server service). - if !foundHealthyServer { - p.logger.Printf("[DEBUG] client.rpcproxy: No healthy servers during rebalance, aborting") - return - } - - // Verify that all servers are present. Reconcile will save the - // final serverList. - if p.reconcileServerList(l) { - p.logger.Printf("[TRACE] client.rpcproxy: Rebalanced %d servers, next active server is %s", len(l.L), l.L[0].String()) - } else { - // reconcileServerList failed because Nomad removed the - // server that was at the front of the list that had - // successfully been Ping'ed. Between the Ping and - // reconcile, a Nomad heartbeat removed the node. - // - // Instead of doing any heroics, "freeze in place" and - // continue to use the existing connection until the next - // rebalance occurs. - } - - return -} - -// reconcileServerList returns true when the first server in serverList -// (l) exists in the receiver's serverList (p). If true, the merged -// serverList (l) is stored as the receiver's serverList (p). Returns -// false if the first server in p does not exist in the passed in list (l) -// (i.e. was removed by Nomad during a PingNomadServer() call. Newly added -// servers are appended to the list and other missing servers are removed -// from the list. -func (p *RPCProxy) reconcileServerList(l *serverList) bool { - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - - // newServerList is a serverList that has been kept up-to-date with - // join and leave events. - newServerList := p.getServerList() - - // If a Nomad heartbeat removed all nodes, or there is no selected - // server (zero nodes in serverList), abort early. - if len(newServerList.L) == 0 || len(l.L) == 0 { - return false - } - - type targetServer struct { - server *ServerEndpoint - - // 'b' == both - // 'o' == original - // 'n' == new - state byte - } - mergedList := make(map[EndpointKey]*targetServer, len(l.L)) - for _, s := range l.L { - mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} - } - for _, s := range newServerList.L { - k := s.Key() - _, found := mergedList[*k] - if found { - mergedList[*k].state = 'b' - } else { - mergedList[*k] = &targetServer{server: s, state: 'n'} - } - } - - // Ensure the selected server has not been removed by a heartbeat - selectedServerKey := l.L[0].Key() - if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' { - return false - } - - // Append any new servers and remove any old servers - for k, v := range mergedList { - switch v.state { - case 'b': - // Do nothing, server exists in both - case 'o': - // Server has been removed - l.removeServerByKey(&k) - case 'n': - // Server added - l.L = append(l.L, v.server) - default: - panic("unknown merge list state") - } - } - - p.saveServerList(*l) - return true -} - -// RemoveServer takes out an internal write lock and removes a server from -// the activated server list. 
-func (p *RPCProxy) RemoveServer(s *ServerEndpoint) { - // Lock hierarchy protocol dictates serverListLock is acquired first. - p.serverListLock.Lock() - defer p.serverListLock.Unlock() - - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - l := p.getServerList() - - k := s.Key() - l.removeServerByKey(k) - p.saveServerList(l) - - p.primaryServers.removeServerByKey(k) - p.backupServers.removeServerByKey(k) -} - -// refreshServerRebalanceTimer is only called once p.rebalanceTimer expires. -func (p *RPCProxy) refreshServerRebalanceTimer() time.Duration { - l := p.getServerList() - numServers := len(l.L) - // Limit this connection's life based on the size (and health) of the - // cluster. Never rebalance a connection more frequently than - // connReuseLowWatermarkDuration, and make sure we never exceed - // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. - clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer) - connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) - numLANMembers := p.numNodes - connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) - - p.rebalanceTimer.Reset(connRebalanceTimeout) - return connRebalanceTimeout -} - -// ResetRebalanceTimer resets the rebalance timer. This method exists for -// testing and should not be used directly. -func (p *RPCProxy) ResetRebalanceTimer() { - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - p.rebalanceTimer.Reset(clientRPCMinReuseDuration) -} - -// ServerRPCAddrs returns one RPC Address per server -func (p *RPCProxy) ServerRPCAddrs() []string { - l := p.getServerList() - serverAddrs := make([]string, 0, len(l.L)) - for _, s := range l.L { - serverAddrs = append(serverAddrs, s.Addr.String()) - } - return serverAddrs -} - -// Run is used to start and manage the task of automatically shuffling and -// rebalancing the list of Nomad servers. This maintenance only happens -// periodically based on the expiration of the timer. Failed servers are -// automatically cycled to the end of the list. New servers are appended to -// the list. The order of the server list must be shuffled periodically to -// distribute load across all known and available Nomad servers. -func (p *RPCProxy) Run() { - for { - select { - case <-p.rebalanceTimer.C: - p.RebalanceServers() - - p.refreshServerRebalanceTimer() - case <-p.shutdownCh: - p.logger.Printf("[INFO] client.rpcproxy: shutting down") - return - } - } -} - -// RefreshServerLists is called when the Client receives an update from a -// Nomad Server. The response from Nomad Client Heartbeats contain a list of -// Nomad Servers that the Nomad Client should use for RPC requests. -// RefreshServerLists does not rebalance its serverLists (that is handled -// elsewhere via a periodic timer). New Nomad Servers learned via the -// heartbeat are appended to the RPCProxy's activated serverList. Servers -// that are no longer present in the Heartbeat are removed immediately from -// all server lists. Nomad Servers speaking a newer major or minor API -// version are filtered from the serverList. -func (p *RPCProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNodes int32, leaderRPCAddr string) error { - // Merge all servers found in the response. Servers in the response - // with newer API versions are filtered from the list. 
If the list - // is missing an address found in the RPCProxy's server list, remove - // it from the RPCProxy. - - p.serverListLock.Lock() - defer p.serverListLock.Unlock() - - // Clear the backup server list when a heartbeat contains at least - // one server. - if len(servers) > 0 && len(p.backupServers.L) > 0 { - p.backupServers.L = make([]*ServerEndpoint, 0, len(servers)) - } - - // 1) Create a map to reconcile the difference between - // p.primaryServers and servers. - type targetServer struct { - server *ServerEndpoint - - // 'b' == both - // 'o' == original - // 'n' == new - state byte - } - mergedPrimaryMap := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(servers)) - numOldServers := 0 - for _, s := range p.primaryServers.L { - mergedPrimaryMap[*s.Key()] = &targetServer{server: s, state: 'o'} - numOldServers++ - } - numBothServers := 0 - var newServers bool - for _, s := range servers { - // Filter out servers using a newer API version. Prevent - // spamming the logs every heartbeat. - // - // TODO(sean@): Move the logging throttle logic into a - // dedicated logging package so RPCProxy does not have to - // perform this accounting. - if int32(p.configInfo.RPCMajorVersion()) < s.RPCMajorVersion || - (int32(p.configInfo.RPCMajorVersion()) == s.RPCMajorVersion && - int32(p.configInfo.RPCMinorVersion()) < s.RPCMinorVersion) { - now := time.Now() - t, ok := p.rpcAPIMismatchThrottle[s.RPCAdvertiseAddr] - if ok && t.After(now) { - continue - } - - p.logger.Printf("[WARN] client.rpcproxy: API mismatch between client version (v%d.%d) and server version (v%d.%d), ignoring server %+q", p.configInfo.RPCMajorVersion(), p.configInfo.RPCMinorVersion(), s.RPCMajorVersion, s.RPCMinorVersion, s.RPCAdvertiseAddr) - p.rpcAPIMismatchThrottle[s.RPCAdvertiseAddr] = now.Add(rpcAPIMismatchLogRate) - continue - } - - server, err := NewServerEndpoint(s.RPCAdvertiseAddr) - if err != nil { - p.logger.Printf("[WARN] client.rpcproxy: Unable to create a server from %+q: %v", s.RPCAdvertiseAddr, err) - continue - } - - // Nomad servers in different datacenters are automatically - // added to the backup server list. - if s.Datacenter != p.configInfo.Datacenter() { - p.backupServers.L = append(p.backupServers.L, server) - continue - } - - k := server.Key() - _, found := mergedPrimaryMap[*k] - if found { - mergedPrimaryMap[*k].state = 'b' - numBothServers++ - } else { - mergedPrimaryMap[*k] = &targetServer{server: server, state: 'n'} - newServers = true - } - } - - // Short-circuit acquiring listLock if nothing changed - if !newServers && numOldServers == numBothServers { - return nil - } - - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - newServerCfg := p.getServerList() - for k, v := range mergedPrimaryMap { - switch v.state { - case 'b': - // Do nothing, server exists in both - case 'o': - // Server has been removed - - // TODO(sean@): Teach Nomad servers how to remove - // themselves from their heartbeat in order to - // gracefully drain their clients over the next - // cluster's max rebalanceTimer duration. Without - // this enhancement, if a server being shutdown and - // it is the first in serverList, the client will - // fail its next RPC connection. - p.primaryServers.removeServerByKey(&k) - newServerCfg.removeServerByKey(&k) - case 'n': - // Server added. Append it to both lists - // immediately. The server should only go into - // active use in the event of a failure or after a - // rebalance occurs. 
- p.primaryServers.L = append(p.primaryServers.L, v.server) - newServerCfg.L = append(newServerCfg.L, v.server) - default: - panic("unknown merge list state") - } - } - - p.numNodes = int(numNodes) - p.leaderAddr = leaderRPCAddr - p.saveServerList(newServerCfg) - - return nil -} diff --git a/client/rpcproxy/rpcproxy_test.go b/client/rpcproxy/rpcproxy_test.go deleted file mode 100644 index c6b7327e62a..00000000000 --- a/client/rpcproxy/rpcproxy_test.go +++ /dev/null @@ -1,818 +0,0 @@ -package rpcproxy - -import ( - "bytes" - "encoding/binary" - "fmt" - "log" - "math/rand" - "net" - "os" - "strings" - "sync/atomic" - "testing" - "time" -) - -const ( - ipv4len = 4 - nodeNameFmt = "s%03d" - defaultNomadPort = "4647" - - // Poached from RFC2544 and RFC3330 - testingNetworkCidr = "198.18.0.0/15" - testingNetworkUint32 = 3323068416 -) - -var ( - localLogger *log.Logger - localLogBuffer *bytes.Buffer - serverCount uint32 - validIp uint32 -) - -func init() { - localLogBuffer = new(bytes.Buffer) - localLogger = log.New(localLogBuffer, "", 0) -} - -func makeServerEndpointName() string { - serverNum := atomic.AddUint32(&serverCount, 1) - validIp := testingNetworkUint32 + serverNum - ipv4 := make(net.IP, ipv4len) - binary.BigEndian.PutUint32(ipv4, validIp) - return net.JoinHostPort(ipv4.String(), defaultNomadPort) -} - -func GetBufferedLogger() *log.Logger { - return localLogger -} - -type fauxConnPool struct { - // failPct between 0.0 and 1.0 == pct of time a Ping should fail - failPct float64 -} - -func (cp *fauxConnPool) PingNomadServer(region string, majorVersion int, s *ServerEndpoint) (bool, error) { - var success bool - successProb := rand.Float64() - if successProb > cp.failPct { - success = true - } - return success, nil -} - -type fauxSerf struct { - datacenter string - numNodes int - region string - rpcMinorVersion int - rpcMajorVersion int -} - -func (s *fauxSerf) NumNodes() int { - return s.numNodes -} - -func (s *fauxSerf) Region() string { - return s.region -} - -func (s *fauxSerf) Datacenter() string { - return s.datacenter -} - -func (s *fauxSerf) RPCMajorVersion() int { - return s.rpcMajorVersion -} - -func (s *fauxSerf) RPCMinorVersion() int { - return s.rpcMinorVersion -} - -func testRPCProxy() (p *RPCProxy) { - logger := GetBufferedLogger() - logger = log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - p = NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) - return p -} - -func testRPCProxyFailProb(failPct float64) (p *RPCProxy) { - logger := GetBufferedLogger() - logger = log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - p = NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) - return p -} - -// func (p *RPCProxy) AddPrimaryServer(server *ServerEndpoint) { -func TestRPCProxy_AddPrimaryServer(t *testing.T) { - p := testRPCProxy() - var num int - num = p.NumServers() - if num != 0 { - t.Fatalf("Expected zero servers to start") - } - - s1Endpoint := makeServerEndpointName() - s1 := p.AddPrimaryServer(s1Endpoint) - num = p.NumServers() - if num != 1 { - t.Fatalf("Expected one server") - } - if s1 == nil { - t.Fatalf("bad") - } - if s1.Name != s1Endpoint { - t.Fatalf("bad") - } - - s1 = p.AddPrimaryServer(s1Endpoint) - num = p.NumServers() - if num != 1 { - t.Fatalf("Expected one server (still)") - } - if s1 == nil { - t.Fatalf("bad") - } - if s1.Name != s1Endpoint { - t.Fatalf("bad") - } - - s2Endpoint := makeServerEndpointName() - s2 := p.AddPrimaryServer(s2Endpoint) - num = 
p.NumServers() - if num != 2 { - t.Fatalf("Expected two servers") - } - if s2 == nil { - t.Fatalf("bad") - } - if s2.Name != s2Endpoint { - t.Fatalf("bad") - } -} - -// func (p *RPCProxy) FindServer() (server *ServerEndpoint) { -func TestRPCProxy_FindServer(t *testing.T) { - p := testRPCProxy() - - if p.FindServer() != nil { - t.Fatalf("Expected nil return") - } - - s1Endpoint := makeServerEndpointName() - p.AddPrimaryServer(s1Endpoint) - if p.NumServers() != 1 { - t.Fatalf("Expected one server") - } - - s1 := p.FindServer() - if s1 == nil { - t.Fatalf("Expected non-nil server") - } - if s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } - - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server (still)") - } - - s2Endpoint := makeServerEndpointName() - p.AddPrimaryServer(s2Endpoint) - if p.NumServers() != 2 { - t.Fatalf("Expected two servers") - } - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server (still)") - } - - p.NotifyFailedServer(s1) - s2 := p.FindServer() - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server") - } - - p.NotifyFailedServer(s2) - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } -} - -// func New(logger *log.Logger, shutdownCh chan struct{}) (p *RPCProxy) { -func TestRPCProxy_New(t *testing.T) { - logger := GetBufferedLogger() - logger = log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - p := NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) - if p == nil { - t.Fatalf("RPCProxy nil") - } -} - -// func (p *RPCProxy) NotifyFailedServer(server *ServerEndpoint) { -func TestRPCProxy_NotifyFailedServer(t *testing.T) { - p := testRPCProxy() - - if p.NumServers() != 0 { - t.Fatalf("Expected zero servers to start") - } - - // Try notifying for a server that is not managed by RPCProxy - s1Endpoint := makeServerEndpointName() - s1 := p.AddPrimaryServer(s1Endpoint) - if s1 == nil { - t.Fatalf("bad") - } - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.RemoveServer(s1) - if p.NumServers() != 0 { - t.Fatalf("bad") - } - p.NotifyFailedServer(s1) - s1 = p.AddPrimaryServer(s1Endpoint) - - // Test again w/ a server not in the list - s2Endpoint := makeServerEndpointName() - s2 := p.AddPrimaryServer(s2Endpoint) - if s2 == nil { - t.Fatalf("bad") - } - if p.NumServers() != 2 { - t.Fatalf("bad") - } - p.RemoveServer(s2) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.NotifyFailedServer(s2) - if p.NumServers() != 1 { - t.Fatalf("Expected one server") - } - - // Re-add s2 so there are two servers in the RPCProxy server list - s2 = p.AddPrimaryServer(s2Endpoint) - if p.NumServers() != 2 { - t.Fatalf("Expected two servers") - } - - // Find the first server, it should be s1 - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } - - // Notify s2 as failed, s1 should still be first - p.NotifyFailedServer(s2) - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server (still)") - } - - // Fail s1, s2 should be first - p.NotifyFailedServer(s1) - s2 = p.FindServer() - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server") - } - - // Fail s2, s1 should be first - p.NotifyFailedServer(s2) - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } -} - -// func (p *RPCProxy) NumServers() (numServers int) { -func TestRPCProxy_NumServers(t *testing.T) { - p := 
testRPCProxy() - const maxNumServers = 100 - serverList := make([]*ServerEndpoint, 0, maxNumServers) - - // Add some servers - for i := 0; i < maxNumServers; i++ { - num := p.NumServers() - if num != i { - t.Fatalf("%d: Expected %d servers", i, num) - } - serverName := makeServerEndpointName() - s := p.AddPrimaryServer(serverName) - if s == nil { - t.Fatalf("Expected server from %+q", serverName) - } - serverList = append(serverList, s) - - num = p.NumServers() - if num != i+1 { - t.Fatalf("%d: Expected %d servers", i, num+1) - } - } - - // Remove some servers - for i := maxNumServers; i > 0; i-- { - num := p.NumServers() - if num != i { - t.Fatalf("%d: Expected %d servers", i, num) - } - p.RemoveServer(serverList[i-1]) - num = p.NumServers() - if num != i-1 { - t.Fatalf("%d: Expected %d servers", i, num-1) - } - } -} - -// func (p *RPCProxy) RebalanceServers() { -func TestRPCProxy_RebalanceServers(t *testing.T) { - const failPct = 0.5 - p := testRPCProxyFailProb(failPct) - const maxServers = 100 - const numShuffleTests = 100 - const uniquePassRate = 0.5 - - // Make a huge list of nodes. - for i := 0; i < maxServers; i++ { - p.AddPrimaryServer(makeServerEndpointName()) - } - - // Keep track of how many unique shuffles we get. - uniques := make(map[string]struct{}, maxServers) - for i := 0; i < numShuffleTests; i++ { - p.RebalanceServers() - - var names []string - for j := 0; j < maxServers; j++ { - server := p.FindServer() - p.NotifyFailedServer(server) - names = append(names, server.Name) - } - key := strings.Join(names, "|") - uniques[key] = struct{}{} - } - - // We have to allow for the fact that there won't always be a unique - // shuffle each pass, so we just look for smell here without the test - // being flaky. - if len(uniques) < int(maxServers*uniquePassRate) { - t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), maxServers) - } -} - -// func (p *RPCProxy) RemoveServer(server *ServerEndpoint) { -func TestRPCProxy_RemoveServer(t *testing.T) { - p := testRPCProxy() - if p.NumServers() != 0 { - t.Fatalf("Expected zero servers to start") - } - - // Test removing server before its added - s1Endpoint := makeServerEndpointName() - s1 := p.AddPrimaryServer(s1Endpoint) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server: %+q", s1.Name) - } - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server: %+q", s1.Name) - } - p.RemoveServer(s1) - if p.NumServers() != 0 { - t.Fatalf("bad") - } - // Remove it a second time now that it doesn't exist - p.RemoveServer(s1) - if p.NumServers() != 0 { - t.Fatalf("bad") - } - p.AddPrimaryServer(s1Endpoint) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - - s2Endpoint := makeServerEndpointName() - s2 := p.AddPrimaryServer(s2Endpoint) - if p.NumServers() != 2 { - t.Fatalf("bad") - } - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server: %+q", s2.Name) - } - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 to be the front of the list: %+q==%+q", s1.Name, s1Endpoint) - } - // Move s1 to the back of the server list - p.NotifyFailedServer(s1) - s2 = p.FindServer() - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server: %+q", s2Endpoint) - } - p.RemoveServer(s2) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.RemoveServer(s2) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.AddPrimaryServer(s2Endpoint) - - const maxServers = 19 - servers := make([]*ServerEndpoint, 
0, maxServers) - servers = append(servers, s1) - servers = append(servers, s2) - // Already added two servers above - for i := maxServers; i > 2; i-- { - server := p.AddPrimaryServer(makeServerEndpointName()) - servers = append(servers, server) - } - if p.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, p.NumServers()) - } - - p.RebalanceServers() - - if p.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, p.NumServers()) - } - - findServer := func(server *ServerEndpoint) bool { - for i := p.NumServers(); i > 0; i-- { - s := p.FindServer() - if s == server { - return true - } - } - return false - } - - expectedNumServers := maxServers - removedServers := make([]*ServerEndpoint, 0, maxServers) - - // Remove servers from the front of the list - for i := 3; i > 0; i-- { - server := p.FindServer() - if server == nil { - t.Fatalf("FindServer returned nil") - } - p.RemoveServer(server) - expectedNumServers-- - if p.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers()) - } - if findServer(server) == true { - t.Fatalf("Did not expect to find server %s after removal from the front", server.Name) - } - removedServers = append(removedServers, server) - } - - // Remove server from the end of the list - for i := 3; i > 0; i-- { - server := p.FindServer() - p.NotifyFailedServer(server) - p.RemoveServer(server) - expectedNumServers-- - if p.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers()) - } - if findServer(server) == true { - t.Fatalf("Did not expect to find server %s", server.Name) - } - removedServers = append(removedServers, server) - } - - // Remove server from the middle of the list - for i := 3; i > 0; i-- { - server := p.FindServer() - p.NotifyFailedServer(server) - server2 := p.FindServer() - p.NotifyFailedServer(server2) // server2 now at end of the list - - p.RemoveServer(server) - expectedNumServers-- - if p.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers()) - } - if findServer(server) == true { - t.Fatalf("Did not expect to find server %s", server.Name) - } - removedServers = append(removedServers, server) - } - - if p.NumServers()+len(removedServers) != maxServers { - t.Fatalf("Expected %d+%d=%d servers", p.NumServers(), len(removedServers), maxServers) - } - - // Drain the remaining servers from the middle - for i := p.NumServers(); i > 0; i-- { - server := p.FindServer() - p.NotifyFailedServer(server) - server2 := p.FindServer() - p.NotifyFailedServer(server2) // server2 now at end of the list - p.RemoveServer(server) - removedServers = append(removedServers, server) - } - - if p.NumServers() != 0 { - t.Fatalf("Expected an empty server list") - } - if len(removedServers) != maxServers { - t.Fatalf("Expected all servers to be in removed server list") - } -} - -// func (p *RPCProxy) Start() { - -// func (l *serverList) cycleServer() (servers []*Server) { -func TestRPCProxyInternal_cycleServer(t *testing.T) { - p := testRPCProxy() - l := p.getServerList() - - server0 := &ServerEndpoint{Name: "server1"} - server1 := &ServerEndpoint{Name: "server2"} - server2 := &ServerEndpoint{Name: "server3"} - l.L = append(l.L, server0, server1, server2) - p.saveServerList(l) - - l = p.getServerList() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server0 && - l.L[1] != server1 && - l.L[2] != 
server2 { - t.Fatalf("initial server ordering not correct") - } - - l.L = l.cycleServer() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server1 && - l.L[1] != server2 && - l.L[2] != server0 { - t.Fatalf("server ordering after one cycle not correct") - } - - l.L = l.cycleServer() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server2 && - l.L[1] != server0 && - l.L[2] != server1 { - t.Fatalf("server ordering after two cycles not correct") - } - - l.L = l.cycleServer() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server0 && - l.L[1] != server1 && - l.L[2] != server2 { - t.Fatalf("server ordering after three cycles not correct") - } -} - -// func (p *RPCProxy) getServerList() serverList { -func TestRPCProxyInternal_getServerList(t *testing.T) { - p := testRPCProxy() - l := p.getServerList() - if l.L == nil { - t.Fatalf("serverList.servers nil") - } - - if len(l.L) != 0 { - t.Fatalf("serverList.servers length not zero") - } -} - -func TestRPCProxyInternal_New(t *testing.T) { - p := testRPCProxy() - if p == nil { - t.Fatalf("bad") - } - - if p.logger == nil { - t.Fatalf("bad") - } - - if p.shutdownCh == nil { - t.Fatalf("bad") - } -} - -// func (p *RPCProxy) reconcileServerList(l *serverList) bool { -func TestRPCProxyInternal_reconcileServerList(t *testing.T) { - tests := []int{0, 1, 2, 3, 4, 5, 10, 100} - for _, n := range tests { - ok, err := test_reconcileServerList(n) - if !ok { - t.Errorf("Expected %d to pass: %v", n, err) - } - } -} - -func test_reconcileServerList(maxServers int) (bool, error) { - // Build a server list, reconcile, verify the missing servers are - // missing, the added have been added, and the original server is - // present. - const failPct = 0.5 - p := testRPCProxyFailProb(failPct) - - var failedServers, healthyServers []*ServerEndpoint - for i := 0; i < maxServers; i++ { - nodeName := fmt.Sprintf("s%02d", i) - - node := &ServerEndpoint{Name: nodeName} - // Add 66% of servers to RPCProxy - if rand.Float64() > 0.33 { - p.activateEndpoint(node) - - // Of healthy servers, (ab)use connPoolPinger to - // failPct of the servers for the reconcile. This - // allows for the selected server to no longer be - // healthy for the reconcile below. - if ok, _ := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCMajorVersion(), node); ok { - // Will still be present - healthyServers = append(healthyServers, node) - } else { - // Will be missing - failedServers = append(failedServers, node) - } - } else { - // Will be added from the call to reconcile - healthyServers = append(healthyServers, node) - } - } - - // Randomize RPCProxy's server list - p.RebalanceServers() - selectedServer := p.FindServer() - - var selectedServerFailed bool - for _, s := range failedServers { - if selectedServer.Key().Equal(s.Key()) { - selectedServerFailed = true - break - } - } - - // Update RPCProxy's server list to be "healthy" based on Serf. - // Reconcile this with origServers, which is shuffled and has a live - // connection, but possibly out of date. 
- origServers := p.getServerList() - p.saveServerList(serverList{L: healthyServers}) - - // This should always succeed with non-zero server lists - if !selectedServerFailed && !p.reconcileServerList(&origServers) && - len(p.getServerList().L) != 0 && - len(origServers.L) != 0 { - // If the random gods are unfavorable and we end up with zero - // length lists, expect things to fail and retry the test. - return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d", - selectedServerFailed, - len(p.getServerList().L), - len(origServers.L)) - } - - // If we have zero-length server lists, test succeeded in degenerate - // case. - if len(p.getServerList().L) == 0 && - len(origServers.L) == 0 { - // Failed as expected w/ zero length list - return true, nil - } - - resultingServerMap := make(map[EndpointKey]bool) - for _, s := range p.getServerList().L { - resultingServerMap[*s.Key()] = true - } - - // Test to make sure no failed servers are in the RPCProxy's - // list. Error if there are any failedServers in l.servers - for _, s := range failedServers { - _, ok := resultingServerMap[*s.Key()] - if ok { - return false, fmt.Errorf("Found failed server %v in merged list %v", s, resultingServerMap) - } - } - - // Test to make sure all healthy servers are in the healthy list. - if len(healthyServers) != len(p.getServerList().L) { - return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers)) - } - - // Test to make sure all healthy servers are in the resultingServerMap list. - for _, s := range healthyServers { - _, ok := resultingServerMap[*s.Key()] - if !ok { - return false, fmt.Errorf("Server %v missing from healthy map after merged lists", s) - } - } - return true, nil -} - -// func (l *serverList) refreshServerRebalanceTimer() { -func TestRPCProxyInternal_refreshServerRebalanceTimer(t *testing.T) { - type clusterSizes struct { - numNodes int - numServers int - minRebalance time.Duration - } - clusters := []clusterSizes{ - {0, 3, 10 * time.Minute}, - {1, 0, 10 * time.Minute}, // partitioned cluster - {1, 3, 10 * time.Minute}, - {2, 3, 10 * time.Minute}, - {100, 0, 10 * time.Minute}, // partitioned - {100, 1, 10 * time.Minute}, // partitioned - {100, 3, 10 * time.Minute}, - {1024, 1, 10 * time.Minute}, // partitioned - {1024, 3, 10 * time.Minute}, // partitioned - {1024, 5, 10 * time.Minute}, - {16384, 1, 10 * time.Minute}, // partitioned - {16384, 2, 10 * time.Minute}, // partitioned - {16384, 3, 10 * time.Minute}, // partitioned - {16384, 5, 10 * time.Minute}, - {65535, 0, 10 * time.Minute}, // partitioned - {65535, 1, 10 * time.Minute}, // partitioned - {65535, 2, 10 * time.Minute}, // partitioned - {65535, 3, 10 * time.Minute}, // partitioned - {65535, 5, 10 * time.Minute}, // partitioned - {65535, 7, 10 * time.Minute}, - {1000000, 1, 10 * time.Minute}, // partitioned - {1000000, 2, 10 * time.Minute}, // partitioned - {1000000, 3, 10 * time.Minute}, // partitioned - {1000000, 5, 10 * time.Minute}, // partitioned - {1000000, 11, 10 * time.Minute}, // partitioned - {1000000, 19, 10 * time.Minute}, - } - - logger := log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - - for i, s := range clusters { - p := NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) - for i := 0; i < s.numServers; i++ { - nodeName := fmt.Sprintf("s%02d", i) - p.activateEndpoint(&ServerEndpoint{Name: nodeName}) - } - - d := p.refreshServerRebalanceTimer() - if d < s.minRebalance { - t.Errorf("[%d] duration too 
short for cluster of size %d and %d servers (%s < %s)", i, s.numNodes, s.numServers, d, s.minRebalance) - } - } -} - -// func (p *RPCProxy) saveServerList(l serverList) { -func TestRPCProxyInternal_saveServerList(t *testing.T) { - p := testRPCProxy() - - // Initial condition - func() { - l := p.getServerList() - if len(l.L) != 0 { - t.Fatalf("RPCProxy.saveServerList failed to load init config") - } - - newServer := new(ServerEndpoint) - l.L = append(l.L, newServer) - p.saveServerList(l) - }() - - // Test that save works - func() { - l1 := p.getServerList() - t1NumServers := len(l1.L) - if t1NumServers != 1 { - t.Fatalf("RPCProxy.saveServerList failed to save mutated config") - } - }() - - // Verify mutation w/o a save doesn't alter the original - func() { - newServer := new(ServerEndpoint) - l := p.getServerList() - l.L = append(l.L, newServer) - - l_orig := p.getServerList() - origNumServers := len(l_orig.L) - if origNumServers >= len(l.L) { - t.Fatalf("RPCProxy.saveServerList unsaved config overwrote original") - } - }() -} diff --git a/client/rpcproxy/server_endpoint.go b/client/rpcproxy/server_endpoint.go deleted file mode 100644 index d9b1add5b6a..00000000000 --- a/client/rpcproxy/server_endpoint.go +++ /dev/null @@ -1,84 +0,0 @@ -package rpcproxy - -import ( - "fmt" - "net" - "strings" -) - -const ( - defaultNomadRPCPort = "4647" -) - -// EndpointKey is used in maps and for equality tests. A key is based on endpoints. -type EndpointKey struct { - name string -} - -// Equal compares two EndpointKey objects -func (k *EndpointKey) Equal(x *EndpointKey) bool { - return k.name == x.name -} - -// ServerEndpoint contains the address information for to connect to a Nomad -// server. -// -// TODO(sean@): Server is stubbed out so that in the future it can hold a -// reference to Node (and ultimately Node.ID). -type ServerEndpoint struct { - // Name is the unique lookup key for a Server instance - Name string - Host string - Port string - Addr net.Addr -} - -// Key returns the corresponding Key -func (s *ServerEndpoint) Key() *EndpointKey { - return &EndpointKey{ - name: s.Name, - } -} - -// NewServerEndpoint creates a new Server instance with a resolvable -// endpoint. `name` can be either an IP address or a DNS name. If `name` is -// a DNS name, it must be resolvable to an IP address (most inputs are IP -// addresses, not DNS names, but both work equally well when the name is -// resolvable). 
-func NewServerEndpoint(name string) (*ServerEndpoint, error) { - s := &ServerEndpoint{ - Name: name, - } - - var host, port string - var err error - host, port, err = net.SplitHostPort(name) - if err == nil { - s.Host = host - s.Port = port - } else { - if strings.Contains(err.Error(), "missing port") { - s.Host = name - s.Port = defaultNomadRPCPort - } else { - return nil, err - } - } - - if s.Addr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(s.Host, s.Port)); err != nil { - return nil, err - } - - return s, err -} - -// String returns a string representation of Server -func (s *ServerEndpoint) String() string { - var addrStr, networkStr string - if s.Addr != nil { - addrStr = s.Addr.String() - networkStr = s.Addr.Network() - } - - return fmt.Sprintf("%s (%s:%s)", s.Name, networkStr, addrStr) -} diff --git a/client/rpcproxy/server_endpoint_test.go b/client/rpcproxy/server_endpoint_test.go deleted file mode 100644 index f04494859d3..00000000000 --- a/client/rpcproxy/server_endpoint_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package rpcproxy - -import ( - "fmt" - "net" - "testing" -) - -// func (k *EndpointKey) Equal(x *EndpointKey) { -func TestServerEndpointKey_Equal(t *testing.T) { - tests := []struct { - name string - s1 *ServerEndpoint - s2 *ServerEndpoint - equal bool - }{ - { - name: "equal", - s1: &ServerEndpoint{Name: "k1"}, - s2: &ServerEndpoint{Name: "k1"}, - equal: true, - }, - { - name: "not equal", - s1: &ServerEndpoint{Name: "k1"}, - s2: &ServerEndpoint{Name: "k2"}, - equal: false, - }, - } - - for _, test := range tests { - if test.s1.Key().Equal(test.s2.Key()) != test.equal { - t.Errorf("fixture %s failed forward comparison", test.name) - } - - if test.s2.Key().Equal(test.s1.Key()) != test.equal { - t.Errorf("fixture %s failed reverse comparison", test.name) - } - } -} - -// func (k *ServerEndpoint) String() { -func TestServerEndpoint_String(t *testing.T) { - tests := []struct { - name string - s *ServerEndpoint - str string - }{ - { - name: "name", - s: &ServerEndpoint{Name: "s"}, - str: "s (:)", - }, - { - name: "name, host, port", - s: &ServerEndpoint{ - Name: "s", - Host: "127.0.0.1", - Port: "4647", - }, - str: "s (tcp:127.0.0.1:4647)", - }, - } - - for _, test := range tests { - if test.s.Addr == nil && (test.s.Host != "" && test.s.Port != "") { - fmt.Printf("Setting addr\n") - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(test.s.Host, test.s.Port)) - if err == nil { - test.s.Addr = addr - } - } - if test.s.String() != test.str { - t.Errorf("fixture %q failed: %q vs %q", test.name, test.s.String(), test.str) - } - } -} diff --git a/client/serverlist.go b/client/serverlist.go new file mode 100644 index 00000000000..87aec05e61f --- /dev/null +++ b/client/serverlist.go @@ -0,0 +1,111 @@ +package client + +import ( + "math/rand" + "net" + "sort" + "strings" + "sync" +) + +// serverlist is a prioritized randomized list of nomad servers. Users should +// call all() to retrieve the full list, followed by failed(e) on each endpoint +// that's failed and good(e) when a valid endpoint is found. +type serverlist struct { + e endpoints + mu sync.RWMutex +} + +func newServerList() *serverlist { + return &serverlist{} +} + +// set the server list to a new list. The new list will be shuffled and sorted +// by priority. 
+func (s *serverlist) set(in endpoints) { + s.mu.Lock() + s.e = in + s.mu.Unlock() +} + +// all returns a copy of the full server list, shuffled and then sorted by +// priority +func (s *serverlist) all() endpoints { + s.mu.RLock() + out := make(endpoints, len(s.e)) + copy(out, s.e) + s.mu.RUnlock() + + // Randomize the order + for i, j := range rand.Perm(len(out)) { + out[i], out[j] = out[j], out[i] + } + + // Sort by priority + sort.Sort(out) + return out +} + +// failed endpoint will be deprioritized if its still in the list. +func (s *serverlist) failed(e *endpoint) { + s.mu.Lock() + defer s.mu.Unlock() + for _, cur := range s.e { + if cur.equal(e) { + cur.priority++ + return + } + } +} + +// good endpoint will get promoted to the highest priority if it's still in the +// list. +func (s *serverlist) good(e *endpoint) { + s.mu.Lock() + defer s.mu.Unlock() + for _, cur := range s.e { + if cur.equal(e) { + cur.priority = 0 + return + } + } +} + +func (e endpoints) Len() int { + return len(e) +} + +func (e endpoints) Less(i int, j int) bool { + // Sort only by priority as endpoints should be shuffled and ordered + // only by priority + return e[i].priority < e[j].priority +} + +func (e endpoints) Swap(i int, j int) { + e[i], e[j] = e[j], e[i] +} + +type endpoints []*endpoint + +func (e endpoints) String() string { + names := make([]string, 0, len(e)) + for _, endpoint := range e { + names = append(names, endpoint.name) + } + return strings.Join(names, ",") +} + +type endpoint struct { + name string + addr net.Addr + + // 0 being the highest priority + priority int +} + +// equal returns true if the name and addr match between two endpoints. +// Priority is ignored because the same endpoint may be added by discovery and +// heartbeating with different priorities. 
+func (e *endpoint) equal(o *endpoint) bool { + return e.name == o.name && e.addr == o.addr +} diff --git a/client/serverlist_test.go b/client/serverlist_test.go new file mode 100644 index 00000000000..1dfdb26dec9 --- /dev/null +++ b/client/serverlist_test.go @@ -0,0 +1,117 @@ +package client + +import ( + "log" + "os" + "strings" + "testing" +) + +func TestServerList(t *testing.T) { + s := newServerList() + + // New lists should be empty + if e := s.all(); len(e) != 0 { + t.Fatalf("expected empty list to return an empty list, but received: %+q", e) + } + + mklist := func() endpoints { + return endpoints{ + &endpoint{"b", nil, 1}, + &endpoint{"c", nil, 1}, + &endpoint{"g", nil, 2}, + &endpoint{"d", nil, 1}, + &endpoint{"e", nil, 1}, + &endpoint{"f", nil, 1}, + &endpoint{"h", nil, 2}, + &endpoint{"a", nil, 0}, + } + } + s.set(mklist()) + + orig := mklist() + all := s.all() + if len(all) != len(orig) { + t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all)) + } + + // Assert list is properly randomized+sorted + for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} { + if all[i].priority != pri { + t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri) + } + } + + // Subsequent sets should reshuffle (try multiple times as they may + // shuffle in the same order) + tries := 0 + max := 3 + for ; tries < max; tries++ { + if s.all().String() == s.all().String() { + // eek, matched; try again in case we just got unlucky + continue + } + break + } + if tries == max { + t.Fatalf("after %d attempts servers were still not random reshuffled", tries) + } + + // Mark an endpoint as failed enough that it should be at the end of the list + sa := &endpoint{"a", nil, 0} + s.failed(sa) + s.failed(sa) + s.failed(sa) + all2 := s.all() + if len(all2) != len(orig) { + t.Fatalf("marking should not have changed list length") + } + if all2[len(all)-1].name != sa.name { + t.Fatalf("failed endpoint should be at end of list: %+q", all2) + } + + // But if the bad endpoint succeeds even once it should be bumped to the top group + s.good(sa) + found := false + for _, e := range s.all() { + if e.name == sa.name { + if e.priority != 0 { + t.Fatalf("server newly marked good should have highest priority") + } + found = true + } + } + if !found { + t.Fatalf("what happened to endpoint A?!") + } +} + +// TestClient_ServerList tests client methods that interact with the internal +// nomad server list. 
+func TestClient_ServerList(t *testing.T) {
+	// manually create a mostly empty client to avoid spinning up a ton of
+	// goroutines that complicate testing
+	client := Client{servers: newServerList(), logger: log.New(os.Stderr, "", log.Ltime|log.Lshortfile)}
+
+	if s := client.GetServers(); len(s) != 0 {
+		t.Fatalf("expected server list to be empty but found: %+q", s)
+	}
+	if err := client.SetServers(nil); err != noServers {
+		t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
+	}
+	if err := client.SetServers([]string{"not-a-real-domain.fake"}); err == nil {
+		t.Fatalf("expected setting a bad server to return an error")
+	}
+	if err := client.SetServers([]string{"bad.fake", "127.0.0.1:1234", "127.0.0.1"}); err != nil {
+		t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
+	}
+	s := client.GetServers()
+	if len(s) != 2 {
+		t.Fatalf("expected 2 servers but received: %+q", s)
+	}
+	for _, host := range s {
+		if !strings.HasPrefix(host, "127.0.0.1:") {
+			t.Errorf("expected both servers to be localhost and include port but found: %s", host)
+		}
+	}
+}
diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go
index aaf466373b0..5e9a0c8d8fc 100644
--- a/command/agent/agent_endpoint.go
+++ b/command/agent/agent_endpoint.go
@@ -139,7 +139,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
 		return nil, CodedError(501, ErrInvalidMethod)
 	}
 
-	peers := s.agent.client.RPCProxy().ServerRPCAddrs()
+	peers := s.agent.client.GetServers()
 	return peers, nil
 }
 
@@ -156,12 +156,11 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
 	}
 
 	// Set the servers list into the client
-	for _, server := range servers {
-		s.agent.logger.Printf("[TRACE] Adding server %s to the client's primary server list", server)
-		se := client.AddPrimaryServerToRPCProxy(server)
-		if se == nil {
-			s.agent.logger.Printf("[ERR] Attempt to add server %q to client failed", server)
-		}
+	s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
+	if err := client.SetServers(servers); err != nil {
+		s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
+		//TODO is this the right error to return?
+ return nil, CodedError(400, err.Error()) } return nil, nil } diff --git a/nomad/pool.go b/nomad/pool.go index 669e788989e..7ef7acf596e 100644 --- a/nomad/pool.go +++ b/nomad/pool.go @@ -12,7 +12,6 @@ import ( "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/nomad/client/rpcproxy" "github.com/hashicorp/yamux" ) @@ -376,9 +375,9 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, // PingNomadServer sends a Status.Ping message to the specified server and // returns true if healthy, false if an error occurred -func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s *rpcproxy.ServerEndpoint) (bool, error) { +func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s net.Addr) (bool, error) { // Get a usable client - conn, sc, err := p.getClient(region, s.Addr, apiMajorVersion) + conn, sc, err := p.getClient(region, s, apiMajorVersion) if err != nil { return false, err } From 6d9670e9ab7389189a14ac81ecb642c33d7a0a5a Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 23 Sep 2016 17:02:48 -0700 Subject: [PATCH 02/14] Replace periodic handlers with event driven disco Remove use of periodic consul handlers in the client and just use goroutines. Consul Discovery is now triggered with a chan instead of using a timer and deadline to trigger. Once discovery is complete a chan is ticked so all goroutines waiting for servers will run. Should speed up bootstraping and recovery while decreasing spinning on timers. --- client/client.go | 377 ++++++++++++++++++++++++++--------------------- 1 file changed, 208 insertions(+), 169 deletions(-) diff --git a/client/client.go b/client/client.go index 918aa1ec147..cfb136a5fa0 100644 --- a/client/client.go +++ b/client/client.go @@ -43,6 +43,10 @@ const ( // datacenters looking for the Nomad server service. datacenterQueryLimit = 9 + // consulReaperIntv is the interval at which the consul reaper will + // run. + consulReaperIntv = 5 * time.Second + // registerRetryIntv is minimum interval on which we retry // registration. We pick a value between this and 2x this. registerRetryIntv = 15 * time.Second @@ -109,14 +113,13 @@ type Client struct { // servers is the (optionally prioritized) list of nomad servers servers *serverlist - // consulDiscoverNext is the deadline at which this Nomad Agent will - // poll Consul for a list of Nomad Servers. When Nomad Clients are - // heartbeating successfully with Nomad Servers, Nomad Clients do not - // poll Consul to populate their server list. 
- consulDiscoverNext time.Time - lastHeartbeat time.Time - heartbeatTTL time.Duration - heartbeatLock sync.Mutex + // heartbeat related times for tracking how often to heartbeat + lastHeartbeat time.Time + heartbeatTTL time.Duration + heartbeatLock sync.Mutex + + // doDisco triggers consul discovery; see triggerDiscovery + doDisco chan struct{} // discovered will be ticked whenever consul discovery completes // succesfully @@ -137,6 +140,9 @@ type Client struct { // consulSyncer advertises this Nomad Agent with Consul consulSyncer *consul.Syncer + // consulReaperTicker ticks when the reaper should run + consulReaperTicker *time.Ticker + // HostStatsCollector collects host resource usage stats hostStatsCollector *stats.HostStatsCollector resourceUsage *stats.HostStats @@ -158,9 +164,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg c := &Client{ config: cfg, consulSyncer: consulSyncer, + consulReaperTicker: time.NewTicker(consulReaperIntv), start: time.Now(), connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), servers: newServerList(), + doDisco: make(chan struct{}), discovered: make(chan struct{}), logger: logger, hostStatsCollector: stats.NewHostStatsCollector(), @@ -209,11 +217,18 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg } c.configLock.RUnlock() - // Setup the Consul syncer - if err := c.setupConsulSyncer(); err != nil { - return nil, fmt.Errorf("failed to create client Consul syncer: %v", err) + // Setup Consul discovery if enabled + if c.configCopy.ConsulConfig.ClientAutoJoin { + go c.consulDiscovery() + if len(c.servers.all()) == 0 { + // No configured servers; trigger discovery manually + <-c.doDisco + } } + // Start Consul reaper + go c.consulReaper() + // Setup the vault client for token and secret renewals if err := c.setupVaultClient(); err != nil { return nil, fmt.Errorf("failed to setup vault client: %v", err) @@ -811,29 +826,33 @@ func (c *Client) registerAndHeartbeat() { for { select { + case <-c.discovered: case <-heartbeat: - if err := c.updateNodeStatus(); err != nil { - // The servers have changed such that this node has not been - // registered before - if strings.Contains(err.Error(), "node not found") { - // Re-register the node - c.logger.Printf("[INFO] client: re-registering node") - c.retryRegisterNode() - heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger)) - } else { - intv := c.retryIntv(registerRetryIntv) - c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err) - heartbeat = time.After(intv) - } - } else { - c.heartbeatLock.Lock() - heartbeat = time.After(c.heartbeatTTL) - c.heartbeatLock.Unlock() - } - case <-c.shutdownCh: return } + + if err := c.updateNodeStatus(); err != nil { + // The servers have changed such that this node has not been + // registered before + if strings.Contains(err.Error(), "node not found") { + // Re-register the node + c.logger.Printf("[INFO] client: re-registering node") + c.retryRegisterNode() + heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger)) + } else { + intv := c.retryIntv(registerRetryIntv) + c.logger.Printf("[ERR] client: heartbeating failed. 
Retrying in %v: %v", intv, err) + heartbeat = time.After(intv) + + // if heartbeating fails, trigger consul discovery + c.triggerDiscovery() + } + } else { + c.heartbeatLock.Lock() + heartbeat = time.After(c.heartbeatTTL) + c.heartbeatLock.Unlock() + } } } @@ -902,10 +921,13 @@ func (c *Client) retryRegisterNode() { for { err := c.registerNode() if err == nil { + // Registered! return } + if err == noServers { c.logger.Print("[DEBUG] client: registration waiting on servers") + c.triggerDiscovery() } else { c.logger.Printf("[ERR] client: registration failure: %v", err) } @@ -927,9 +949,6 @@ func (c *Client) registerNode() error { } var resp structs.NodeUpdateResponse if err := c.RPC("Node.Register", &req, &resp); err != nil { - if time.Since(c.start) > registerErrGrace { - return fmt.Errorf("failed to register node: %v", err) - } return err } @@ -955,10 +974,6 @@ func (c *Client) updateNodeStatus() error { c.heartbeatLock.Lock() defer c.heartbeatLock.Unlock() - // If anything goes wrong we want consul discovery to happen ASAP. The - // heartbeat lock keeps it from running concurrent with node updating. - c.consulDiscoverNext = time.Time{} - node := c.Node() req := structs.NodeUpdateStatusRequest{ NodeID: node.ID, @@ -967,6 +982,7 @@ func (c *Client) updateNodeStatus() error { } var resp structs.NodeUpdateResponse if err := c.RPC("Node.UpdateStatus", &req, &resp); err != nil { + c.triggerDiscovery() return fmt.Errorf("failed to update status: %v", err) } if len(resp.EvalIDs) != 0 { @@ -1006,11 +1022,9 @@ func (c *Client) updateNodeStatus() error { // has connectivity to the existing majority of Nomad Servers, but // only if it queries Consul. if resp.LeaderRPCAddr == "" { - return nil + c.triggerDiscovery() } - const heartbeatFallbackFactor = 3 - c.consulDiscoverNext = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL) return nil } @@ -1134,6 +1148,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { resp = structs.NodeClientAllocsResponse{} err := c.RPC("Node.GetClientAllocs", &req, &resp) if err != nil { + // Shutdown often causes EOF errors, so check for shutdown first + select { + case <-c.shutdownCh: + return + default: + } + if err != noServers { c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) } @@ -1187,6 +1208,8 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err) retry := c.retryIntv(getAllocRetryIntv) select { + case <-c.discovered: + continue case <-time.After(retry): continue case <-c.shutdownCh: @@ -1455,159 +1478,175 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli return unwrappedTokens, nil } -// setupConsulSyncer creates Client-mode consul.Syncer which periodically -// executes callbacks on a fixed interval. -// -// TODO(sean@): this could eventually be moved to a priority queue and give -// each task an interval, but that is not necessary at this time. -func (c *Client) setupConsulSyncer() error { - disco := func() error { - c.heartbeatLock.Lock() - defer c.heartbeatLock.Unlock() - - // Don't run before the deadline. When bootstrapping is done - // and heartbeats are working this is the common path. 
- if time.Now().Before(c.consulDiscoverNext) { - return nil +// triggerDiscovery causes a consul discovery to begin (if one hasn't alread) +func (c *Client) triggerDiscovery() { + select { + case <-c.doDisco: + // Discovery goroutine was released to execute + default: + // Discovery goroutine was already running + } +} + +func (c *Client) consulDiscovery() { + for { + select { + case c.doDisco <- struct{}{}: + if err := c.doConsulDisco(); err != nil { + c.logger.Printf("[ERR] client.consul: error discovering nomad servers: %v", err) + } + case <-c.shutdownCh: + return } + } +} - consulCatalog := c.consulSyncer.ConsulClient().Catalog() - dcs, err := consulCatalog.Datacenters() +func (c *Client) doConsulDisco() error { + // Acquire heartbeat lock to prevent heartbeat from running + // concurrently with discovery. Concurrent execution is safe, however + // discovery is usually triggered when heartbeating has failed so + // there's no point in allowing it. + c.heartbeatLock.Lock() + defer c.heartbeatLock.Unlock() + + consulCatalog := c.consulSyncer.ConsulClient().Catalog() + dcs, err := consulCatalog.Datacenters() + if err != nil { + return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err) + } + if len(dcs) > 2 { + // Query the local DC first, then shuffle the + // remaining DCs. Future heartbeats will cause Nomad + // Clients to fixate on their local datacenter so + // it's okay to talk with remote DCs. If the no + // Nomad servers are available within + // datacenterQueryLimit, the next heartbeat will pick + // a new set of servers so it's okay. + shuffleStrings(dcs[1:]) + dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] + } + + // Query for servers in this client's region only + region := c.Region() + rpcargs := structs.GenericRequest{ + QueryOptions: structs.QueryOptions{ + Region: region, + }, + } + + serviceName := c.configCopy.ConsulConfig.ServerServiceName + var mErr multierror.Error + var servers endpoints + c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs) +DISCOLOOP: + for _, dc := range dcs { + consulOpts := &consulapi.QueryOptions{ + AllowStale: true, + Datacenter: dc, + Near: "_agent", + WaitTime: consul.DefaultQueryWaitDuration, + } + consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts) if err != nil { - return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err) - } - if len(dcs) > 2 { - // Query the local DC first, then shuffle the - // remaining DCs. Future heartbeats will cause Nomad - // Clients to fixate on their local datacenter so - // it's okay to talk with remote DCs. If the no - // Nomad servers are available within - // datacenterQueryLimit, the next heartbeat will pick - // a new set of servers so it's okay. 
- shuffleStrings(dcs[1:]) - dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] - } - - // Query for servers in this client's region only - region := c.Region() - rpcargs := structs.GenericRequest{ - QueryOptions: structs.QueryOptions{ - Region: region, - }, - } - - serviceName := c.configCopy.ConsulConfig.ServerServiceName - var mErr multierror.Error - var servers endpoints - c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs) - DISCOLOOP: - for _, dc := range dcs { - consulOpts := &consulapi.QueryOptions{ - AllowStale: true, - Datacenter: dc, - Near: "_agent", - WaitTime: consul.DefaultQueryWaitDuration, + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err)) + continue + } + + for _, s := range consulServices { + port := strconv.Itoa(s.ServicePort) + addrstr := s.ServiceAddress + if addrstr == "" { + addrstr = s.Address } - consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts) + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port)) if err != nil { - mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err)) + mErr.Errors = append(mErr.Errors, err) + continue + } + var peers []string + if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil { + mErr.Errors = append(mErr.Errors, err) continue } - for _, s := range consulServices { - port := strconv.Itoa(s.ServicePort) - addrstr := s.ServiceAddress - if addrstr == "" { - addrstr = s.Address - } - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port)) + // Successfully received the Server peers list of the correct + // region + for _, p := range peers { + addr, err := net.ResolveTCPAddr("tcp", p) if err != nil { mErr.Errors = append(mErr.Errors, err) - continue - } - var peers []string - if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil { - mErr.Errors = append(mErr.Errors, err) - continue - } - - // Successfully received the Server peers list of the correct - // region - for _, p := range peers { - addr, err := net.ResolveTCPAddr("tcp", p) - if err != nil { - mErr.Errors = append(mErr.Errors, err) - } - servers = append(servers, &endpoint{name: p, addr: addr}) - } - if len(servers) > 0 { - break DISCOLOOP } + servers = append(servers, &endpoint{name: p, addr: addr}) } - } - if len(servers) == 0 { - if len(mErr.Errors) > 0 { - return mErr.ErrorOrNil() + if len(servers) > 0 { + break DISCOLOOP } - return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs) } + } + if len(servers) == 0 { + if len(mErr.Errors) > 0 { + return mErr.ErrorOrNil() + } + return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs) + } - c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers) - c.servers.set(servers) - - // Make sure registration is given ample opportunity to run - // before trying consul discovery again. - c.consulDiscoverNext = time.Now().Add(2 * registerRetryIntv) + c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers) + c.servers.set(servers) - // Notify waiting rpc calls. Wait briefly in case initial rpc - // just failed but the calling goroutine isn't selecting on - // discovered yet. 
- const dur = 50 * time.Millisecond - timeout := time.NewTimer(dur) - for { - select { - case c.discovered <- struct{}{}: - if !timeout.Stop() { - <-timeout.C - } - timeout.Reset(dur) - case <-timeout.C: - return nil + // Notify waiting rpc calls. Wait briefly in case initial rpc + // just failed but the calling goroutine isn't selecting on + // discovered yet. + const dur = 50 * time.Millisecond + timeout := time.NewTimer(dur) + for { + select { + case c.discovered <- struct{}{}: + if !timeout.Stop() { + <-timeout.C } + timeout.Reset(dur) + case <-timeout.C: + return nil } } - if c.configCopy.ConsulConfig.ClientAutoJoin { - c.consulSyncer.AddPeriodicHandler("Nomad Client Consul Server Discovery", disco) - } - - consulServicesReaperFn := func() error { - const estInitialExecutorDomains = 8 - - // Create the domains to keep and add the server and client - domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains) - domains[0] = consul.ServerDomain - domains[1] = consul.ClientDomain - - for allocID, ar := range c.getAllocRunners() { - ar.taskStatusLock.RLock() - taskStates := copyTaskStates(ar.taskStates) - ar.taskStatusLock.RUnlock() - for taskName, taskState := range taskStates { - // Only keep running tasks - if taskState.State == structs.TaskStateRunning { - d := consul.NewExecutorDomain(allocID, taskName) - domains = append(domains, d) - } +} + +func (c *Client) consulReaper() { + defer c.consulReaperTicker.Stop() + for { + select { + case <-c.consulReaperTicker.C: + if err := c.doConsulReap(); err != nil { + c.logger.Printf("[ERR] consul.client: error reaping services in consul: %v", err) } + case <-c.shutdownCh: + return } - - return c.consulSyncer.ReapUnmatched(domains) } - if c.config.ConsulConfig.AutoAdvertise { - c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn) +} + +func (c *Client) doConsulReap() error { + const estInitialExecutorDomains = 8 + + // Create the domains to keep and add the server and client + domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains) + domains[0] = consul.ServerDomain + domains[1] = consul.ClientDomain + + for allocID, ar := range c.getAllocRunners() { + ar.taskStatusLock.RLock() + taskStates := copyTaskStates(ar.taskStates) + ar.taskStatusLock.RUnlock() + for taskName, taskState := range taskStates { + // Only keep running tasks + if taskState.State == structs.TaskStateRunning { + d := consul.NewExecutorDomain(allocID, taskName) + domains = append(domains, d) + } + } } - return nil + return c.consulSyncer.ReapUnmatched(domains) } // collectHostStats collects host resource usage stats periodically From 6b9be92dea588d85187a56b2678912e6b82bf908 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 11:57:06 -0700 Subject: [PATCH 03/14] Fix agent server set test Test asserted endpoint appendend servers, but the new/desired behavior is for the endpoint to set/overwrite servers. 
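Reviewer note: from an API consumer's point of view, the set/overwrite semantics mean a PUT to /v1/agent/servers replaces the client's server list with exactly the addresses given. The Go sketch below is illustrative only; it assumes a local agent on the default HTTP address 127.0.0.1:4646 and mirrors the query-parameter shape used by the updated test.

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Repeated "address" query parameters name the servers to install on
	// the client; the handler passes them to client.SetServers, which
	// replaces (not appends to) the existing list.
	params := url.Values{}
	params.Add("address", "127.0.0.1:4647")
	params.Add("address", "127.0.0.2:4647")
	params.Add("address", "127.0.0.3:4647")

	// Assumed local agent HTTP address; adjust for your environment.
	target := "http://127.0.0.1:4646/v1/agent/servers?" + params.Encode()

	req, err := http.NewRequest("PUT", target, nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// After this call GET /v1/agent/servers returns exactly the three
	// addresses above.
	fmt.Println("status:", resp.Status)
}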
--- command/agent/agent_endpoint_test.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 03f7b2a7904..c79f9dae509 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -114,13 +114,6 @@ func TestHTTP_AgentSetServers(t *testing.T) { } respW := httptest.NewRecorder() - // Make the request and check the result - out, err := s.Server.AgentServersRequest(respW, req) - if err != nil { - t.Fatalf("err: %s", err) - } - numServers := len(out.([]string)) - // Create the request req, err = http.NewRequest("PUT", "/v1/agent/servers", nil) if err != nil { @@ -135,7 +128,7 @@ func TestHTTP_AgentSetServers(t *testing.T) { } // Create a valid request - req, err = http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647", nil) + req, err = http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil) if err != nil { t.Fatalf("err: %s", err) } @@ -158,14 +151,15 @@ func TestHTTP_AgentSetServers(t *testing.T) { expected := map[string]bool{ "127.0.0.1:4647": true, "127.0.0.2:4647": true, + "127.0.0.3:4647": true, } - out, err = s.Server.AgentServersRequest(respW, req) + out, err := s.Server.AgentServersRequest(respW, req) if err != nil { t.Fatalf("err: %s", err) } servers := out.([]string) - if n := len(servers); n != numServers+2 { - t.Fatalf("expected %d servers, got: %d: %v", numServers+2, n, servers) + if n := len(servers); n != len(expected) { + t.Fatalf("expected %d servers, got: %d: %v", len(expected), n, servers) } received := make(map[string]bool, len(servers)) for _, server := range servers { From 01ff307c3e9ca2f41b8b5b23f61288409edf053f Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 15:06:57 -0700 Subject: [PATCH 04/14] consul -> Consul --- client/client.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index cfb136a5fa0..e5d8806f7e8 100644 --- a/client/client.go +++ b/client/client.go @@ -43,7 +43,7 @@ const ( // datacenters looking for the Nomad server service. datacenterQueryLimit = 9 - // consulReaperIntv is the interval at which the consul reaper will + // consulReaperIntv is the interval at which the Consul reaper will // run. consulReaperIntv = 5 * time.Second @@ -118,10 +118,10 @@ type Client struct { heartbeatTTL time.Duration heartbeatLock sync.Mutex - // doDisco triggers consul discovery; see triggerDiscovery + // doDisco triggers Consul discovery; see triggerDiscovery doDisco chan struct{} - // discovered will be ticked whenever consul discovery completes + // discovered will be ticked whenever Consul discovery completes // succesfully discovered chan struct{} @@ -382,7 +382,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { return nil } - // Force consul discovery ASAP since we have no healthy servers + // Force Consul discovery ASAP since we have no healthy servers return mErr.ErrorOrNil() } @@ -845,7 +845,7 @@ func (c *Client) registerAndHeartbeat() { c.logger.Printf("[ERR] client: heartbeating failed. 
Retrying in %v: %v", intv, err) heartbeat = time.After(intv) - // if heartbeating fails, trigger consul discovery + // if heartbeating fails, trigger Consul discovery c.triggerDiscovery() } } else { @@ -1478,7 +1478,7 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli return unwrappedTokens, nil } -// triggerDiscovery causes a consul discovery to begin (if one hasn't alread) +// triggerDiscovery causes a Consul discovery to begin (if one hasn't alread) func (c *Client) triggerDiscovery() { select { case <-c.doDisco: From 3bac14324fffdd777dac6a61145da78f10cf1e07 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 15:12:35 -0700 Subject: [PATCH 05/14] noServers -> noServersErr --- client/client.go | 15 ++++++++++----- client/serverlist_test.go | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index e5d8806f7e8..51c116ee2a2 100644 --- a/client/client.go +++ b/client/client.go @@ -156,7 +156,12 @@ type Client struct { vaultClient vaultclient.VaultClient } -var noServers = errors.New("no servers") +var ( + // noServersErr is returned by the RPC method when the client has no + // configured servers. This is used to trigger Consul discovery if + // enabled. + noServersErr = errors.New("no servers") +) // NewClient is used to create a new client from the given configuration func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) { @@ -365,7 +370,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { servers := c.servers.all() if len(servers) == 0 { - return noServers + return noServersErr } var mErr multierror.Error @@ -483,7 +488,7 @@ func (c *Client) SetServers(servers []string) error { if len(merr.Errors) > 0 { return merr.ErrorOrNil() } - return noServers + return noServersErr } c.servers.set(endpoints) @@ -925,7 +930,7 @@ func (c *Client) retryRegisterNode() { return } - if err == noServers { + if err == noServersErr { c.logger.Print("[DEBUG] client: registration waiting on servers") c.triggerDiscovery() } else { @@ -1155,7 +1160,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { default: } - if err != noServers { + if err != noServersErr { c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) } retry := c.retryIntv(getAllocRetryIntv) diff --git a/client/serverlist_test.go b/client/serverlist_test.go index 1dfdb26dec9..8b7df22c6b8 100644 --- a/client/serverlist_test.go +++ b/client/serverlist_test.go @@ -96,7 +96,7 @@ func TestClient_ServerList(t *testing.T) { if s := client.GetServers(); len(s) != 0 { t.Fatalf("expected server lit to be empty but found: %+q", s) } - if err := client.SetServers(nil); err != noServers { + if err := client.SetServers(nil); err != noServersErr { t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err) } if err := client.SetServers([]string{"not-a-real-domain.fake"}); err == nil { From a35fb33f8f1bbeb87636416e3d7278fcc59cebc0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 15:20:43 -0700 Subject: [PATCH 06/14] doDisco -> triggerDiscoveryCh; discovered -> serversDiscoveredCh Also fix log line formatting --- client/client.go | 52 ++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/client/client.go b/client/client.go index 51c116ee2a2..f45c9f69b9b 100644 --- a/client/client.go +++ b/client/client.go @@ -118,12 +118,12 @@ 
type Client struct { heartbeatTTL time.Duration heartbeatLock sync.Mutex - // doDisco triggers Consul discovery; see triggerDiscovery - doDisco chan struct{} + // triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery + triggerDiscoveryCh chan struct{} // discovered will be ticked whenever Consul discovery completes // succesfully - discovered chan struct{} + serversDiscoveredCh chan struct{} // allocs is the current set of allocations allocs map[string]*AllocRunner @@ -167,20 +167,20 @@ var ( func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) { // Create the client c := &Client{ - config: cfg, - consulSyncer: consulSyncer, - consulReaperTicker: time.NewTicker(consulReaperIntv), - start: time.Now(), - connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), - servers: newServerList(), - doDisco: make(chan struct{}), - discovered: make(chan struct{}), - logger: logger, - hostStatsCollector: stats.NewHostStatsCollector(), - allocs: make(map[string]*AllocRunner), - blockedAllocations: make(map[string]*structs.Allocation), - allocUpdates: make(chan *structs.Allocation, 64), - shutdownCh: make(chan struct{}), + config: cfg, + consulSyncer: consulSyncer, + consulReaperTicker: time.NewTicker(consulReaperIntv), + start: time.Now(), + connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), + servers: newServerList(), + triggerDiscoveryCh: make(chan struct{}), + serversDiscoveredCh: make(chan struct{}), + logger: logger, + hostStatsCollector: stats.NewHostStatsCollector(), + allocs: make(map[string]*AllocRunner), + blockedAllocations: make(map[string]*structs.Allocation), + allocUpdates: make(chan *structs.Allocation, 64), + shutdownCh: make(chan struct{}), } // Initialize the client @@ -217,7 +217,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg c.configLock.RLock() if len(c.configCopy.Servers) > 0 { if err := c.SetServers(c.configCopy.Servers); err != nil { - logger.Printf("[WARN] None of the configured servers are valid: %v", err) + logger.Printf("[WARN] client: None of the configured servers are valid: %v", err) } } c.configLock.RUnlock() @@ -227,7 +227,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg go c.consulDiscovery() if len(c.servers.all()) == 0 { // No configured servers; trigger discovery manually - <-c.doDisco + <-c.triggerDiscoveryCh } } @@ -831,7 +831,7 @@ func (c *Client) registerAndHeartbeat() { for { select { - case <-c.discovered: + case <-c.serversDiscoveredCh: case <-heartbeat: case <-c.shutdownCh: return @@ -937,7 +937,7 @@ func (c *Client) retryRegisterNode() { c.logger.Printf("[ERR] client: registration failure: %v", err) } select { - case <-c.discovered: + case <-c.serversDiscoveredCh: case <-time.After(c.retryIntv(registerRetryIntv)): case <-c.shutdownCh: return @@ -1165,7 +1165,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } retry := c.retryIntv(getAllocRetryIntv) select { - case <-c.discovered: + case <-c.serversDiscoveredCh: continue case <-time.After(retry): continue @@ -1213,7 +1213,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err) retry := c.retryIntv(getAllocRetryIntv) select { - case <-c.discovered: + case <-c.serversDiscoveredCh: continue case <-time.After(retry): continue @@ -1486,7 +1486,7 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, 
vcli // triggerDiscovery causes a Consul discovery to begin (if one hasn't alread) func (c *Client) triggerDiscovery() { select { - case <-c.doDisco: + case <-c.triggerDiscoveryCh: // Discovery goroutine was released to execute default: // Discovery goroutine was already running @@ -1496,7 +1496,7 @@ func (c *Client) triggerDiscovery() { func (c *Client) consulDiscovery() { for { select { - case c.doDisco <- struct{}{}: + case c.triggerDiscoveryCh <- struct{}{}: if err := c.doConsulDisco(); err != nil { c.logger.Printf("[ERR] client.consul: error discovering nomad servers: %v", err) } @@ -1605,7 +1605,7 @@ DISCOLOOP: timeout := time.NewTimer(dur) for { select { - case c.discovered <- struct{}{}: + case c.serversDiscoveredCh <- struct{}{}: if !timeout.Stop() { <-timeout.C } From a4a61e81d50ff319ccb4c6c738395fbe0e0bd8a7 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 15:40:26 -0700 Subject: [PATCH 07/14] Return csv of servers from Stats, not just count --- client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index f45c9f69b9b..645ccc69dc7 100644 --- a/client/client.go +++ b/client/client.go @@ -403,7 +403,7 @@ func (c *Client) Stats() map[string]map[string]string { stats := map[string]map[string]string{ "client": map[string]string{ "node_id": c.Node().ID, - "known_servers": strconv.Itoa(len(c.servers.all())), + "known_servers": c.servers.all().String(), "num_allocations": strconv.Itoa(numAllocs), "last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)), "heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL), From 93fbc12401532fb01e30023d994a2c98ab1f02bd Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 15:52:40 -0700 Subject: [PATCH 08/14] Flip disco chan; clarify method names/comments --- client/client.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index 645ccc69dc7..c806b2c25fb 100644 --- a/client/client.go +++ b/client/client.go @@ -227,7 +227,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg go c.consulDiscovery() if len(c.servers.all()) == 0 { // No configured servers; trigger discovery manually - <-c.triggerDiscoveryCh + c.triggerDiscoveryCh <- struct{}{} } } @@ -1486,18 +1486,21 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli // triggerDiscovery causes a Consul discovery to begin (if one hasn't alread) func (c *Client) triggerDiscovery() { select { - case <-c.triggerDiscoveryCh: + case c.triggerDiscoveryCh <- struct{}{}: // Discovery goroutine was released to execute default: // Discovery goroutine was already running } } +// consulDiscovery waits for the signal to attempt server discovery via Consul. +// It's intended to be started in a goroutine. See triggerDiscovery() for +// causing consul discovery from other code locations. func (c *Client) consulDiscovery() { for { select { - case c.triggerDiscoveryCh <- struct{}{}: - if err := c.doConsulDisco(); err != nil { + case <-c.triggerDiscoveryCh: + if err := c.consulDiscoveryImpl(); err != nil { c.logger.Printf("[ERR] client.consul: error discovering nomad servers: %v", err) } case <-c.shutdownCh: @@ -1506,7 +1509,7 @@ func (c *Client) consulDiscovery() { } } -func (c *Client) doConsulDisco() error { +func (c *Client) consulDiscoveryImpl() error { // Acquire heartbeat lock to prevent heartbeat from running // concurrently with discovery. 
Concurrent execution is safe, however // discovery is usually triggered when heartbeating has failed so From d6b149678e060347653758c28dfa489ec91baed7 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 16:05:21 -0700 Subject: [PATCH 09/14] Drop clumsy timeout on discovery notifications It's better to just let goroutines fallback to their longer retry intervals then try to be clever here. --- client/client.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/client/client.go b/client/client.go index c806b2c25fb..9b3bb53c794 100644 --- a/client/client.go +++ b/client/client.go @@ -1601,19 +1601,13 @@ DISCOLOOP: c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers) c.servers.set(servers) - // Notify waiting rpc calls. Wait briefly in case initial rpc - // just failed but the calling goroutine isn't selecting on - // discovered yet. - const dur = 50 * time.Millisecond - timeout := time.NewTimer(dur) + // Notify waiting rpc calls. If a goroutine just failed an RPC call and + // isn't receiving on this chan yet they'll still retry eventually. + // This is a shortcircuit for the longer retry intervals. for { select { case c.serversDiscoveredCh <- struct{}{}: - if !timeout.Stop() { - <-timeout.C - } - timeout.Reset(dur) - case <-timeout.C: + default: return nil } } From ac90d1de763beb1115d0a16da419fd52eb003bbc Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 26 Sep 2016 16:13:47 -0700 Subject: [PATCH 10/14] No need to put reaper ticker on the struct --- client/client.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index 9b3bb53c794..a3b9b019f1b 100644 --- a/client/client.go +++ b/client/client.go @@ -140,9 +140,6 @@ type Client struct { // consulSyncer advertises this Nomad Agent with Consul consulSyncer *consul.Syncer - // consulReaperTicker ticks when the reaper should run - consulReaperTicker *time.Ticker - // HostStatsCollector collects host resource usage stats hostStatsCollector *stats.HostStatsCollector resourceUsage *stats.HostStats @@ -169,7 +166,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg c := &Client{ config: cfg, consulSyncer: consulSyncer, - consulReaperTicker: time.NewTicker(consulReaperIntv), start: time.Now(), connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), servers: newServerList(), @@ -1613,12 +1609,15 @@ DISCOLOOP: } } +// consulReaper periodically reaps unmatched domains from Consul. Intended to +// be called in its own goroutine. See consulReaperIntv for interval. func (c *Client) consulReaper() { - defer c.consulReaperTicker.Stop() + ticker := time.NewTicker(consulReaperIntv) + defer ticker.Stop() for { select { - case <-c.consulReaperTicker.C: - if err := c.doConsulReap(); err != nil { + case <-ticker.C: + if err := c.consulReaperImpl(); err != nil { c.logger.Printf("[ERR] consul.client: error reaping services in consul: %v", err) } case <-c.shutdownCh: @@ -1627,7 +1626,8 @@ func (c *Client) consulReaper() { } } -func (c *Client) doConsulReap() error { +// consulReaperImpl reaps unmatched domains from Consul. 
+func (c *Client) consulReaperImpl() error {
 	const estInitialExecutorDomains = 8
 
 	// Create the domains to keep and add the server and client

From ec81fc9baf776efb30bd752f64688ad562d915c6 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Mon, 26 Sep 2016 16:51:53 -0700
Subject: [PATCH 11/14] Fix lies found in comments by fact checkers

---
 client/client.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/client/client.go b/client/client.go
index a3b9b019f1b..e66e119da84 100644
--- a/client/client.go
+++ b/client/client.go
@@ -208,8 +208,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
 	c.configCopy = c.config.Copy()
 	c.configLock.Unlock()
 
-	// Create the RPC Proxy and bootstrap with the preconfigured list of
-	// static servers
+	// Set the preconfigured list of static servers
 	c.configLock.RLock()
 	if len(c.configCopy.Servers) > 0 {
 		if err := c.SetServers(c.configCopy.Servers); err != nil {
@@ -383,7 +382,6 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
 		return nil
 	}
 
-	// Force Consul discovery ASAP since we have no healthy servers
 	return mErr.ErrorOrNil()
 }

From ac29e9cffe2fb0e01caefeac6943cba5920b5012 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Tue, 27 Sep 2016 16:04:01 -0700
Subject: [PATCH 12/14] Remove unused const

---
 client/client.go | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/client/client.go b/client/client.go
index e66e119da84..58072b4bf6f 100644
--- a/client/client.go
+++ b/client/client.go
@@ -61,11 +61,6 @@ const (
 	// stateSnapshotIntv is how often the client snapshots state
 	stateSnapshotIntv = 60 * time.Second
 
-	// registerErrGrace is the grace period where we don't log about
-	// register errors after start. This is to improve the user experience
-	// in dev mode where the leader isn't elected for a few seconds.
-	registerErrGrace = 10 * time.Second
-
 	// initialHeartbeatStagger is used to stagger the interval between
 	// starting and the intial heartbeat. After the intial heartbeat,
 	// we switch to using the TTL specified by the servers.

From 64aa187e9e33c2e40516d605258178d59670d1cb Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Tue, 27 Sep 2016 16:07:41 -0700
Subject: [PATCH 13/14] Add #1735 related fixes/improvements to changelog

---
 CHANGELOG.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b27e126ab3c..aa67685d7d6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@ IMPROVEMENTS:
   * core: Introduce node SecretID which can be used to minimize the available
     surface area of RPCs to malicious Nomad Clients [GH-1597]
   * cli: `nomad alloc-status` shows allocation creation time [GH-1623]
+  * client: Retry all servers on RPC call failure [GH-1735]
   * client: Enforce shared allocation directory disk usage [GH-1580]
   * client: Introduce a `secrets/` directory to tasks where sensitive data
     can be written [GH-1681]
@@ -17,6 +18,8 @@ BUG FIXES:
   * client/fingerprint: Fix inconsistent CPU MHz fingerprinting [GH-1366]
   * discovery: Fix old services not getting removed from consul on update
     [GH-1668]
+  * discovery: Fix client flapping when server was in a different datacenter
+    [GH-1641]
   * discovery: Fix HTTP timeout with Server HTTP health check when there is
     no leader [GH-1656]

From 91936f4cf3042776e622d74d9a18dd36136c7567 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Tue, 27 Sep 2016 16:32:32 -0700
Subject: [PATCH 14/14] Cleanup changelog entries

---
 CHANGELOG.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa67685d7d6..f639e387b2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,7 @@ IMPROVEMENTS:
   * core: Introduce node SecretID which can be used to minimize the available
     surface area of RPCs to malicious Nomad Clients [GH-1597]
   * cli: `nomad alloc-status` shows allocation creation time [GH-1623]
-  * client: Retry all servers on RPC call failure [GH-1735]
+  * client: Failed RPCs are retried on all servers [GH-1735]
   * client: Enforce shared allocation directory disk usage [GH-1580]
   * client: Introduce a `secrets/` directory to tasks where sensitive data
     can be written [GH-1681]
@@ -18,10 +18,10 @@ BUG FIXES:
   * client/fingerprint: Fix inconsistent CPU MHz fingerprinting [GH-1366]
  * discovery: Fix old services not getting removed from consul on update
     [GH-1668]
-  * discovery: Fix client flapping when server was in a different datacenter
-    [GH-1641]
   * discovery: Fix HTTP timeout with Server HTTP health check when there is
     no leader [GH-1656]
+  * discovery: Fix client flapping when server is in a different datacenter as
+    the client [GH-1641]
 
 ## 0.4.1