-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pool: track usage of incoming streams #10710
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,12 +56,21 @@ type Conn struct { | |
clientLock sync.Mutex | ||
} | ||
|
||
// markForUse does all the bookkeeping required to ready a connection for use. | ||
// markForUse does all the bookkeeping required to ready a connection for use, | ||
// and ensure that active connections don't get reaped. | ||
func (c *Conn) markForUse() { | ||
c.lastUsed = time.Now() | ||
atomic.AddInt32(&c.refCount, 1) | ||
} | ||
|
||
// releaseUse is the complement of `markForUse`, to free up the reference count | ||
func (c *Conn) releaseUse() { | ||
refCount := atomic.AddInt32(&c.refCount, -1) | ||
if refCount == 0 && atomic.LoadInt32(&c.shouldClose) == 1 { | ||
c.Close() | ||
} | ||
} | ||
|
||
func (c *Conn) Close() error { | ||
return c.session.Close() | ||
} | ||
|
@@ -122,6 +131,40 @@ func (c *Conn) returnClient(client *StreamClient) { | |
} | ||
} | ||
|
||
func (c *Conn) IsClosed() bool { | ||
return c.session.IsClosed() | ||
} | ||
|
||
func (c *Conn) Accept() (net.Conn, error) { | ||
s, err := c.session.AcceptStream() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
c.markForUse() | ||
return &incomingStream{ | ||
Stream: s, | ||
parent: c, | ||
}, nil | ||
} | ||
|
||
// incomingStream wraps yamux.Stream but frees the underlying yamux.Session | ||
// when closed | ||
type incomingStream struct { | ||
*yamux.Stream | ||
|
||
parent *Conn | ||
} | ||
|
||
func (s *incomingStream) Close() error { | ||
err := s.Stream.Close() | ||
|
||
// always release parent even if error | ||
s.parent.releaseUse() | ||
|
||
return err | ||
} | ||
|
||
// ConnPool is used to maintain a connection pool to other | ||
// Nomad servers. This is used to reduce the latency of | ||
// RPC requests between servers. It is only used to pool | ||
|
@@ -157,7 +200,7 @@ type ConnPool struct { | |
|
||
// connListener is used to notify a potential listener of a new connection | ||
// being made. | ||
connListener chan<- *yamux.Session | ||
connListener chan<- *Conn | ||
} | ||
|
||
// NewPool is used to make a new connection pool | ||
|
@@ -220,7 +263,7 @@ func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) { | |
|
||
// SetConnListener is used to listen to new connections being made. The | ||
// channel will be closed when the conn pool is closed or a new listener is set. | ||
func (p *ConnPool) SetConnListener(l chan<- *yamux.Session) { | ||
func (p *ConnPool) SetConnListener(l chan<- *Conn) { | ||
p.Lock() | ||
defer p.Unlock() | ||
|
||
|
@@ -276,7 +319,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er | |
// If there is a connection listener, notify them of the new connection. | ||
if p.connListener != nil { | ||
select { | ||
case p.connListener <- c.session: | ||
case p.connListener <- c: | ||
default: | ||
} | ||
} | ||
|
@@ -386,14 +429,6 @@ func (p *ConnPool) clearConn(conn *Conn) { | |
} | ||
} | ||
|
||
// releaseConn is invoked when we are done with a conn to reduce the ref count | ||
func (p *ConnPool) releaseConn(conn *Conn) { | ||
refCount := atomic.AddInt32(&conn.refCount, -1) | ||
if refCount == 0 && atomic.LoadInt32(&conn.shouldClose) == 1 { | ||
conn.Close() | ||
} | ||
} | ||
|
||
// getClient is used to get a usable client for an address and protocol version | ||
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) { | ||
retries := 0 | ||
|
@@ -408,7 +443,7 @@ START: | |
client, err := conn.getRPCClient() | ||
if err != nil { | ||
p.clearConn(conn) | ||
p.releaseConn(conn) | ||
conn.releaseUse() | ||
|
||
// Try to redial, possible that the TCP session closed due to timeout | ||
if retries == 0 { | ||
|
@@ -461,7 +496,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, | |
p.clearConn(conn) | ||
} | ||
|
||
p.releaseConn(conn) | ||
conn.releaseUse() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While reviewing this PR I'm noticing we have an existing hidden temporal coupling of Two suggestions:
// releaseUse is the complement of `markForUse`, to free up the reference count
func (c *Conn) releaseUse() {
refCount := atomic.AddInt32(&c.refCount, -1)
if refCount < 1 && atomic.LoadInt32(&c.shouldClose) == 1 {
c.Close()
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree it's brittle, I'll move to defer. I'm unsure about handling negative refCounts. It's unclear how refCount can be negative by re-ordering calls. Only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's a good point. Totally agreed on that. |
||
|
||
// If the error is an RPC Coded error | ||
// return the coded error without wrapping | ||
|
@@ -475,7 +510,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, | |
|
||
// Done with the connection | ||
conn.returnClient(sc) | ||
p.releaseConn(conn) | ||
conn.releaseUse() | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tiniest of nitpicks, but with the type being changed to
*pool.Conn
, naming this arguments
doesn't make sense anymore. Maybe rename the arg top
?(
pool.Conn
is kind of a weirdly named type in general because it's more like "connection factory" or "connect proxy" given thatAccept()
returnsnet.Conn
, but it isn't a "connection pool" either as it contains the connection pool. But probably best not to rework the whole thing. 😀 )There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change it to
c
. FWIW,pool.Conn
is the underlying physical connection -pool.Accept()
returns a wrappedyamux.Session
which also implementsnet.Conn
(and clients expect)..