Skip to content

Commit

Permalink
Merge pull request #4222 from hashicorp/b-proxy
Browse files Browse the repository at this point in the history
Track all client connections
  • Loading branch information
dadgar authored Apr 26, 2018
2 parents a4f32ea + 16272ad commit 821a223
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
49 changes: 40 additions & 9 deletions nomad/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@ type nodeConnState struct {
func (s *Server) getNodeConn(nodeID string) (*nodeConnState, bool) {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
state, ok := s.nodeConns[nodeID]
conns, ok := s.nodeConns[nodeID]

// Return the latest conn
var state *nodeConnState
for _, conn := range conns {
if state == nil || state.Established.Before(conn.Established) {
state = conn
}
}

return state, ok
}

Expand All @@ -39,8 +48,12 @@ func (s *Server) connectedNodes() map[string]time.Time {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
nodes := make(map[string]time.Time, len(s.nodeConns))
for nodeID, state := range s.nodeConns {
nodes[nodeID] = state.Established
for nodeID, conns := range s.nodeConns {
for _, conn := range conns {
if nodes[nodeID].Before(conn.Established) {
nodes[nodeID] = conn.Established
}
}
}
return nodes
}
Expand All @@ -54,11 +67,26 @@ func (s *Server) addNodeConn(ctx *RPCContext) {

s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
s.nodeConns[ctx.NodeID] = &nodeConnState{

// Capture the tracked connections so far
currentConns := s.nodeConns[ctx.NodeID]

// Check if we already have the connection. If we do, just update the
// establish time.
for _, c := range currentConns {
if c.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
c.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
c.Established = time.Now()
return
}
}

// Add the new conn
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID], &nodeConnState{
Session: ctx.Session,
Established: time.Now(),
Ctx: ctx,
}
})
}

// removeNodeConn removes the mapping between a node and its session.
Expand All @@ -70,7 +98,7 @@ func (s *Server) removeNodeConn(ctx *RPCContext) {

s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
state, ok := s.nodeConns[ctx.NodeID]
conns, ok := s.nodeConns[ctx.NodeID]
if !ok {
return
}
Expand All @@ -80,9 +108,12 @@ func (s *Server) removeNodeConn(ctx *RPCContext) {
// dial various addresses that all route to the same server. The most common
// case for this is the original address the client uses to connect to the
// server differs from the advertised address sent by the heartbeat.
if state.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
state.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
delete(s.nodeConns, ctx.NodeID)
for i, conn := range conns {
if conn.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
conn.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID][:i], s.nodeConns[ctx.NodeID][i+1:]...)
return
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions nomad/client_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestServer_removeNodeConn_differentAddrs(t *testing.T) {
s1.addNodeConn(ctx1)
s1.addNodeConn(ctx2)
require.Len(s1.connectedNodes(), 1)
require.Len(s1.nodeConns[nodeID], 2)

// Check that the value is the second conn.
state, ok := s1.getNodeConn(nodeID)
Expand All @@ -66,6 +67,7 @@ func TestServer_removeNodeConn_differentAddrs(t *testing.T) {
// Delete the first
s1.removeNodeConn(ctx1)
require.Len(s1.connectedNodes(), 1)
require.Len(s1.nodeConns[nodeID], 1)

// Check that the value is the second conn.
state, ok = s1.getNodeConn(nodeID)
Expand Down
4 changes: 2 additions & 2 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type Server struct {

// nodeConns is the set of multiplexed node connections we have keyed by
// NodeID
nodeConns map[string]*nodeConnState
nodeConns map[string][]*nodeConnState
nodeConnsLock sync.RWMutex

// peers is used to track the known Nomad servers. This is
Expand Down Expand Up @@ -294,7 +294,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string]*nodeConnState),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
Expand Down

0 comments on commit 821a223

Please sign in to comment.