Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client RPCs are sticky to server #3738

Merged
merged 11 commits into from
Jan 26, 2018
97 changes: 59 additions & 38 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/rpc"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
Expand Down Expand Up @@ -111,8 +113,8 @@ type Client struct {

connPool *pool.ConnPool

// servers is the (optionally prioritized) list of nomad servers
servers *serverlist
// servers is the list of nomad servers
servers *servers.Manager

// heartbeat related times for tracking how often to heartbeat
lastHeartbeat time.Time
Expand Down Expand Up @@ -198,11 +200,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
}

// Initialize the server manager
c.servers = servers.New(c.logger, c.shutdownCh, c)

// Initialize the client
if err := c.init(); err != nil {
return nil, fmt.Errorf("failed to initialize client: %v", err)
Expand Down Expand Up @@ -268,7 +272,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Setup Consul discovery if enabled
if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin {
go c.consulDiscovery()
if len(c.servers.all()) == 0 {
if c.servers.NumServers() == 0 {
// No configured servers; trigger discovery manually
c.triggerDiscoveryCh <- struct{}{}
}
Expand Down Expand Up @@ -463,7 +467,7 @@ func (c *Client) Stats() map[string]map[string]string {
stats := map[string]map[string]string{
"client": {
"node_id": c.NodeID(),
"known_servers": c.servers.all().String(),
"known_servers": strings.Join(c.GetServers(), ","),
"num_allocations": strconv.Itoa(c.NumAllocs()),
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
Expand Down Expand Up @@ -548,32 +552,49 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) {

// GetServers returns the list of nomad servers this client is aware of.
func (c *Client) GetServers() []string {
endpoints := c.servers.all()
endpoints := c.servers.GetServers()
res := make([]string, len(endpoints))
for i := range endpoints {
res[i] = endpoints[i].addr.String()
res[i] = endpoints[i].String()
}
sort.Strings(res)
return res
}

// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned.
func (c *Client) SetServers(servers []string) error {
endpoints := make([]*endpoint, 0, len(servers))
func (c *Client) SetServers(in []string) error {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
for _, s := range servers {
addr, err := resolveServer(s)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", s, err)
merr.Errors = append(merr.Errors, err)
continue
}

// Valid endpoint, append it without a priority as this API
// doesn't support different priorities for different servers
endpoints = append(endpoints, &endpoint{name: s, addr: addr})
endpoints := make([]*servers.Server, 0, len(in))
wg.Add(len(in))

for _, s := range in {
go func(srv string) {
defer wg.Done()
addr, err := resolveServer(srv)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", srv, err)
merr.Errors = append(merr.Errors, err)
return
}

// Try to ping to check if it is a real server
if err := c.Ping(addr); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("Server at address %s failed ping: %v", addr, err))
return
}

mu.Lock()
endpoints = append(endpoints, &servers.Server{Addr: addr})
mu.Unlock()
}(s)
}

wg.Wait()

// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
Expand All @@ -582,7 +603,7 @@ func (c *Client) SetServers(servers []string) error {
return noServersErr
}

c.servers.set(endpoints)
c.servers.SetServers(endpoints)
return nil
}

Expand Down Expand Up @@ -1228,26 +1249,25 @@ func (c *Client) updateNodeStatus() error {
}
}

// Convert []*NodeServerInfo to []*endpoints
localdc := c.Datacenter()
servers := make(endpoints, 0, len(resp.Servers))
// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)

// Convert []*NodeServerInfo to []*servers.Server
nomadServers := make([]*servers.Server, 0, len(resp.Servers))
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
if s.Datacenter != localdc {
// server is non-local; de-prioritize
e.priority = 1
}
servers = append(servers, &e)
e := &servers.Server{DC: s.Datacenter, Addr: addr}
nomadServers = append(nomadServers, e)
}
if len(servers) == 0 {
return fmt.Errorf("server returned no valid servers")
if len(nomadServers) == 0 {
return fmt.Errorf("heartbeat response returned no valid servers")
}
c.servers.set(servers)
c.servers.SetServers(nomadServers)

// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
Expand Down Expand Up @@ -1840,7 +1860,7 @@ func (c *Client) consulDiscoveryImpl() error {

serviceName := c.configCopy.ConsulConfig.ServerServiceName
var mErr multierror.Error
var servers endpoints
var nomadServers servers.Servers
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
DISCOLOOP:
for _, dc := range dcs {
Expand Down Expand Up @@ -1880,22 +1900,23 @@ DISCOLOOP:
if err != nil {
mErr.Errors = append(mErr.Errors, err)
}
servers = append(servers, &endpoint{name: p, addr: addr})
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
if len(servers) > 0 {
if len(nomadServers) > 0 {
break DISCOLOOP
}
}
}
if len(servers) == 0 {
if len(nomadServers) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
}

c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers)
c.servers.set(servers)
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
c.servers.SetServers(nomadServers)

// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
Expand Down
24 changes: 24 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,3 +904,27 @@ func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {

assert.Equal(c.ValidateMigrateToken("", ""), true)
}

// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
t.Parallel()
client := TestClient(t, func(c *config.Config) {})

if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
if len(s) != 0 {
t.Fatalf("expected 2 servers but received: %+q", s)
}
}
8 changes: 8 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ type Config struct {
// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older verions, or to only show the new format
BackwardsCompatibleMetrics bool

// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
// This is used to paper over a loss of leadership by instead holding RPCs,
// so that the caller experiences a slow response rather than an error.
// This period is meant to be long enough for a leader election to take
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
}

func (c *Config) Copy() *Config {
Expand Down Expand Up @@ -228,6 +235,7 @@ func DefaultConfig() *Config {
NoHostUUID: true,
DisableTaggedMetrics: false,
BackwardsCompatibleMetrics: false,
RPCHoldTimeout: 5 * time.Second,
}
}

Expand Down
75 changes: 59 additions & 16 deletions client/rpc.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package client

import (
"fmt"
"io"
"net"
"net/rpc"
"strings"
"time"

metrics "github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)

Expand Down Expand Up @@ -39,26 +40,60 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return c.config.RPCHandler.RPC(method, args, reply)
}

servers := c.servers.all()
if len(servers) == 0 {
// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
firstCheck := time.Now()

TRY:
server := c.servers.FindServer()
if server == nil {
return noServersErr
}

var mErr multierror.Error
for _, s := range servers {
// Make the RPC request
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
mErr.Errors = append(mErr.Errors, errmsg)
c.logger.Printf("[DEBUG] client: %v", errmsg)
c.servers.failed(s)
continue
}
c.servers.good(s)
// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
if rpcErr == nil {
return nil
}

return mErr.ErrorOrNil()
// Move off to another server, and see if we can retry.
c.logger.Printf("[ERR] nomad: %q RPC failed to server %s: %v", method, server.Addr, rpcErr)
c.servers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr); !retry {
return rpcErr
}

// We can wait a bit and retry!
if time.Since(firstCheck) < c.config.RPCHoldTimeout {
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
}
return rpcErr
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
return true
}

// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) {
return true
}

return false
}

// setupClientRpc is used to setup the Client's RPC endpoints
Expand Down Expand Up @@ -159,3 +194,11 @@ func resolveServer(s string) (net.Addr, error) {
}
return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
}

// Ping is used to ping a particular server and returns whether it is healthy or
// a potential error.
func (c *Client) Ping(srv net.Addr) error {
var reply struct{}
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
return err
}
Loading