Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection: Separate out lastActivity into recv and send activity #770

Merged
merged 1 commit into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ type Connection struct {
healthCheckDone chan struct{}
healthCheckHistory *healthHistory

// lastActivity is used to track how long the connection has been idle.
// (unix time, nano)
lastActivity atomic.Int64
// lastActivity{Read,Write} is used to track how long the connection has been
// idle for the recieve and send connections respectively. (unix time, nano)
lastActivityRead atomic.Int64
lastActivityWrite atomic.Int64
}

type peerAddressComponents struct {
Expand Down Expand Up @@ -322,6 +323,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str

log = log.WithFields(LogField{"connectionDirection", connDirection})
peerInfo := ch.PeerInfo()
timeNow := ch.timeNow().UnixNano()

c := &Connection{
channelConnectionCommon: ch.channelConnectionCommon,
Expand All @@ -345,7 +347,8 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
events: events,
commonStatsTags: ch.commonStatsTags,
healthCheckHistory: newHealthHistory(),
lastActivity: *atomic.NewInt64(ch.timeNow().UnixNano()),
lastActivityRead: *atomic.NewInt64(timeNow),
lastActivityWrite: *atomic.NewInt64(timeNow),
}

if tosPriority := opts.TosPriority; tosPriority > 0 {
Expand Down Expand Up @@ -666,7 +669,7 @@ func (c *Connection) readFrames(_ uint32) {
return
}

c.updateLastActivity(frame)
c.updateLastActivityRead(frame)

var releaseFrame bool
if c.relay == nil {
Expand Down Expand Up @@ -736,7 +739,7 @@ func (c *Connection) writeFrames(_ uint32) {
c.log.Debugf("Writing frame %s", f.Header)
}

c.updateLastActivity(f)
c.updateLastActivityWrite(f)
err := f.WriteOut(c.conn)
c.opts.FramePool.Release(f)
if err != nil {
Expand All @@ -755,13 +758,19 @@ func (c *Connection) writeFrames(_ uint32) {
}
}

// updateLastActivity marks when the last message was received/sent on the channel.
// updateLastActivityRead marks when the last message was received on the channel.
// This is used for monitoring idle connections and timing them out.
func (c *Connection) updateLastActivity(frame *Frame) {
// Pings are ignored for last activity.
switch frame.Header.messageType {
case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError:
c.lastActivity.Store(c.timeNow().UnixNano())
func (c *Connection) updateLastActivityRead(frame *Frame) {
if isMessageTypeCall(frame) {
c.lastActivityRead.Store(c.timeNow().UnixNano())
}
}

// updateLastActivityWrite marks when the last message was sent on the channel.
// This is used for monitoring idle connections and timing them out.
func (c *Connection) updateLastActivityWrite(frame *Frame) {
if isMessageTypeCall(frame) {
c.lastActivityWrite.Store(c.timeNow().UnixNano())
}
}

Expand Down Expand Up @@ -895,11 +904,18 @@ func (c *Connection) closeNetwork() {
}
}

// getLastActivityTime returns the timestamp of the last frame read or written,
// getLastActivityReadTime returns the timestamp of the last frame read,
// excluding pings. If no frames were transmitted yet, it will return the time
// this connection was created.
func (c *Connection) getLastActivityReadTime() time.Time {
return time.Unix(0, c.lastActivityRead.Load())
}

// getLastActivityWriteTime returns the timestamp of the last frame written,
// excluding pings. If no frames were transmitted yet, it will return the time
// this connection was created.
func (c *Connection) getLastActivityTime() time.Time {
return time.Unix(0, c.lastActivity.Load())
func (c *Connection) getLastActivityWriteTime() time.Time {
return time.Unix(0, c.lastActivityWrite.Load())
}

func (c *Connection) sendBufSize() (sendBufUsage int, sendBufSize int, _ error) {
Expand Down Expand Up @@ -937,3 +953,13 @@ func getSysConn(conn net.Conn, log Logger) syscall.RawConn {

return sysConn
}

func isMessageTypeCall(frame *Frame) bool {
// Pings are ignored for last activity.
switch frame.Header.messageType {
case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError:
return true
}

return false
}
42 changes: 29 additions & 13 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,20 +1165,28 @@ func TestLastActivityTime(t *testing.T) {
responseReceived := make(chan struct{})

// Helper function that checks the last activity time on client, server and relay.
validateLastActivityTime := func(expected time.Time) {
validateLastActivityTime := func(expectedReq time.Time, expectedResp time.Time) {
clientConn := getConnection(t, client, outbound)
serverConn := getConnection(t, server, inbound)
now := expected.UnixNano()
reqTime := expectedReq.UnixNano()
respTime := expectedResp.UnixNano()

assert.Equal(t, now, clientConn.LastActivity)
assert.Equal(t, now, serverConn.LastActivity)
assert.Equal(t, reqTime, clientConn.LastActivityWrite)
assert.Equal(t, reqTime, serverConn.LastActivityRead)

assert.Equal(t, respTime, clientConn.LastActivityRead)
assert.Equal(t, respTime, serverConn.LastActivityWrite)

// Relays should act like both clients and servers.
if ts.HasRelay() {
relayInbound := getConnection(t, ts.Relay(), inbound)
relayOutbound := getConnection(t, ts.Relay(), outbound)
assert.Equal(t, now, relayInbound.LastActivity)
assert.Equal(t, now, relayOutbound.LastActivity)

assert.Equal(t, reqTime, relayInbound.LastActivityRead)
assert.Equal(t, reqTime, relayOutbound.LastActivityWrite)

assert.Equal(t, respTime, relayInbound.LastActivityWrite)
assert.Equal(t, respTime, relayOutbound.LastActivityRead)
}
}

Expand All @@ -1191,6 +1199,7 @@ func TestLastActivityTime(t *testing.T) {
clock.Elapse(1 * time.Second)
})

initTime := clock.Now()
// Run the test twice, because the first call will also establish a connection.
for i := 0; i < 2; i++ {
beforeCallSent := clock.Now()
Expand All @@ -1202,15 +1211,19 @@ func TestLastActivityTime(t *testing.T) {

// Verify that the last activity time was updated before a response is received.
<-callReceived
validateLastActivityTime(beforeCallSent)
validateLastActivityTime(beforeCallSent, initTime)

// Let the server respond.
blockResponse <- struct{}{}

// After a response was received, time should be +1s. Validate again that
// the last activity time was updated.
// After a response was received, time of the response should be +1s,
// without a change to the requet time. Validate again that the last
// activity time was updated.
<-responseReceived
validateLastActivityTime(beforeCallSent.Add(1 * time.Second))
validateLastActivityTime(beforeCallSent, beforeCallSent.Add(1*time.Second))

// Set the initTime as the time of the last response.
initTime = beforeCallSent.Add(1 * time.Second)

// Elapse the clock for our next iteration.
clock.Elapse(1 * time.Minute)
Expand Down Expand Up @@ -1244,16 +1257,19 @@ func TestLastActivityTimePings(t *testing.T) {

// Verify last activity time.
clientConn := getConnection(t, client, outbound)
assert.Equal(t, timeAtStart, clientConn.LastActivity)
assert.Equal(t, timeAtStart, clientConn.LastActivityRead)
assert.Equal(t, timeAtStart, clientConn.LastActivityWrite)

// Relays do not pass pings on to the server.
if ts.HasRelay() {
relayInbound := getConnection(t, ts.Relay(), inbound)
assert.Equal(t, timeAtStart, relayInbound.LastActivity)
assert.Equal(t, timeAtStart, relayInbound.LastActivityRead)
assert.Equal(t, timeAtStart, relayInbound.LastActivityWrite)
}

serverConn := getConnection(t, ts.Server(), inbound)
assert.Equal(t, timeAtStart, serverConn.LastActivity)
assert.Equal(t, timeAtStart, serverConn.LastActivityRead)
assert.Equal(t, timeAtStart, serverConn.LastActivityWrite)

clock.Elapse(1 * time.Second)
}
Expand Down
10 changes: 8 additions & 2 deletions idle_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func (is *idleSweep) checkIdleConnections() {
idleConnections := make([]*Connection, 0, 10)
is.ch.mutable.RLock()
for _, conn := range is.ch.mutable.conns {
if idleTime := now.Sub(conn.getLastActivityTime()); idleTime >= is.maxIdleTime {
lastActivityTime := conn.getLastActivityReadTime()
if sendActivityTime := conn.getLastActivityWriteTime(); lastActivityTime.Before(sendActivityTime) {
lastActivityTime = sendActivityTime
}

if idleTime := now.Sub(lastActivityTime); idleTime >= is.maxIdleTime {
idleConnections = append(idleConnections, conn)
}
}
Expand All @@ -116,7 +121,8 @@ func (is *idleSweep) checkIdleConnections() {

is.ch.log.WithFields(
LogField{"remotePeer", conn.remotePeerInfo},
LogField{"lastActivityTime", conn.getLastActivityTime()},
LogField{"lastActivityTimeRead", conn.getLastActivityReadTime()},
LogField{"lastActivityTimeWrite", conn.getLastActivityWriteTime()},
).Info("Closing idle inbound connection.")
conn.close(LogField{"reason", "Idle connection closed"})
}
Expand Down
60 changes: 31 additions & 29 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,22 @@ type SubPeerScore struct {

// ConnectionRuntimeState is the runtime state for a single connection.
type ConnectionRuntimeState struct {
ID uint32 `json:"id"`
ConnectionState string `json:"connectionState"`
LocalHostPort string `json:"localHostPort"`
RemoteHostPort string `json:"remoteHostPort"`
OutboundHostPort string `json:"outboundHostPort"`
RemotePeer PeerInfo `json:"remotePeer"`
InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"`
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Relayer RelayerRuntimeState `json:"relayer"`
HealthChecks []bool `json:"healthChecks,omitempty"`
LastActivity int64 `json:"lastActivity"`
SendChQueued int `json:"sendChQueued"`
SendChCapacity int `json:"sendChCapacity"`
SendBufferUsage int `json:"sendBufferUsage"`
SendBufferSize int `json:"sendBufferSize"`
ID uint32 `json:"id"`
ConnectionState string `json:"connectionState"`
LocalHostPort string `json:"localHostPort"`
RemoteHostPort string `json:"remoteHostPort"`
OutboundHostPort string `json:"outboundHostPort"`
RemotePeer PeerInfo `json:"remotePeer"`
InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"`
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Relayer RelayerRuntimeState `json:"relayer"`
HealthChecks []bool `json:"healthChecks,omitempty"`
LastActivityRead int64 `json:"lastActivityRead"`
LastActivityWrite int64 `json:"lastActivityWrite"`
SendChQueued int `json:"sendChQueued"`
SendChCapacity int `json:"sendChCapacity"`
SendBufferUsage int `json:"sendBufferUsage"`
SendBufferSize int `json:"sendBufferSize"`
}

// RelayerRuntimeState is the runtime state for a single relayer.
Expand Down Expand Up @@ -361,20 +362,21 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti

// TODO(prashantv): Add total number of health checks, and health check options.
state := ConnectionRuntimeState{
ID: c.connID,
ConnectionState: c.state.String(),
LocalHostPort: c.conn.LocalAddr().String(),
RemoteHostPort: c.conn.RemoteAddr().String(),
OutboundHostPort: c.outboundHP,
RemotePeer: c.remotePeerInfo,
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
HealthChecks: c.healthCheckHistory.asBools(),
LastActivity: c.lastActivity.Load(),
SendChQueued: len(c.sendCh),
SendChCapacity: cap(c.sendCh),
SendBufferUsage: sendBufUsage,
SendBufferSize: sendBufSize,
ID: c.connID,
ConnectionState: c.state.String(),
LocalHostPort: c.conn.LocalAddr().String(),
RemoteHostPort: c.conn.RemoteAddr().String(),
OutboundHostPort: c.outboundHP,
RemotePeer: c.remotePeerInfo,
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
HealthChecks: c.healthCheckHistory.asBools(),
LastActivityRead: c.lastActivityRead.Load(),
LastActivityWrite: c.lastActivityWrite.Load(),
SendChQueued: len(c.sendCh),
SendChCapacity: cap(c.sendCh),
SendBufferUsage: sendBufUsage,
SendBufferSize: sendBufSize,
}
if c.relay != nil {
state.Relayer = c.relay.IntrospectState(opts)
Expand Down