diff --git a/client/client.go b/client/client.go index 58b1e8e3426..ef938e947ff 100644 --- a/client/client.go +++ b/client/client.go @@ -1,6 +1,7 @@ package client import ( + "errors" "fmt" "io/ioutil" "log" @@ -10,7 +11,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/armon/go-metrics" @@ -21,7 +21,6 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/fingerprint" - "github.com/hashicorp/nomad/client/rpcproxy" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" @@ -105,26 +104,23 @@ type Client struct { logger *log.Logger - rpcProxy *rpcproxy.RPCProxy - connPool *nomad.ConnPool - // lastHeartbeatFromQuorum is an atomic int32 acting as a bool. When - // true, the last heartbeat message had a leader. When false (0), - // the last heartbeat did not include the RPC address of the leader, - // indicating that the server is in the minority or middle of an - // election. - lastHeartbeatFromQuorum int32 - - // consulPullHeartbeatDeadline is the deadline at which this Nomad - // Agent will begin polling Consul for a list of Nomad Servers. When - // Nomad Clients are heartbeating successfully with Nomad Servers, - // Nomad Clients do not poll Consul to populate their backup server - // list. - consulPullHeartbeatDeadline time.Time - lastHeartbeat time.Time - heartbeatTTL time.Duration - heartbeatLock sync.Mutex + // servers is the (optionally prioritized) list of nomad servers + servers *serverlist + + // consulDiscoverNext is the deadline at which this Nomad Agent will + // poll Consul for a list of Nomad Servers. When Nomad Clients are + // heartbeating successfully with Nomad Servers, Nomad Clients do not + // poll Consul to populate their server list. 
+	consulDiscoverNext time.Time
+	lastHeartbeat      time.Time
+	heartbeatTTL       time.Duration
+	heartbeatLock      sync.Mutex
+
+	// discovered will be ticked whenever consul discovery completes
+	// successfully
+	discovered chan struct{}
 
 	// allocs is the current set of allocations
 	allocs map[string]*AllocRunner
@@ -154,6 +150,8 @@ type Client struct {
 	vaultClient vaultclient.VaultClient
 }
 
+var noServers = errors.New("no servers")
+
 // NewClient is used to create a new client from the given configuration
 func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
 	// Create the client
@@ -162,6 +160,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
 		consulSyncer:       consulSyncer,
 		start:              time.Now(),
 		connPool:           nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
+		servers:            newServerList(),
+		discovered:         make(chan struct{}),
 		logger:             logger,
 		hostStatsCollector: stats.NewHostStatsCollector(),
 		allocs:             make(map[string]*AllocRunner),
@@ -202,9 +202,10 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
 	// Create the RPC Proxy and bootstrap with the preconfigured list of
 	// static servers
 	c.configLock.RLock()
-	c.rpcProxy = rpcproxy.NewRPCProxy(c.logger, c.shutdownCh, c, c.connPool)
-	for _, serverAddr := range c.configCopy.Servers {
-		c.rpcProxy.AddPrimaryServer(serverAddr)
+	if len(c.configCopy.Servers) > 0 {
+		if err := c.SetServers(c.configCopy.Servers); err != nil {
+			logger.Printf("[WARN] None of the configured servers are valid: %v", err)
+		}
 	}
 	c.configLock.RUnlock()
 
@@ -238,16 +239,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
 	// Start collecting stats
 	go c.collectHostStats()
 
-	// Start the RPCProxy maintenance task. This task periodically
-	// shuffles the list of Nomad Server Endpoints this Client will use
-	// when communicating with Nomad Servers via RPC. This is done in
-	// order to prevent server fixation in stable Nomad clusters. This
-	// task actively populates the active list of Nomad Server Endpoints
-	// from information from the Nomad Client heartbeats. If a heartbeat
-	// times out and there are no Nomad servers available, this data is
-	// populated by periodically polling Consul, if available.
-	go c.rpcProxy.Run()
-
 	return c, nil
 }
 
@@ -350,33 +341,39 @@ func (c *Client) Shutdown() error {
 	return c.saveState()
 }
 
-// RPC is used to forward an RPC call to a nomad server, or fail if no servers
+// RPC is used to forward an RPC call to a nomad server, or fail if no servers are available.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Invoke the RPCHandler if it exists if c.config.RPCHandler != nil { return c.config.RPCHandler.RPC(method, args, reply) } - // Pick a server to request from - server := c.rpcProxy.FindServer() - if server == nil { - return fmt.Errorf("no known servers") + servers := c.servers.all() + if len(servers) == 0 { + return noServers } - // Make the RPC request - if err := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply); err != nil { - c.rpcProxy.NotifyFailedServer(server) - return fmt.Errorf("RPC failed to server %s: %v", server.Addr, err) + var mErr multierror.Error + for _, s := range servers { + // Make the RPC request + if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil { + errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err) + mErr.Errors = append(mErr.Errors, errmsg) + c.logger.Printf("[DEBUG] client: %v", errmsg) + c.servers.failed(s) + continue + } + c.servers.good(s) + return nil } - return nil + + // Force consul discovery ASAP since we have no healthy servers + return mErr.ErrorOrNil() } // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { - toString := func(v uint64) string { - return strconv.FormatUint(v, 10) - } c.allocLock.RLock() numAllocs := len(c.allocs) c.allocLock.RUnlock() @@ -386,8 +383,8 @@ func (c *Client) Stats() map[string]map[string]string { stats := map[string]map[string]string{ "client": map[string]string{ "node_id": c.Node().ID, - "known_servers": toString(uint64(c.rpcProxy.NumServers())), - "num_allocations": toString(uint64(numAllocs)), + "known_servers": strconv.Itoa(len(c.servers.all())), + "num_allocations": strconv.Itoa(numAllocs), "last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)), "heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL), }, @@ -438,10 +435,43 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { return ar.ctx.AllocDir, nil } -// AddPrimaryServerToRPCProxy adds serverAddr to the RPC Proxy's primary -// server list. -func (c *Client) AddPrimaryServerToRPCProxy(serverAddr string) *rpcproxy.ServerEndpoint { - return c.rpcProxy.AddPrimaryServer(serverAddr) +// GetServers returns the list of nomad servers this client is aware of. +func (c *Client) GetServers() []string { + endpoints := c.servers.all() + res := make([]string, len(endpoints)) + for i := range endpoints { + res[i] = endpoints[i].name + } + return res +} + +// SetServers sets a new list of nomad servers to connect to. As long as one +// server is resolvable no error is returned. 
+func (c *Client) SetServers(servers []string) error {
+	endpoints := make([]*endpoint, 0, len(servers))
+	var merr multierror.Error
+	for _, s := range servers {
+		addr, err := resolveServer(s)
+		if err != nil {
+			merr.Errors = append(merr.Errors, err)
+			continue
+		}
+
+		// Valid endpoint, append it without a priority as this API
+		// doesn't support different priorities for different servers
+		endpoints = append(endpoints, &endpoint{name: s, addr: addr})
+	}
+
+	// Only return errors if no servers are valid
+	if len(endpoints) == 0 {
+		if len(merr.Errors) > 0 {
+			return merr.ErrorOrNil()
+		}
+		return noServers
+	}
+
+	c.servers.set(endpoints)
+	return nil
 }
 
 // restoreState is used to restore our state from the data dir
@@ -868,14 +898,18 @@ func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, u
 // retryRegisterNode is used to register the node or update the registration and
 // retry in case of failure.
 func (c *Client) retryRegisterNode() {
-	// Register the client
 	for {
-		if err := c.registerNode(); err == nil {
-			break
+		err := c.registerNode()
+		if err == nil {
+			return
+		}
+		if err == noServers {
+			c.logger.Print("[DEBUG] client: registration waiting on servers")
 		} else {
-			c.logger.Printf("[ERR] client: %v", err)
+			c.logger.Printf("[ERR] client: registration failure: %v", err)
 		}
 		select {
+		case <-c.discovered:
 		case <-time.After(c.retryIntv(registerRetryIntv)):
 		case <-c.shutdownCh:
 			return
@@ -917,6 +951,13 @@ func (c *Client) registerNode() error {
 
 // updateNodeStatus is used to heartbeat and update the status of the node
 func (c *Client) updateNodeStatus() error {
+	c.heartbeatLock.Lock()
+	defer c.heartbeatLock.Unlock()
+
+	// If anything goes wrong we want consul discovery to happen ASAP. The
+	// heartbeat lock keeps it from running concurrently with node updates.
+	c.consulDiscoverNext = time.Time{}
+
 	node := c.Node()
 	req := structs.NodeUpdateStatusRequest{
 		NodeID: node.ID,
@@ -934,14 +975,29 @@ func (c *Client) updateNodeStatus() error {
 		c.logger.Printf("[DEBUG] client: state updated to %s", req.Status)
 	}
 
-	c.heartbeatLock.Lock()
-	defer c.heartbeatLock.Unlock()
+	// Update heartbeat time and ttl
 	c.lastHeartbeat = time.Now()
 	c.heartbeatTTL = resp.HeartbeatTTL
 
-	if err := c.rpcProxy.RefreshServerLists(resp.Servers, resp.NumNodes, resp.LeaderRPCAddr); err != nil {
-		return err
+	// Convert []*NodeServerInfo to endpoints
+	localdc := c.Datacenter()
+	servers := make(endpoints, 0, len(resp.Servers))
+	for _, s := range resp.Servers {
+		addr, err := resolveServer(s.RPCAdvertiseAddr)
+		if err != nil {
+			continue
+		}
+		e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
+		if s.Datacenter != localdc {
+			// server is non-local; de-prioritize
+			e.priority = 1
+		}
+		servers = append(servers, &e)
 	}
+	if len(servers) == 0 {
+		return fmt.Errorf("server returned no valid servers")
+	}
+	c.servers.set(servers)
 
 	// Begin polling Consul if there is no Nomad leader. We could be
 	// heartbeating to a Nomad server that is in the minority of a
 	// partition of the Nomad server quorum, but this Agent still
 	// has connectivity to the existing majority of Nomad Servers, but
 	// only if it queries Consul.
if resp.LeaderRPCAddr == "" { - atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 1, 0) return nil } const heartbeatFallbackFactor = 3 - atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 0, 1) - c.consulPullHeartbeatDeadline = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL) + c.consulDiscoverNext = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL) return nil } @@ -1079,9 +1133,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { resp = structs.NodeClientAllocsResponse{} err := c.RPC("Node.GetClientAllocs", &req, &resp) if err != nil { - c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) + if err != noServers { + c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) + } retry := c.retryIntv(getAllocRetryIntv) select { + case <-c.discovered: + continue case <-time.After(retry): continue case <-c.shutdownCh: @@ -1402,23 +1460,15 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli // TODO(sean@): this could eventually be moved to a priority queue and give // each task an interval, but that is not necessary at this time. func (c *Client) setupConsulSyncer() error { - // The bootstrapFn callback handler is used to periodically poll - // Consul to look up the Nomad Servers in Consul. In the event the - // heartbeat deadline has been exceeded and this Client is orphaned - // from its servers, periodically poll Consul to reattach this Client - // to its cluster and automatically recover from a detached state. - bootstrapFn := func() error { - now := time.Now() + disco := func() error { c.heartbeatLock.Lock() + defer c.heartbeatLock.Unlock() - // If the last heartbeat didn't contain a leader, give the - // Nomad server this Agent is talking to one more attempt at - // providing a heartbeat that does contain a leader. - if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) { - c.heartbeatLock.Unlock() + // Don't run before the deadline. When bootstrapping is done + // and heartbeats are working this is the common path. 
+ if time.Now().Before(c.consulDiscoverNext) { return nil } - c.heartbeatLock.Unlock() consulCatalog := c.consulSyncer.ConsulClient().Catalog() dcs, err := consulCatalog.Datacenters() @@ -1437,18 +1487,19 @@ func (c *Client) setupConsulSyncer() error { dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] } - // Forward RPCs to our region - nomadRPCArgs := structs.GenericRequest{ + // Query for servers in this client's region only + region := c.Region() + rpcargs := structs.GenericRequest{ QueryOptions: structs.QueryOptions{ - Region: c.Region(), + Region: region, }, } - nomadServerServiceName := c.config.ConsulConfig.ServerServiceName + serviceName := c.configCopy.ConsulConfig.ServerServiceName var mErr multierror.Error - const defaultMaxNumNomadServers = 8 - nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) + var servers endpoints c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs) + DISCOLOOP: for _, dc := range dcs { consulOpts := &consulapi.QueryOptions{ AllowStale: true, @@ -1456,79 +1507,76 @@ func (c *Client) setupConsulSyncer() error { Near: "_agent", WaitTime: consul.DefaultQueryWaitDuration, } - consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagRPC, consulOpts) + consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts) if err != nil { - mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err)) + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err)) continue } for _, s := range consulServices { - port := strconv.FormatInt(int64(s.ServicePort), 10) - addr := s.ServiceAddress - if addr == "" { - addr = s.Address + port := strconv.Itoa(s.ServicePort) + addrstr := s.ServiceAddress + if addrstr == "" { + addrstr = s.Address } - serverAddr := net.JoinHostPort(addr, port) - serverEndpoint, err := rpcproxy.NewServerEndpoint(serverAddr) + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port)) if err != nil { mErr.Errors = append(mErr.Errors, err) continue } var peers []string - if err := c.connPool.RPC(c.Region(), serverEndpoint.Addr, c.RPCMajorVersion(), "Status.Peers", nomadRPCArgs, &peers); err != nil { + if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil { mErr.Errors = append(mErr.Errors, err) continue } + // Successfully received the Server peers list of the correct // region - if len(peers) != 0 { - nomadServerServices = append(nomadServerServices, peers...) 
-					break
+				for _, p := range peers {
+					addr, err := net.ResolveTCPAddr("tcp", p)
+					if err != nil {
+						mErr.Errors = append(mErr.Errors, err)
+					}
+					servers = append(servers, &endpoint{name: p, addr: addr})
+				}
+				if len(servers) > 0 {
+					break DISCOLOOP
+				}
-				}
-			// Break if at least one Nomad Server was successfully pinged
-			if len(nomadServerServices) > 0 {
-				break
-			}
 		}
 
-		if len(nomadServerServices) == 0 {
+		if len(servers) == 0 {
 			if len(mErr.Errors) > 0 {
 				return mErr.ErrorOrNil()
 			}
-
-			return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %q", nomadServerServiceName, dcs)
+			return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
 		}
 
-		// Log the servers we are adding
-		c.logger.Printf("[INFO] client.consul: bootstrap adding following Servers: %q", nomadServerServices)
+		c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers)
+		c.servers.set(servers)
 
-		c.heartbeatLock.Lock()
-		if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
-			c.heartbeatLock.Unlock()
-			// Common, healthy path
-			if err := c.rpcProxy.SetBackupServers(nomadServerServices); err != nil {
-				return fmt.Errorf("client.consul: unable to set backup servers: %v", err)
-			}
-		} else {
-			c.heartbeatLock.Unlock()
-			// If this Client is talking with a Server that
-			// doesn't have a leader, and we have exceeded the
-			// consulPullHeartbeatDeadline, change the call from
-			// SetBackupServers() to calling AddPrimaryServer()
-			// in order to allow the Clients to randomly begin
-			// considering all known Nomad servers and
-			// eventually, hopefully, find their way to a Nomad
-			// Server that has quorum (assuming Consul has a
-			// server list that is in the majority).
-			for _, s := range nomadServerServices {
-				c.rpcProxy.AddPrimaryServer(s)
+		// Make sure registration is given ample opportunity to run
+		// before trying consul discovery again.
+		c.consulDiscoverNext = time.Now().Add(2 * registerRetryIntv)
+
+		// Notify waiting rpc calls. Wait briefly in case initial rpc
+		// just failed but the calling goroutine isn't selecting on
+		// discovered yet.
+		const dur = 50 * time.Millisecond
+		timeout := time.NewTimer(dur)
+		for {
+			select {
+			case c.discovered <- struct{}{}:
+				if !timeout.Stop() {
+					<-timeout.C
+				}
+				timeout.Reset(dur)
+			case <-timeout.C:
+				return nil
+			}
 		}
-
-		return nil
 	}
 
-	if c.config.ConsulConfig.ClientAutoJoin {
-		c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn)
+	if c.configCopy.ConsulConfig.ClientAutoJoin {
+		c.consulSyncer.AddPeriodicHandler("Nomad Client Consul Server Discovery", disco)
 	}
 
 	consulServicesReaperFn := func() error {
@@ -1617,7 +1665,17 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
 	}
 }
 
-// RPCProxy returns the Client's RPCProxy instance
-func (c *Client) RPCProxy() *rpcproxy.RPCProxy {
-	return c.rpcProxy
+// resolveServer takes a server's address as a string and returns its resolved
+// net.Addr or an error.
+func resolveServer(s string) (net.Addr, error) { + const defaultClientPort = "4647" // default client RPC port + host, port, err := net.SplitHostPort(s) + if err != nil { + if strings.Contains(err.Error(), "missing port") { + port = defaultClientPort + } else { + return nil, err + } + } + return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)) } diff --git a/client/rpcproxy/rpcproxy.go b/client/rpcproxy/rpcproxy.go deleted file mode 100644 index 0e8c7604127..00000000000 --- a/client/rpcproxy/rpcproxy.go +++ /dev/null @@ -1,779 +0,0 @@ -// Package rpcproxy provides a proxy interface to Nomad Servers. The -// RPCProxy periodically shuffles which server a Nomad Client communicates -// with in order to redistribute load across Nomad Servers. Nomad Servers -// that fail an RPC request are automatically cycled to the end of the list -// until the server list is reshuffled. -// -// The rpcproxy package does not provide any external API guarantees and -// should be called only by `hashicorp/nomad`. -package rpcproxy - -import ( - "fmt" - "log" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/nomad/nomad/structs" -) - -const ( - // clientRPCJitterFraction determines the amount of jitter added to - // clientRPCMinReuseDuration before a connection is expired and a new - // connection is established in order to rebalance load across Nomad - // servers. The cluster-wide number of connections per second from - // rebalancing is applied after this jitter to ensure the CPU impact - // is always finite. See newRebalanceConnsPerSecPerServer's comment - // for additional commentary. - // - // For example, in a 10K Nomad cluster with 5x servers, this default - // averages out to ~13 new connections from rebalancing per server - // per second. - clientRPCJitterFraction = 2 - - // clientRPCMinReuseDuration controls the minimum amount of time RPC - // queries are sent over an established connection to a single server - clientRPCMinReuseDuration = 600 * time.Second - - // Limit the number of new connections a server receives per second - // for connection rebalancing. This limit caps the load caused by - // continual rebalancing efforts when a cluster is in equilibrium. A - // lower value comes at the cost of increased recovery time after a - // partition. This parameter begins to take effect when there are - // more than ~48K clients querying 5x servers or at lower server - // counts when there is a partition. - // - // For example, in a 100K Nomad cluster with 5x servers, it will take - // ~5min for all servers to rebalance their connections. If 99,995 - // agents are in the minority talking to only one server, it will - // take ~26min for all servers to rebalance. A 10K cluster in the - // same scenario will take ~2.6min to rebalance. - newRebalanceConnsPerSecPerServer = 64 - - // rpcAPIMismatchLogRate determines the rate at which log entries are - // emitted when the client and server's API versions are mismatched. - rpcAPIMismatchLogRate = 3 * time.Hour -) - -// NomadConfigInfo is an interface wrapper around this Nomad Agent's -// configuration to prevents a cyclic import dependency. 
-type NomadConfigInfo interface { - Datacenter() string - RPCMajorVersion() int - RPCMinorVersion() int - Region() string -} - -// Pinger is an interface wrapping client.ConnPool to prevent a -// cyclic import dependency -type Pinger interface { - PingNomadServer(region string, apiMajorVersion int, s *ServerEndpoint) (bool, error) -} - -// serverList is an array of Nomad Servers. The first server in the list is -// the active server. -// -// NOTE(sean@): We are explicitly relying on the fact that serverList will be -// copied onto the stack by atomic.Value. Please keep this structure light. -type serverList struct { - L []*ServerEndpoint -} - -// RPCProxy is the manager type responsible for returning and managing Nomad -// addresses. -type RPCProxy struct { - // activatedList manages the list of Nomad Servers that are eligible - // to be queried by the Client agent. - activatedList atomic.Value - activatedListLock sync.Mutex - - // primaryServers is a list of servers found in the last heartbeat. - // primaryServers are periodically reshuffled. Covered by - // serverListLock. - primaryServers serverList - - // backupServers is a list of fallback servers. These servers are - // appended to the RPCProxy's serverList, but are never shuffled with - // the list of servers discovered via the Nomad heartbeat. Covered - // by serverListLock. - backupServers serverList - - // serverListLock covers both backupServers and primaryServers. If - // it is necessary to hold serverListLock and listLock, obtain an - // exclusive lock on serverListLock before listLock. - serverListLock sync.RWMutex - - leaderAddr string - numNodes int - - // rebalanceTimer controls the duration of the rebalance interval - rebalanceTimer *time.Timer - - // shutdownCh is a copy of the channel in nomad.Client - shutdownCh chan struct{} - - logger *log.Logger - - configInfo NomadConfigInfo - - // rpcAPIMismatchThrottle regulates the rate at which warning - // messages are emitted in the event of an API mismatch between the - // clients and servers. - rpcAPIMismatchThrottle map[string]time.Time - - // connPoolPinger is used to test the health of a server in the - // connection pool. Pinger is an interface that wraps - // client.ConnPool. - connPoolPinger Pinger -} - -// NewRPCProxy is the only way to safely create a new RPCProxy. -func NewRPCProxy(logger *log.Logger, shutdownCh chan struct{}, configInfo NomadConfigInfo, connPoolPinger Pinger) *RPCProxy { - p := &RPCProxy{ - logger: logger, - configInfo: configInfo, // can't pass *nomad.Client: import cycle - connPoolPinger: connPoolPinger, // can't pass *nomad.ConnPool: import cycle - rebalanceTimer: time.NewTimer(clientRPCMinReuseDuration), - shutdownCh: shutdownCh, - } - - l := serverList{} - l.L = make([]*ServerEndpoint, 0) - p.saveServerList(l) - return p -} - -// activateEndpoint adds an endpoint to the RPCProxy's active serverList. -// Returns true if the server was added, returns false if the server already -// existed in the RPCProxy's serverList. -func (p *RPCProxy) activateEndpoint(s *ServerEndpoint) bool { - l := p.getServerList() - - // Check if this server is known - found := false - for idx, existing := range l.L { - if existing.Name == s.Name { - newServers := make([]*ServerEndpoint, len(l.L)) - copy(newServers, l.L) - - // Overwrite the existing server details in order to - // possibly update metadata (e.g. 
server version) - newServers[idx] = s - - l.L = newServers - found = true - break - } - } - - // Add to the list if not known - if !found { - newServers := make([]*ServerEndpoint, len(l.L), len(l.L)+1) - copy(newServers, l.L) - newServers = append(newServers, s) - l.L = newServers - } - - p.saveServerList(l) - - return !found -} - -// SetBackupServers sets a list of Nomad Servers to be used in the event that -// the Nomad Agent lost contact with the list of Nomad Servers provided via -// the Nomad Agent's heartbeat. If available, the backup servers are -// populated via Consul. -func (p *RPCProxy) SetBackupServers(addrs []string) error { - l := make([]*ServerEndpoint, 0, len(addrs)) - for _, s := range addrs { - s, err := NewServerEndpoint(s) - if err != nil { - p.logger.Printf("[WARN] client.rpcproxy: unable to create backup server %+q: %v", s, err) - return fmt.Errorf("unable to create new backup server from %+q: %v", s, err) - } - l = append(l, s) - } - - p.serverListLock.Lock() - p.backupServers.L = l - p.serverListLock.Unlock() - - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - for _, s := range l { - p.activateEndpoint(s) - } - - return nil -} - -// AddPrimaryServer takes the RPC address of a Nomad server, creates a new -// endpoint, and adds it to both the primaryServers list and the active -// serverList used in the RPC Proxy. If the endpoint is not known by the -// RPCProxy, appends the endpoint to the list. The new endpoint will begin -// seeing use after the rebalance timer fires (or enough servers fail -// organically). Any values in the primary server list are overridden by the -// next successful heartbeat. -func (p *RPCProxy) AddPrimaryServer(rpcAddr string) *ServerEndpoint { - s, err := NewServerEndpoint(rpcAddr) - if err != nil { - p.logger.Printf("[WARN] client.rpcproxy: unable to create new primary server from endpoint %+q: %v", rpcAddr, err) - return nil - } - - k := s.Key() - p.serverListLock.Lock() - if serverExists := p.primaryServers.serverExistByKey(k); serverExists { - p.serverListLock.Unlock() - return s - } - p.primaryServers.L = append(p.primaryServers.L, s) - p.serverListLock.Unlock() - - p.activatedListLock.Lock() - p.activateEndpoint(s) - p.activatedListLock.Unlock() - - return s -} - -// cycleServers returns a new list of servers that has dequeued the first -// server and enqueued it at the end of the list. cycleServers assumes the -// caller is holding the listLock. cycleServer does not test or ping -// the next server inline. cycleServer may be called when the environment -// has just entered an unhealthy situation and blocking on a server test is -// less desirable than just returning the next server in the firing line. If -// the next server fails, it will fail fast enough and cycleServer will be -// called again. -func (l *serverList) cycleServer() (servers []*ServerEndpoint) { - numServers := len(l.L) - if numServers < 2 { - return servers // No action required - } - - newServers := make([]*ServerEndpoint, 0, numServers) - newServers = append(newServers, l.L[1:]...) - newServers = append(newServers, l.L[0]) - - return newServers -} - -// serverExistByKey performs a search to see if a server exists in the -// serverList. Assumes the caller is holding at least a read lock. 
-func (l *serverList) serverExistByKey(targetKey *EndpointKey) bool { - var found bool - for _, server := range l.L { - if targetKey.Equal(server.Key()) { - found = true - } - } - return found -} - -// removeServerByKey performs an inline removal of the first matching server -func (l *serverList) removeServerByKey(targetKey *EndpointKey) { - for i, s := range l.L { - if targetKey.Equal(s.Key()) { - copy(l.L[i:], l.L[i+1:]) - l.L[len(l.L)-1] = nil - l.L = l.L[:len(l.L)-1] - return - } - } -} - -// shuffleServers shuffles the server list in place -func (l *serverList) shuffleServers() { - for i := len(l.L) - 1; i > 0; i-- { - j := rand.Int31n(int32(i + 1)) - l.L[i], l.L[j] = l.L[j], l.L[i] - } -} - -// String returns a string representation of serverList -func (l *serverList) String() string { - if len(l.L) == 0 { - return fmt.Sprintf("empty server list") - } - - serverStrs := make([]string, 0, len(l.L)) - for _, server := range l.L { - serverStrs = append(serverStrs, server.String()) - } - - return fmt.Sprintf("[%s]", strings.Join(serverStrs, ", ")) -} - -// FindServer takes out an internal "read lock" and searches through the list -// of servers to find a "healthy" server. If the server is actually -// unhealthy, we rely on heartbeats to detect this and remove the node from -// the server list. If the server at the front of the list has failed or -// fails during an RPC call, it is rotated to the end of the list. If there -// are no servers available, return nil. -func (p *RPCProxy) FindServer() *ServerEndpoint { - l := p.getServerList() - numServers := len(l.L) - if numServers == 0 { - p.logger.Printf("[WARN] client.rpcproxy: No servers available") - return nil - } - - // Return whatever is at the front of the list because it is - // assumed to be the oldest in the server list (unless - - // hypothetically - the server list was rotated right after a - // server was added). - return l.L[0] -} - -// getServerList is a convenience method which hides the locking semantics -// of atomic.Value from the caller. -func (p *RPCProxy) getServerList() serverList { - return p.activatedList.Load().(serverList) -} - -// saveServerList is a convenience method which hides the locking semantics -// of atomic.Value from the caller. -func (p *RPCProxy) saveServerList(l serverList) { - p.activatedList.Store(l) -} - -// LeaderAddr returns the current leader address. If an empty string, then -// the Nomad Server for this Nomad Agent is in the minority or the Nomad -// Servers are in the middle of an election. -func (p *RPCProxy) LeaderAddr() string { - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - return p.leaderAddr -} - -// NotifyFailedServer marks the passed in server as "failed" by rotating it -// to the end of the server list. -func (p *RPCProxy) NotifyFailedServer(s *ServerEndpoint) { - l := p.getServerList() - - // If the server being failed is not the first server on the list, - // this is a noop. If, however, the server is failed and first on - // the list, acquire the lock, retest, and take the penalty of moving - // the server to the end of the list. - - // Only rotate the server list when there is more than one server - if len(l.L) > 1 && l.L[0] == s { - // Grab a lock, retest, and take the hit of cycling the first - // server to the end. 
- p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - l = p.getServerList() - - if len(l.L) > 1 && l.L[0] == s { - l.L = l.cycleServer() - p.saveServerList(l) - } - } -} - -// NumNodes returns the estimated number of nodes according to the last Nomad -// Heartbeat. -func (p *RPCProxy) NumNodes() int { - return p.numNodes -} - -// NumServers takes out an internal "read lock" and returns the number of -// servers. numServers includes both healthy and unhealthy servers. -func (p *RPCProxy) NumServers() int { - l := p.getServerList() - return len(l.L) -} - -// RebalanceServers shuffles the list of servers on this agent. The server -// at the front of the list is selected for the next RPC. RPC calls that -// fail for a particular server are rotated to the end of the list. This -// method reshuffles the list periodically in order to redistribute work -// across all known Nomad servers (i.e. guarantee that the order of servers -// in the server list is not positively correlated with the age of a server -// in the Nomad cluster). Periodically shuffling the server list prevents -// long-lived clients from fixating on long-lived servers. -// -// Unhealthy servers are removed from the server list during the next client -// heartbeat. Before the newly shuffled server list is saved, the new remote -// endpoint is tested to ensure its responsive. -func (p *RPCProxy) RebalanceServers() { - var serverListLocked bool - p.serverListLock.Lock() - serverListLocked = true - defer func() { - if serverListLocked { - p.serverListLock.Unlock() - } - }() - - // Early abort if there is nothing to shuffle - if (len(p.primaryServers.L) + len(p.backupServers.L)) < 2 { - return - } - - // Shuffle server lists independently - p.primaryServers.shuffleServers() - p.backupServers.shuffleServers() - - // Create a new merged serverList - type targetServer struct { - server *ServerEndpoint - // 'p' == Primary Server - // 's' == Secondary/Backup Server - // 'b' == Both - state byte - } - mergedList := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(p.backupServers.L)) - for _, s := range p.primaryServers.L { - mergedList[*s.Key()] = &targetServer{server: s, state: 'p'} - } - for _, s := range p.backupServers.L { - k := s.Key() - _, found := mergedList[*k] - if found { - mergedList[*k].state = 'b' - } else { - mergedList[*k] = &targetServer{server: s, state: 's'} - } - } - - l := &serverList{L: make([]*ServerEndpoint, 0, len(mergedList))} - for _, s := range p.primaryServers.L { - l.L = append(l.L, s) - } - for _, v := range mergedList { - if v.state != 's' { - continue - } - l.L = append(l.L, v.server) - } - - // Release the lock before we begin transition to operations on the - // network timescale and attempt to ping servers. A copy of the - // servers has been made at this point. - p.serverListLock.Unlock() - serverListLocked = false - - // Iterate through the shuffled server list to find an assumed - // healthy server. NOTE: Do not iterate on the list directly because - // this loop mutates the server list in-place. - var foundHealthyServer bool - for i := 0; i < len(l.L); i++ { - // Always test the first server. Failed servers are cycled - // and eventually removed from the list when Nomad heartbeats - // detect the failed node. 
- selectedServer := l.L[0] - - ok, err := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCMajorVersion(), selectedServer) - if ok { - foundHealthyServer = true - break - } - p.logger.Printf(`[DEBUG] client.rpcproxy: pinging server "%s" failed: %s`, selectedServer.String(), err) - - l.cycleServer() - } - - // If no healthy servers were found, sleep and wait for the admin to - // join this node to a server and begin receiving heartbeats with an - // updated list of Nomad servers. Or Consul will begin advertising a - // new server in the nomad service (Nomad server service). - if !foundHealthyServer { - p.logger.Printf("[DEBUG] client.rpcproxy: No healthy servers during rebalance, aborting") - return - } - - // Verify that all servers are present. Reconcile will save the - // final serverList. - if p.reconcileServerList(l) { - p.logger.Printf("[TRACE] client.rpcproxy: Rebalanced %d servers, next active server is %s", len(l.L), l.L[0].String()) - } else { - // reconcileServerList failed because Nomad removed the - // server that was at the front of the list that had - // successfully been Ping'ed. Between the Ping and - // reconcile, a Nomad heartbeat removed the node. - // - // Instead of doing any heroics, "freeze in place" and - // continue to use the existing connection until the next - // rebalance occurs. - } - - return -} - -// reconcileServerList returns true when the first server in serverList -// (l) exists in the receiver's serverList (p). If true, the merged -// serverList (l) is stored as the receiver's serverList (p). Returns -// false if the first server in p does not exist in the passed in list (l) -// (i.e. was removed by Nomad during a PingNomadServer() call. Newly added -// servers are appended to the list and other missing servers are removed -// from the list. -func (p *RPCProxy) reconcileServerList(l *serverList) bool { - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - - // newServerList is a serverList that has been kept up-to-date with - // join and leave events. - newServerList := p.getServerList() - - // If a Nomad heartbeat removed all nodes, or there is no selected - // server (zero nodes in serverList), abort early. - if len(newServerList.L) == 0 || len(l.L) == 0 { - return false - } - - type targetServer struct { - server *ServerEndpoint - - // 'b' == both - // 'o' == original - // 'n' == new - state byte - } - mergedList := make(map[EndpointKey]*targetServer, len(l.L)) - for _, s := range l.L { - mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} - } - for _, s := range newServerList.L { - k := s.Key() - _, found := mergedList[*k] - if found { - mergedList[*k].state = 'b' - } else { - mergedList[*k] = &targetServer{server: s, state: 'n'} - } - } - - // Ensure the selected server has not been removed by a heartbeat - selectedServerKey := l.L[0].Key() - if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' { - return false - } - - // Append any new servers and remove any old servers - for k, v := range mergedList { - switch v.state { - case 'b': - // Do nothing, server exists in both - case 'o': - // Server has been removed - l.removeServerByKey(&k) - case 'n': - // Server added - l.L = append(l.L, v.server) - default: - panic("unknown merge list state") - } - } - - p.saveServerList(*l) - return true -} - -// RemoveServer takes out an internal write lock and removes a server from -// the activated server list. 
-func (p *RPCProxy) RemoveServer(s *ServerEndpoint) { - // Lock hierarchy protocol dictates serverListLock is acquired first. - p.serverListLock.Lock() - defer p.serverListLock.Unlock() - - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - l := p.getServerList() - - k := s.Key() - l.removeServerByKey(k) - p.saveServerList(l) - - p.primaryServers.removeServerByKey(k) - p.backupServers.removeServerByKey(k) -} - -// refreshServerRebalanceTimer is only called once p.rebalanceTimer expires. -func (p *RPCProxy) refreshServerRebalanceTimer() time.Duration { - l := p.getServerList() - numServers := len(l.L) - // Limit this connection's life based on the size (and health) of the - // cluster. Never rebalance a connection more frequently than - // connReuseLowWatermarkDuration, and make sure we never exceed - // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. - clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer) - connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) - numLANMembers := p.numNodes - connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) - - p.rebalanceTimer.Reset(connRebalanceTimeout) - return connRebalanceTimeout -} - -// ResetRebalanceTimer resets the rebalance timer. This method exists for -// testing and should not be used directly. -func (p *RPCProxy) ResetRebalanceTimer() { - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - p.rebalanceTimer.Reset(clientRPCMinReuseDuration) -} - -// ServerRPCAddrs returns one RPC Address per server -func (p *RPCProxy) ServerRPCAddrs() []string { - l := p.getServerList() - serverAddrs := make([]string, 0, len(l.L)) - for _, s := range l.L { - serverAddrs = append(serverAddrs, s.Addr.String()) - } - return serverAddrs -} - -// Run is used to start and manage the task of automatically shuffling and -// rebalancing the list of Nomad servers. This maintenance only happens -// periodically based on the expiration of the timer. Failed servers are -// automatically cycled to the end of the list. New servers are appended to -// the list. The order of the server list must be shuffled periodically to -// distribute load across all known and available Nomad servers. -func (p *RPCProxy) Run() { - for { - select { - case <-p.rebalanceTimer.C: - p.RebalanceServers() - - p.refreshServerRebalanceTimer() - case <-p.shutdownCh: - p.logger.Printf("[INFO] client.rpcproxy: shutting down") - return - } - } -} - -// RefreshServerLists is called when the Client receives an update from a -// Nomad Server. The response from Nomad Client Heartbeats contain a list of -// Nomad Servers that the Nomad Client should use for RPC requests. -// RefreshServerLists does not rebalance its serverLists (that is handled -// elsewhere via a periodic timer). New Nomad Servers learned via the -// heartbeat are appended to the RPCProxy's activated serverList. Servers -// that are no longer present in the Heartbeat are removed immediately from -// all server lists. Nomad Servers speaking a newer major or minor API -// version are filtered from the serverList. -func (p *RPCProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNodes int32, leaderRPCAddr string) error { - // Merge all servers found in the response. Servers in the response - // with newer API versions are filtered from the list. 
If the list - // is missing an address found in the RPCProxy's server list, remove - // it from the RPCProxy. - - p.serverListLock.Lock() - defer p.serverListLock.Unlock() - - // Clear the backup server list when a heartbeat contains at least - // one server. - if len(servers) > 0 && len(p.backupServers.L) > 0 { - p.backupServers.L = make([]*ServerEndpoint, 0, len(servers)) - } - - // 1) Create a map to reconcile the difference between - // p.primaryServers and servers. - type targetServer struct { - server *ServerEndpoint - - // 'b' == both - // 'o' == original - // 'n' == new - state byte - } - mergedPrimaryMap := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(servers)) - numOldServers := 0 - for _, s := range p.primaryServers.L { - mergedPrimaryMap[*s.Key()] = &targetServer{server: s, state: 'o'} - numOldServers++ - } - numBothServers := 0 - var newServers bool - for _, s := range servers { - // Filter out servers using a newer API version. Prevent - // spamming the logs every heartbeat. - // - // TODO(sean@): Move the logging throttle logic into a - // dedicated logging package so RPCProxy does not have to - // perform this accounting. - if int32(p.configInfo.RPCMajorVersion()) < s.RPCMajorVersion || - (int32(p.configInfo.RPCMajorVersion()) == s.RPCMajorVersion && - int32(p.configInfo.RPCMinorVersion()) < s.RPCMinorVersion) { - now := time.Now() - t, ok := p.rpcAPIMismatchThrottle[s.RPCAdvertiseAddr] - if ok && t.After(now) { - continue - } - - p.logger.Printf("[WARN] client.rpcproxy: API mismatch between client version (v%d.%d) and server version (v%d.%d), ignoring server %+q", p.configInfo.RPCMajorVersion(), p.configInfo.RPCMinorVersion(), s.RPCMajorVersion, s.RPCMinorVersion, s.RPCAdvertiseAddr) - p.rpcAPIMismatchThrottle[s.RPCAdvertiseAddr] = now.Add(rpcAPIMismatchLogRate) - continue - } - - server, err := NewServerEndpoint(s.RPCAdvertiseAddr) - if err != nil { - p.logger.Printf("[WARN] client.rpcproxy: Unable to create a server from %+q: %v", s.RPCAdvertiseAddr, err) - continue - } - - // Nomad servers in different datacenters are automatically - // added to the backup server list. - if s.Datacenter != p.configInfo.Datacenter() { - p.backupServers.L = append(p.backupServers.L, server) - continue - } - - k := server.Key() - _, found := mergedPrimaryMap[*k] - if found { - mergedPrimaryMap[*k].state = 'b' - numBothServers++ - } else { - mergedPrimaryMap[*k] = &targetServer{server: server, state: 'n'} - newServers = true - } - } - - // Short-circuit acquiring listLock if nothing changed - if !newServers && numOldServers == numBothServers { - return nil - } - - p.activatedListLock.Lock() - defer p.activatedListLock.Unlock() - newServerCfg := p.getServerList() - for k, v := range mergedPrimaryMap { - switch v.state { - case 'b': - // Do nothing, server exists in both - case 'o': - // Server has been removed - - // TODO(sean@): Teach Nomad servers how to remove - // themselves from their heartbeat in order to - // gracefully drain their clients over the next - // cluster's max rebalanceTimer duration. Without - // this enhancement, if a server being shutdown and - // it is the first in serverList, the client will - // fail its next RPC connection. - p.primaryServers.removeServerByKey(&k) - newServerCfg.removeServerByKey(&k) - case 'n': - // Server added. Append it to both lists - // immediately. The server should only go into - // active use in the event of a failure or after a - // rebalance occurs. 
- p.primaryServers.L = append(p.primaryServers.L, v.server) - newServerCfg.L = append(newServerCfg.L, v.server) - default: - panic("unknown merge list state") - } - } - - p.numNodes = int(numNodes) - p.leaderAddr = leaderRPCAddr - p.saveServerList(newServerCfg) - - return nil -} diff --git a/client/rpcproxy/rpcproxy_test.go b/client/rpcproxy/rpcproxy_test.go deleted file mode 100644 index c6b7327e62a..00000000000 --- a/client/rpcproxy/rpcproxy_test.go +++ /dev/null @@ -1,818 +0,0 @@ -package rpcproxy - -import ( - "bytes" - "encoding/binary" - "fmt" - "log" - "math/rand" - "net" - "os" - "strings" - "sync/atomic" - "testing" - "time" -) - -const ( - ipv4len = 4 - nodeNameFmt = "s%03d" - defaultNomadPort = "4647" - - // Poached from RFC2544 and RFC3330 - testingNetworkCidr = "198.18.0.0/15" - testingNetworkUint32 = 3323068416 -) - -var ( - localLogger *log.Logger - localLogBuffer *bytes.Buffer - serverCount uint32 - validIp uint32 -) - -func init() { - localLogBuffer = new(bytes.Buffer) - localLogger = log.New(localLogBuffer, "", 0) -} - -func makeServerEndpointName() string { - serverNum := atomic.AddUint32(&serverCount, 1) - validIp := testingNetworkUint32 + serverNum - ipv4 := make(net.IP, ipv4len) - binary.BigEndian.PutUint32(ipv4, validIp) - return net.JoinHostPort(ipv4.String(), defaultNomadPort) -} - -func GetBufferedLogger() *log.Logger { - return localLogger -} - -type fauxConnPool struct { - // failPct between 0.0 and 1.0 == pct of time a Ping should fail - failPct float64 -} - -func (cp *fauxConnPool) PingNomadServer(region string, majorVersion int, s *ServerEndpoint) (bool, error) { - var success bool - successProb := rand.Float64() - if successProb > cp.failPct { - success = true - } - return success, nil -} - -type fauxSerf struct { - datacenter string - numNodes int - region string - rpcMinorVersion int - rpcMajorVersion int -} - -func (s *fauxSerf) NumNodes() int { - return s.numNodes -} - -func (s *fauxSerf) Region() string { - return s.region -} - -func (s *fauxSerf) Datacenter() string { - return s.datacenter -} - -func (s *fauxSerf) RPCMajorVersion() int { - return s.rpcMajorVersion -} - -func (s *fauxSerf) RPCMinorVersion() int { - return s.rpcMinorVersion -} - -func testRPCProxy() (p *RPCProxy) { - logger := GetBufferedLogger() - logger = log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - p = NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) - return p -} - -func testRPCProxyFailProb(failPct float64) (p *RPCProxy) { - logger := GetBufferedLogger() - logger = log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - p = NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) - return p -} - -// func (p *RPCProxy) AddPrimaryServer(server *ServerEndpoint) { -func TestRPCProxy_AddPrimaryServer(t *testing.T) { - p := testRPCProxy() - var num int - num = p.NumServers() - if num != 0 { - t.Fatalf("Expected zero servers to start") - } - - s1Endpoint := makeServerEndpointName() - s1 := p.AddPrimaryServer(s1Endpoint) - num = p.NumServers() - if num != 1 { - t.Fatalf("Expected one server") - } - if s1 == nil { - t.Fatalf("bad") - } - if s1.Name != s1Endpoint { - t.Fatalf("bad") - } - - s1 = p.AddPrimaryServer(s1Endpoint) - num = p.NumServers() - if num != 1 { - t.Fatalf("Expected one server (still)") - } - if s1 == nil { - t.Fatalf("bad") - } - if s1.Name != s1Endpoint { - t.Fatalf("bad") - } - - s2Endpoint := makeServerEndpointName() - s2 := p.AddPrimaryServer(s2Endpoint) - num = 
p.NumServers() - if num != 2 { - t.Fatalf("Expected two servers") - } - if s2 == nil { - t.Fatalf("bad") - } - if s2.Name != s2Endpoint { - t.Fatalf("bad") - } -} - -// func (p *RPCProxy) FindServer() (server *ServerEndpoint) { -func TestRPCProxy_FindServer(t *testing.T) { - p := testRPCProxy() - - if p.FindServer() != nil { - t.Fatalf("Expected nil return") - } - - s1Endpoint := makeServerEndpointName() - p.AddPrimaryServer(s1Endpoint) - if p.NumServers() != 1 { - t.Fatalf("Expected one server") - } - - s1 := p.FindServer() - if s1 == nil { - t.Fatalf("Expected non-nil server") - } - if s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } - - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server (still)") - } - - s2Endpoint := makeServerEndpointName() - p.AddPrimaryServer(s2Endpoint) - if p.NumServers() != 2 { - t.Fatalf("Expected two servers") - } - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server (still)") - } - - p.NotifyFailedServer(s1) - s2 := p.FindServer() - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server") - } - - p.NotifyFailedServer(s2) - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } -} - -// func New(logger *log.Logger, shutdownCh chan struct{}) (p *RPCProxy) { -func TestRPCProxy_New(t *testing.T) { - logger := GetBufferedLogger() - logger = log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - p := NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) - if p == nil { - t.Fatalf("RPCProxy nil") - } -} - -// func (p *RPCProxy) NotifyFailedServer(server *ServerEndpoint) { -func TestRPCProxy_NotifyFailedServer(t *testing.T) { - p := testRPCProxy() - - if p.NumServers() != 0 { - t.Fatalf("Expected zero servers to start") - } - - // Try notifying for a server that is not managed by RPCProxy - s1Endpoint := makeServerEndpointName() - s1 := p.AddPrimaryServer(s1Endpoint) - if s1 == nil { - t.Fatalf("bad") - } - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.RemoveServer(s1) - if p.NumServers() != 0 { - t.Fatalf("bad") - } - p.NotifyFailedServer(s1) - s1 = p.AddPrimaryServer(s1Endpoint) - - // Test again w/ a server not in the list - s2Endpoint := makeServerEndpointName() - s2 := p.AddPrimaryServer(s2Endpoint) - if s2 == nil { - t.Fatalf("bad") - } - if p.NumServers() != 2 { - t.Fatalf("bad") - } - p.RemoveServer(s2) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.NotifyFailedServer(s2) - if p.NumServers() != 1 { - t.Fatalf("Expected one server") - } - - // Re-add s2 so there are two servers in the RPCProxy server list - s2 = p.AddPrimaryServer(s2Endpoint) - if p.NumServers() != 2 { - t.Fatalf("Expected two servers") - } - - // Find the first server, it should be s1 - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } - - // Notify s2 as failed, s1 should still be first - p.NotifyFailedServer(s2) - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server (still)") - } - - // Fail s1, s2 should be first - p.NotifyFailedServer(s1) - s2 = p.FindServer() - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server") - } - - // Fail s2, s1 should be first - p.NotifyFailedServer(s2) - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server") - } -} - -// func (p *RPCProxy) NumServers() (numServers int) { -func TestRPCProxy_NumServers(t *testing.T) { - p := 
testRPCProxy() - const maxNumServers = 100 - serverList := make([]*ServerEndpoint, 0, maxNumServers) - - // Add some servers - for i := 0; i < maxNumServers; i++ { - num := p.NumServers() - if num != i { - t.Fatalf("%d: Expected %d servers", i, num) - } - serverName := makeServerEndpointName() - s := p.AddPrimaryServer(serverName) - if s == nil { - t.Fatalf("Expected server from %+q", serverName) - } - serverList = append(serverList, s) - - num = p.NumServers() - if num != i+1 { - t.Fatalf("%d: Expected %d servers", i, num+1) - } - } - - // Remove some servers - for i := maxNumServers; i > 0; i-- { - num := p.NumServers() - if num != i { - t.Fatalf("%d: Expected %d servers", i, num) - } - p.RemoveServer(serverList[i-1]) - num = p.NumServers() - if num != i-1 { - t.Fatalf("%d: Expected %d servers", i, num-1) - } - } -} - -// func (p *RPCProxy) RebalanceServers() { -func TestRPCProxy_RebalanceServers(t *testing.T) { - const failPct = 0.5 - p := testRPCProxyFailProb(failPct) - const maxServers = 100 - const numShuffleTests = 100 - const uniquePassRate = 0.5 - - // Make a huge list of nodes. - for i := 0; i < maxServers; i++ { - p.AddPrimaryServer(makeServerEndpointName()) - } - - // Keep track of how many unique shuffles we get. - uniques := make(map[string]struct{}, maxServers) - for i := 0; i < numShuffleTests; i++ { - p.RebalanceServers() - - var names []string - for j := 0; j < maxServers; j++ { - server := p.FindServer() - p.NotifyFailedServer(server) - names = append(names, server.Name) - } - key := strings.Join(names, "|") - uniques[key] = struct{}{} - } - - // We have to allow for the fact that there won't always be a unique - // shuffle each pass, so we just look for smell here without the test - // being flaky. - if len(uniques) < int(maxServers*uniquePassRate) { - t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), maxServers) - } -} - -// func (p *RPCProxy) RemoveServer(server *ServerEndpoint) { -func TestRPCProxy_RemoveServer(t *testing.T) { - p := testRPCProxy() - if p.NumServers() != 0 { - t.Fatalf("Expected zero servers to start") - } - - // Test removing server before its added - s1Endpoint := makeServerEndpointName() - s1 := p.AddPrimaryServer(s1Endpoint) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server: %+q", s1.Name) - } - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 server: %+q", s1.Name) - } - p.RemoveServer(s1) - if p.NumServers() != 0 { - t.Fatalf("bad") - } - // Remove it a second time now that it doesn't exist - p.RemoveServer(s1) - if p.NumServers() != 0 { - t.Fatalf("bad") - } - p.AddPrimaryServer(s1Endpoint) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - - s2Endpoint := makeServerEndpointName() - s2 := p.AddPrimaryServer(s2Endpoint) - if p.NumServers() != 2 { - t.Fatalf("bad") - } - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server: %+q", s2.Name) - } - s1 = p.FindServer() - if s1 == nil || s1.Name != s1Endpoint { - t.Fatalf("Expected s1 to be the front of the list: %+q==%+q", s1.Name, s1Endpoint) - } - // Move s1 to the back of the server list - p.NotifyFailedServer(s1) - s2 = p.FindServer() - if s2 == nil || s2.Name != s2Endpoint { - t.Fatalf("Expected s2 server: %+q", s2Endpoint) - } - p.RemoveServer(s2) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.RemoveServer(s2) - if p.NumServers() != 1 { - t.Fatalf("bad") - } - p.AddPrimaryServer(s2Endpoint) - - const maxServers = 19 - servers := make([]*ServerEndpoint, 
0, maxServers) - servers = append(servers, s1) - servers = append(servers, s2) - // Already added two servers above - for i := maxServers; i > 2; i-- { - server := p.AddPrimaryServer(makeServerEndpointName()) - servers = append(servers, server) - } - if p.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, p.NumServers()) - } - - p.RebalanceServers() - - if p.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, p.NumServers()) - } - - findServer := func(server *ServerEndpoint) bool { - for i := p.NumServers(); i > 0; i-- { - s := p.FindServer() - if s == server { - return true - } - } - return false - } - - expectedNumServers := maxServers - removedServers := make([]*ServerEndpoint, 0, maxServers) - - // Remove servers from the front of the list - for i := 3; i > 0; i-- { - server := p.FindServer() - if server == nil { - t.Fatalf("FindServer returned nil") - } - p.RemoveServer(server) - expectedNumServers-- - if p.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers()) - } - if findServer(server) == true { - t.Fatalf("Did not expect to find server %s after removal from the front", server.Name) - } - removedServers = append(removedServers, server) - } - - // Remove server from the end of the list - for i := 3; i > 0; i-- { - server := p.FindServer() - p.NotifyFailedServer(server) - p.RemoveServer(server) - expectedNumServers-- - if p.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers()) - } - if findServer(server) == true { - t.Fatalf("Did not expect to find server %s", server.Name) - } - removedServers = append(removedServers, server) - } - - // Remove server from the middle of the list - for i := 3; i > 0; i-- { - server := p.FindServer() - p.NotifyFailedServer(server) - server2 := p.FindServer() - p.NotifyFailedServer(server2) // server2 now at end of the list - - p.RemoveServer(server) - expectedNumServers-- - if p.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers()) - } - if findServer(server) == true { - t.Fatalf("Did not expect to find server %s", server.Name) - } - removedServers = append(removedServers, server) - } - - if p.NumServers()+len(removedServers) != maxServers { - t.Fatalf("Expected %d+%d=%d servers", p.NumServers(), len(removedServers), maxServers) - } - - // Drain the remaining servers from the middle - for i := p.NumServers(); i > 0; i-- { - server := p.FindServer() - p.NotifyFailedServer(server) - server2 := p.FindServer() - p.NotifyFailedServer(server2) // server2 now at end of the list - p.RemoveServer(server) - removedServers = append(removedServers, server) - } - - if p.NumServers() != 0 { - t.Fatalf("Expected an empty server list") - } - if len(removedServers) != maxServers { - t.Fatalf("Expected all servers to be in removed server list") - } -} - -// func (p *RPCProxy) Start() { - -// func (l *serverList) cycleServer() (servers []*Server) { -func TestRPCProxyInternal_cycleServer(t *testing.T) { - p := testRPCProxy() - l := p.getServerList() - - server0 := &ServerEndpoint{Name: "server1"} - server1 := &ServerEndpoint{Name: "server2"} - server2 := &ServerEndpoint{Name: "server3"} - l.L = append(l.L, server0, server1, server2) - p.saveServerList(l) - - l = p.getServerList() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server0 && - l.L[1] != server1 && - l.L[2] != 
server2 { - t.Fatalf("initial server ordering not correct") - } - - l.L = l.cycleServer() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server1 && - l.L[1] != server2 && - l.L[2] != server0 { - t.Fatalf("server ordering after one cycle not correct") - } - - l.L = l.cycleServer() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server2 && - l.L[1] != server0 && - l.L[2] != server1 { - t.Fatalf("server ordering after two cycles not correct") - } - - l.L = l.cycleServer() - if len(l.L) != 3 { - t.Fatalf("server length incorrect: %d/3", len(l.L)) - } - if l.L[0] != server0 && - l.L[1] != server1 && - l.L[2] != server2 { - t.Fatalf("server ordering after three cycles not correct") - } -} - -// func (p *RPCProxy) getServerList() serverList { -func TestRPCProxyInternal_getServerList(t *testing.T) { - p := testRPCProxy() - l := p.getServerList() - if l.L == nil { - t.Fatalf("serverList.servers nil") - } - - if len(l.L) != 0 { - t.Fatalf("serverList.servers length not zero") - } -} - -func TestRPCProxyInternal_New(t *testing.T) { - p := testRPCProxy() - if p == nil { - t.Fatalf("bad") - } - - if p.logger == nil { - t.Fatalf("bad") - } - - if p.shutdownCh == nil { - t.Fatalf("bad") - } -} - -// func (p *RPCProxy) reconcileServerList(l *serverList) bool { -func TestRPCProxyInternal_reconcileServerList(t *testing.T) { - tests := []int{0, 1, 2, 3, 4, 5, 10, 100} - for _, n := range tests { - ok, err := test_reconcileServerList(n) - if !ok { - t.Errorf("Expected %d to pass: %v", n, err) - } - } -} - -func test_reconcileServerList(maxServers int) (bool, error) { - // Build a server list, reconcile, verify the missing servers are - // missing, the added have been added, and the original server is - // present. - const failPct = 0.5 - p := testRPCProxyFailProb(failPct) - - var failedServers, healthyServers []*ServerEndpoint - for i := 0; i < maxServers; i++ { - nodeName := fmt.Sprintf("s%02d", i) - - node := &ServerEndpoint{Name: nodeName} - // Add 66% of servers to RPCProxy - if rand.Float64() > 0.33 { - p.activateEndpoint(node) - - // Of healthy servers, (ab)use connPoolPinger to - // failPct of the servers for the reconcile. This - // allows for the selected server to no longer be - // healthy for the reconcile below. - if ok, _ := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCMajorVersion(), node); ok { - // Will still be present - healthyServers = append(healthyServers, node) - } else { - // Will be missing - failedServers = append(failedServers, node) - } - } else { - // Will be added from the call to reconcile - healthyServers = append(healthyServers, node) - } - } - - // Randomize RPCProxy's server list - p.RebalanceServers() - selectedServer := p.FindServer() - - var selectedServerFailed bool - for _, s := range failedServers { - if selectedServer.Key().Equal(s.Key()) { - selectedServerFailed = true - break - } - } - - // Update RPCProxy's server list to be "healthy" based on Serf. - // Reconcile this with origServers, which is shuffled and has a live - // connection, but possibly out of date. 
- origServers := p.getServerList() - p.saveServerList(serverList{L: healthyServers}) - - // This should always succeed with non-zero server lists - if !selectedServerFailed && !p.reconcileServerList(&origServers) && - len(p.getServerList().L) != 0 && - len(origServers.L) != 0 { - // If the random gods are unfavorable and we end up with zero - // length lists, expect things to fail and retry the test. - return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d", - selectedServerFailed, - len(p.getServerList().L), - len(origServers.L)) - } - - // If we have zero-length server lists, test succeeded in degenerate - // case. - if len(p.getServerList().L) == 0 && - len(origServers.L) == 0 { - // Failed as expected w/ zero length list - return true, nil - } - - resultingServerMap := make(map[EndpointKey]bool) - for _, s := range p.getServerList().L { - resultingServerMap[*s.Key()] = true - } - - // Test to make sure no failed servers are in the RPCProxy's - // list. Error if there are any failedServers in l.servers - for _, s := range failedServers { - _, ok := resultingServerMap[*s.Key()] - if ok { - return false, fmt.Errorf("Found failed server %v in merged list %v", s, resultingServerMap) - } - } - - // Test to make sure all healthy servers are in the healthy list. - if len(healthyServers) != len(p.getServerList().L) { - return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers)) - } - - // Test to make sure all healthy servers are in the resultingServerMap list. - for _, s := range healthyServers { - _, ok := resultingServerMap[*s.Key()] - if !ok { - return false, fmt.Errorf("Server %v missing from healthy map after merged lists", s) - } - } - return true, nil -} - -// func (l *serverList) refreshServerRebalanceTimer() { -func TestRPCProxyInternal_refreshServerRebalanceTimer(t *testing.T) { - type clusterSizes struct { - numNodes int - numServers int - minRebalance time.Duration - } - clusters := []clusterSizes{ - {0, 3, 10 * time.Minute}, - {1, 0, 10 * time.Minute}, // partitioned cluster - {1, 3, 10 * time.Minute}, - {2, 3, 10 * time.Minute}, - {100, 0, 10 * time.Minute}, // partitioned - {100, 1, 10 * time.Minute}, // partitioned - {100, 3, 10 * time.Minute}, - {1024, 1, 10 * time.Minute}, // partitioned - {1024, 3, 10 * time.Minute}, // partitioned - {1024, 5, 10 * time.Minute}, - {16384, 1, 10 * time.Minute}, // partitioned - {16384, 2, 10 * time.Minute}, // partitioned - {16384, 3, 10 * time.Minute}, // partitioned - {16384, 5, 10 * time.Minute}, - {65535, 0, 10 * time.Minute}, // partitioned - {65535, 1, 10 * time.Minute}, // partitioned - {65535, 2, 10 * time.Minute}, // partitioned - {65535, 3, 10 * time.Minute}, // partitioned - {65535, 5, 10 * time.Minute}, // partitioned - {65535, 7, 10 * time.Minute}, - {1000000, 1, 10 * time.Minute}, // partitioned - {1000000, 2, 10 * time.Minute}, // partitioned - {1000000, 3, 10 * time.Minute}, // partitioned - {1000000, 5, 10 * time.Minute}, // partitioned - {1000000, 11, 10 * time.Minute}, // partitioned - {1000000, 19, 10 * time.Minute}, - } - - logger := log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - - for i, s := range clusters { - p := NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) - for i := 0; i < s.numServers; i++ { - nodeName := fmt.Sprintf("s%02d", i) - p.activateEndpoint(&ServerEndpoint{Name: nodeName}) - } - - d := p.refreshServerRebalanceTimer() - if d < s.minRebalance { - t.Errorf("[%d] duration too 
short for cluster of size %d and %d servers (%s < %s)", i, s.numNodes, s.numServers, d, s.minRebalance) - } - } -} - -// func (p *RPCProxy) saveServerList(l serverList) { -func TestRPCProxyInternal_saveServerList(t *testing.T) { - p := testRPCProxy() - - // Initial condition - func() { - l := p.getServerList() - if len(l.L) != 0 { - t.Fatalf("RPCProxy.saveServerList failed to load init config") - } - - newServer := new(ServerEndpoint) - l.L = append(l.L, newServer) - p.saveServerList(l) - }() - - // Test that save works - func() { - l1 := p.getServerList() - t1NumServers := len(l1.L) - if t1NumServers != 1 { - t.Fatalf("RPCProxy.saveServerList failed to save mutated config") - } - }() - - // Verify mutation w/o a save doesn't alter the original - func() { - newServer := new(ServerEndpoint) - l := p.getServerList() - l.L = append(l.L, newServer) - - l_orig := p.getServerList() - origNumServers := len(l_orig.L) - if origNumServers >= len(l.L) { - t.Fatalf("RPCProxy.saveServerList unsaved config overwrote original") - } - }() -} diff --git a/client/rpcproxy/server_endpoint.go b/client/rpcproxy/server_endpoint.go deleted file mode 100644 index d9b1add5b6a..00000000000 --- a/client/rpcproxy/server_endpoint.go +++ /dev/null @@ -1,84 +0,0 @@ -package rpcproxy - -import ( - "fmt" - "net" - "strings" -) - -const ( - defaultNomadRPCPort = "4647" -) - -// EndpointKey is used in maps and for equality tests. A key is based on endpoints. -type EndpointKey struct { - name string -} - -// Equal compares two EndpointKey objects -func (k *EndpointKey) Equal(x *EndpointKey) bool { - return k.name == x.name -} - -// ServerEndpoint contains the address information for to connect to a Nomad -// server. -// -// TODO(sean@): Server is stubbed out so that in the future it can hold a -// reference to Node (and ultimately Node.ID). -type ServerEndpoint struct { - // Name is the unique lookup key for a Server instance - Name string - Host string - Port string - Addr net.Addr -} - -// Key returns the corresponding Key -func (s *ServerEndpoint) Key() *EndpointKey { - return &EndpointKey{ - name: s.Name, - } -} - -// NewServerEndpoint creates a new Server instance with a resolvable -// endpoint. `name` can be either an IP address or a DNS name. If `name` is -// a DNS name, it must be resolvable to an IP address (most inputs are IP -// addresses, not DNS names, but both work equally well when the name is -// resolvable). 
-func NewServerEndpoint(name string) (*ServerEndpoint, error) {
-	s := &ServerEndpoint{
-		Name: name,
-	}
-
-	var host, port string
-	var err error
-	host, port, err = net.SplitHostPort(name)
-	if err == nil {
-		s.Host = host
-		s.Port = port
-	} else {
-		if strings.Contains(err.Error(), "missing port") {
-			s.Host = name
-			s.Port = defaultNomadRPCPort
-		} else {
-			return nil, err
-		}
-	}
-
-	if s.Addr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(s.Host, s.Port)); err != nil {
-		return nil, err
-	}
-
-	return s, err
-}
-
-// String returns a string representation of Server
-func (s *ServerEndpoint) String() string {
-	var addrStr, networkStr string
-	if s.Addr != nil {
-		addrStr = s.Addr.String()
-		networkStr = s.Addr.Network()
-	}
-
-	return fmt.Sprintf("%s (%s:%s)", s.Name, networkStr, addrStr)
-}
diff --git a/client/rpcproxy/server_endpoint_test.go b/client/rpcproxy/server_endpoint_test.go
deleted file mode 100644
index f04494859d3..00000000000
--- a/client/rpcproxy/server_endpoint_test.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package rpcproxy
-
-import (
-	"fmt"
-	"net"
-	"testing"
-)
-
-// func (k *EndpointKey) Equal(x *EndpointKey) {
-func TestServerEndpointKey_Equal(t *testing.T) {
-	tests := []struct {
-		name  string
-		s1    *ServerEndpoint
-		s2    *ServerEndpoint
-		equal bool
-	}{
-		{
-			name:  "equal",
-			s1:    &ServerEndpoint{Name: "k1"},
-			s2:    &ServerEndpoint{Name: "k1"},
-			equal: true,
-		},
-		{
-			name:  "not equal",
-			s1:    &ServerEndpoint{Name: "k1"},
-			s2:    &ServerEndpoint{Name: "k2"},
-			equal: false,
-		},
-	}
-
-	for _, test := range tests {
-		if test.s1.Key().Equal(test.s2.Key()) != test.equal {
-			t.Errorf("fixture %s failed forward comparison", test.name)
-		}
-
-		if test.s2.Key().Equal(test.s1.Key()) != test.equal {
-			t.Errorf("fixture %s failed reverse comparison", test.name)
-		}
-	}
-}
-
-// func (k *ServerEndpoint) String() {
-func TestServerEndpoint_String(t *testing.T) {
-	tests := []struct {
-		name string
-		s    *ServerEndpoint
-		str  string
-	}{
-		{
-			name: "name",
-			s:    &ServerEndpoint{Name: "s"},
-			str:  "s (:)",
-		},
-		{
-			name: "name, host, port",
-			s: &ServerEndpoint{
-				Name: "s",
-				Host: "127.0.0.1",
-				Port: "4647",
-			},
-			str: "s (tcp:127.0.0.1:4647)",
-		},
-	}
-
-	for _, test := range tests {
-		if test.s.Addr == nil && (test.s.Host != "" && test.s.Port != "") {
-			fmt.Printf("Setting addr\n")
-			addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(test.s.Host, test.s.Port))
-			if err == nil {
-				test.s.Addr = addr
-			}
-		}
-		if test.s.String() != test.str {
-			t.Errorf("fixture %q failed: %q vs %q", test.name, test.s.String(), test.str)
-		}
-	}
-}
diff --git a/client/serverlist.go b/client/serverlist.go
new file mode 100644
index 00000000000..9b77085c658
--- /dev/null
+++ b/client/serverlist.go
@@ -0,0 +1,107 @@
+package client
+
+import (
+	"math/rand"
+	"net"
+	"sort"
+	"strings"
+	"sync"
+)
+
+type serverlist struct {
+	e  endpoints
+	mu sync.RWMutex
+}
+
+func newServerList() *serverlist {
+	return &serverlist{}
+}
+
+// set the server list to a new list. The new list will be shuffled and sorted
+// by priority.
+func (s *serverlist) set(newe endpoints) {
+	s.mu.Lock()
+	s.e = newe
+	s.mu.Unlock()
+}
+
+// all returns a copy of the full server list, shuffled and then sorted by
+// priority
+func (s *serverlist) all() endpoints {
+	s.mu.RLock()
+	out := make(endpoints, len(s.e))
+	copy(out, s.e)
+	s.mu.RUnlock()
+
+	// Randomize the order
+	for i, j := range rand.Perm(len(out)) {
+		out[i], out[j] = out[j], out[i]
+	}
+
+	// Sort by priority
+	sort.Sort(out)
+	return out
+}
+
+// failed servers get deprioritized
+func (s *serverlist) failed(e *endpoint) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for i := 0; i < len(s.e); i++ {
+		if s.e[i].equal(e) {
+			e.priority++
+			return
+		}
+	}
+}
+
+// good servers get promoted to the highest priority
+func (s *serverlist) good(e *endpoint) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for i := 0; i < len(s.e); i++ {
+		if s.e[i].equal(e) {
+			e.priority = 0
+			return
+		}
+	}
+}
+
+func (e endpoints) Len() int {
+	return len(e)
+}
+
+func (e endpoints) Less(i int, j int) bool {
+	// Sort only by priority as endpoints should be shuffled and ordered
+	// only by priority
+	return e[i].priority < e[j].priority
+}
+
+func (e endpoints) Swap(i int, j int) {
+	e[i], e[j] = e[j], e[i]
+}
+
+type endpoints []*endpoint
+
+func (e endpoints) String() string {
+	names := make([]string, 0, len(e))
+	for _, endpoint := range e {
+		names = append(names, endpoint.name)
+	}
+	return strings.Join(names, ",")
+}
+
+type endpoint struct {
+	name string
+	addr net.Addr
+
+	// 0 being the highest priority
+	priority int
+}
+
+// equal returns true if the name and addr match between two endpoints.
+// Priority is ignored because the same endpoint may be added by discovery and
+// heartbeating with different priorities.
+func (e *endpoint) equal(o *endpoint) bool {
+	return e.name == o.name && e.addr == o.addr
+}
diff --git a/client/serverlist_test.go b/client/serverlist_test.go
new file mode 100644
index 00000000000..d3f9d1c5742
--- /dev/null
+++ b/client/serverlist_test.go
@@ -0,0 +1,80 @@
+package client
+
+import "testing"
+
+func TestServerList(t *testing.T) {
+	s := newServerList()
+
+	// New lists should be empty
+	if e := s.get(); e != nil {
+		t.Fatalf("expected empty list to return nil, but received: %v", e)
+	}
+	if e := s.all(); len(e) != 0 {
+		t.Fatalf("expected empty list to return an empty list, but received: %+q", e)
+	}
+
+	mklist := func() endpoints {
+		return endpoints{
+			&endpoint{"b", nil, 1},
+			&endpoint{"c", nil, 1},
+			&endpoint{"g", nil, 2},
+			&endpoint{"d", nil, 1},
+			&endpoint{"e", nil, 1},
+			&endpoint{"f", nil, 1},
+			&endpoint{"h", nil, 2},
+			&endpoint{"a", nil, 0},
+		}
+	}
+	s.set(mklist())
+
+	orig := mklist()
+	all := s.all()
+	if len(all) != len(orig) {
+		t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all))
+	}
+
+	// Assert list is properly randomized+sorted
+	for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} {
+		if all[i].priority != pri {
+			t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri)
+		}
+	}
+
+	// Subsequent sets should reshuffle (try multiple times as they may
+	// shuffle in the same order)
+	tries := 0
+	max := 3
+	for ; tries < max; tries++ {
+		s.set(mklist())
+		// First entry should always be the same
+		if e := s.get(); *e != *all[0] {
+			t.Fatalf("on try %d get returned the wrong endpoint: %+q", tries, e)
+		}
+
+		all2 := s.all()
+		if all.String() == all2.String() {
+			// eek, matched; try again in case we just got unlucky
+			continue
+		}
+		break
+	}
+	if tries == max {
+		t.Fatalf("after %d attempts servers were still not randomly reshuffled", tries)
+	}
+
+	// Mark should rotate list items in place
+	s.mark(&endpoint{"a", nil, 0})
+	all3 := s.all()
+	if s.get().name == "a" || all3[len(all3)-1].name != "a" {
+		t.Fatalf("endpoint a should have been rotated to the end")
+	}
+	if len(all3) != len(all) {
+		t.Fatalf("marking should not have changed list length")
+	}
+
+	// Marking a non-existent endpoint should do nothing
+	s.mark(&endpoint{})
+	if s.all().String() != all3.String() {
+		t.Fatalf("marking a non-existent endpoint altered the list")
+	}
+}
diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go
index aaf466373b0..5e9a0c8d8fc 100644
--- a/command/agent/agent_endpoint.go
+++ b/command/agent/agent_endpoint.go
@@ -139,7 +139,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
 		return nil, CodedError(501, ErrInvalidMethod)
 	}
 
-	peers := s.agent.client.RPCProxy().ServerRPCAddrs()
+	peers := s.agent.client.GetServers()
 	return peers, nil
 }
 
@@ -156,12 +156,11 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
 	}
 
 	// Set the servers list into the client
-	for _, server := range servers {
-		s.agent.logger.Printf("[TRACE] Adding server %s to the client's primary server list", server)
-		se := client.AddPrimaryServerToRPCProxy(server)
-		if se == nil {
-			s.agent.logger.Printf("[ERR] Attempt to add server %q to client failed", server)
-		}
+	s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
+	if err := client.SetServers(servers); err != nil {
+		s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
+		//TODO is this the right error to return?
+		return nil, CodedError(400, err.Error())
 	}
 	return nil, nil
 }
diff --git a/nomad/pool.go b/nomad/pool.go
index 669e788989e..7ef7acf596e 100644
--- a/nomad/pool.go
+++ b/nomad/pool.go
@@ -12,7 +12,6 @@ import (
 
 	"github.com/hashicorp/consul/tlsutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
-	"github.com/hashicorp/nomad/client/rpcproxy"
 	"github.com/hashicorp/yamux"
 )
 
@@ -376,9 +375,9 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
 
 // PingNomadServer sends a Status.Ping message to the specified server and
 // returns true if healthy, false if an error occurred
-func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s *rpcproxy.ServerEndpoint) (bool, error) {
+func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s net.Addr) (bool, error) {
 	// Get a usable client
-	conn, sc, err := p.getClient(region, s.Addr, apiMajorVersion)
+	conn, sc, err := p.getClient(region, s, apiMajorVersion)
 	if err != nil {
 		return false, err
 	}
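
To make the ordering strategy above easier to see at a glance, here is a small standalone sketch (not part of the patch) of the shuffle-then-sort-by-priority idea behind (*serverlist).all and the failed/good priority adjustments. The fakeEndpoint type and order function are hypothetical stand-ins for the unexported endpoint type and methods in the diff, and the sketch deliberately uses rand.Shuffle and sort.SliceStable rather than the rand.Perm/sort.Sort pair in the patch so that ties visibly keep their shuffled order.

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// fakeEndpoint stands in for the unexported endpoint type; only name and
// priority matter for ordering here.
type fakeEndpoint struct {
	name     string
	priority int // 0 is the highest priority
}

// order mirrors the shuffle-then-sort idea: shuffle so clients spread load
// across servers, then sort so deprioritized (failed) servers are tried last.
func order(in []fakeEndpoint) []fakeEndpoint {
	out := make([]fakeEndpoint, len(in))
	copy(out, in)

	// Randomize the order to avoid server fixation.
	rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })

	// A stable sort keeps the shuffled order within each priority band.
	sort.SliceStable(out, func(i, j int) bool { return out[i].priority < out[j].priority })
	return out
}

func main() {
	servers := []fakeEndpoint{
		{"a", 0}, {"b", 0}, {"c", 0},
		{"d", 1}, // e.g. a server whose last RPC failed
	}
	for _, e := range order(servers) {
		fmt.Printf("%s(p%d) ", e.name, e.priority)
	}
	fmt.Println() // "d" always comes last; a, b, and c appear in a random order
}

The design choice worth noting is that failed() only bumps an endpoint's priority rather than dropping it from the list, so a flaky server drifts to the back of the rotation returned by all() and is retried last, while good() resets it to priority 0 after a successful RPC.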