diff --git a/client/rpc.go b/client/rpc.go index c106f5d4f4c..01c0bf35869 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -15,7 +15,6 @@ import ( inmem "github.com/hashicorp/nomad/helper/codec" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/yamux" ) // rpcEndpoints holds the RPC endpoints @@ -282,30 +281,30 @@ func (c *Client) setupClientRpcServer(server *rpc.Server) { // connection. func (c *Client) rpcConnListener() { // Make a channel for new connections. - conns := make(chan *yamux.Session, 4) + conns := make(chan *pool.Conn, 4) c.connPool.SetConnListener(conns) for { select { case <-c.shutdownCh: return - case session, ok := <-conns: + case conn, ok := <-conns: if !ok { continue } - go c.listenConn(session) + go c.listenConn(conn) } } } // listenConn is used to listen for connections being made from the server on // pre-existing connection. This should be called in a goroutine. -func (c *Client) listenConn(s *yamux.Session) { +func (c *Client) listenConn(conn *pool.Conn) { for { - conn, err := s.Accept() + stream, err := conn.AcceptStream() if err != nil { - if s.IsClosed() { + if conn.IsClosed() { return } @@ -313,7 +312,7 @@ func (c *Client) listenConn(s *yamux.Session) { continue } - go c.handleConn(conn) + go c.handleConn(stream) metrics.IncrCounter([]string{"client", "rpc", "accept_conn"}, 1) } } diff --git a/helper/pool/pool.go b/helper/pool/pool.go index c6960759ade..f1899d11f29 100644 --- a/helper/pool/pool.go +++ b/helper/pool/pool.go @@ -56,12 +56,21 @@ type Conn struct { clientLock sync.Mutex } -// markForUse does all the bookkeeping required to ready a connection for use. +// markForUse does all the bookkeeping required to ready a connection for use, +// and ensure that active connections don't get reaped. func (c *Conn) markForUse() { c.lastUsed = time.Now() atomic.AddInt32(&c.refCount, 1) } +// releaseUse is the complement of `markForUse`, to free up the reference count +func (c *Conn) releaseUse() { + refCount := atomic.AddInt32(&c.refCount, -1) + if refCount == 0 && atomic.LoadInt32(&c.shouldClose) == 1 { + c.Close() + } +} + func (c *Conn) Close() error { return c.session.Close() } @@ -122,6 +131,40 @@ func (c *Conn) returnClient(client *StreamClient) { } } +func (c *Conn) IsClosed() bool { + return c.session.IsClosed() +} + +func (c *Conn) AcceptStream() (net.Conn, error) { + s, err := c.session.AcceptStream() + if err != nil { + return nil, err + } + + c.markForUse() + return &incomingStream{ + Stream: s, + parent: c, + }, nil +} + +// incomingStream wraps yamux.Stream but frees the underlying yamux.Session +// when closed +type incomingStream struct { + *yamux.Stream + + parent *Conn +} + +func (s *incomingStream) Close() error { + err := s.Stream.Close() + + // always release parent even if error + s.parent.releaseUse() + + return err +} + // ConnPool is used to maintain a connection pool to other // Nomad servers. This is used to reduce the latency of // RPC requests between servers. It is only used to pool @@ -157,7 +200,7 @@ type ConnPool struct { // connListener is used to notify a potential listener of a new connection // being made. - connListener chan<- *yamux.Session + connListener chan<- *Conn } // NewPool is used to make a new connection pool @@ -220,7 +263,7 @@ func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) { // SetConnListener is used to listen to new connections being made. The // channel will be closed when the conn pool is closed or a new listener is set. -func (p *ConnPool) SetConnListener(l chan<- *yamux.Session) { +func (p *ConnPool) SetConnListener(l chan<- *Conn) { p.Lock() defer p.Unlock() @@ -276,7 +319,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er // If there is a connection listener, notify them of the new connection. if p.connListener != nil { select { - case p.connListener <- c.session: + case p.connListener <- c: default: } } @@ -386,14 +429,6 @@ func (p *ConnPool) clearConn(conn *Conn) { } } -// releaseConn is invoked when we are done with a conn to reduce the ref count -func (p *ConnPool) releaseConn(conn *Conn) { - refCount := atomic.AddInt32(&conn.refCount, -1) - if refCount == 0 && atomic.LoadInt32(&conn.shouldClose) == 1 { - conn.Close() - } -} - // getClient is used to get a usable client for an address and protocol version func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) { retries := 0 @@ -408,7 +443,7 @@ START: client, err := conn.getRPCClient() if err != nil { p.clearConn(conn) - p.releaseConn(conn) + conn.releaseUse() // Try to redial, possible that the TCP session closed due to timeout if retries == 0 { @@ -448,6 +483,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, if err != nil { return fmt.Errorf("rpc error: %w", err) } + defer conn.releaseUse() // Make the RPC call err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply) @@ -461,8 +497,6 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, p.clearConn(conn) } - p.releaseConn(conn) - // If the error is an RPC Coded error // return the coded error without wrapping if structs.IsErrRPCCoded(err) { @@ -475,7 +509,6 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, // Done with the connection conn.returnClient(sc) - p.releaseConn(conn) return nil } diff --git a/helper/pool/pool_test.go b/helper/pool/pool_test.go index 260e9861ac0..f6e4f8754a1 100644 --- a/helper/pool/pool_test.go +++ b/helper/pool/pool_test.go @@ -9,7 +9,6 @@ import ( "github.com/hashicorp/nomad/helper/freeport" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/yamux" "github.com/stretchr/testify/require" ) @@ -47,7 +46,7 @@ func TestConnPool_ConnListener(t *testing.T) { pool := newTestPool(t) // Setup a listener - c := make(chan *yamux.Session, 1) + c := make(chan *Conn, 1) pool.SetConnListener(c) // Make an RPC