Skip to content

Commit

Permalink
connection: Separate out lastActivity into recv and send activity
Browse files Browse the repository at this point in the history
`lastActivity` recorded when the last receive or send activity occurred over
its connection.  Separate this out into `lastActivityRecv` and
`lastActivitySend` to observe the last receives or sends allowing monitoring of
the elapsed time between reading from the recv buffer and writing to the send
buffer.   This will provide information on how stalled either buffer is.
  • Loading branch information
witriew committed Mar 19, 2020
1 parent 2a4a6f9 commit 6489da6
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 29 deletions.
43 changes: 31 additions & 12 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ type Connection struct {
healthCheckDone chan struct{}
healthCheckHistory *healthHistory

// lastActivity is used to track how long the connection has been idle.
// (unix time, nano)
lastActivity atomic.Int64
// lastActivity{Recv,Send} is used to track how long the connection has been
// idle for the recieve and send connections respectively. (unix time, nano)
lastActivityRecv atomic.Int64
lastActivitySend atomic.Int64
}

type peerAddressComponents struct {
Expand Down Expand Up @@ -324,7 +325,8 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
events: events,
commonStatsTags: ch.commonStatsTags,
healthCheckHistory: newHealthHistory(),
lastActivity: *atomic.NewInt64(ch.timeNow().UnixNano()),
lastActivityRecv: *atomic.NewInt64(ch.timeNow().UnixNano()),
lastActivitySend: *atomic.NewInt64(ch.timeNow().UnixNano()),
}

if tosPriority := opts.TosPriority; tosPriority > 0 {
Expand Down Expand Up @@ -645,7 +647,7 @@ func (c *Connection) readFrames(_ uint32) {
return
}

c.updateLastActivity(frame)
c.updateLastActivityRecv(frame)

var releaseFrame bool
if c.relay == nil {
Expand Down Expand Up @@ -715,7 +717,7 @@ func (c *Connection) writeFrames(_ uint32) {
c.log.Debugf("Writing frame %s", f.Header)
}

c.updateLastActivity(f)
c.updateLastActivitySend(f)
err := f.WriteOut(c.conn)
c.opts.FramePool.Release(f)
if err != nil {
Expand All @@ -734,13 +736,23 @@ func (c *Connection) writeFrames(_ uint32) {
}
}

// updateLastActivity marks when the last message was received/sent on the channel.
// updateLastActivityRecv marks when the last message was received on the channel.
// This is used for monitoring idle connections and timing them out.
func (c *Connection) updateLastActivity(frame *Frame) {
func (c *Connection) updateLastActivityRecv(frame *Frame) {
// Pings are ignored for last activity.
switch frame.Header.messageType {
case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError:
c.lastActivity.Store(c.timeNow().UnixNano())
c.lastActivityRecv.Store(c.timeNow().UnixNano())
}
}

// updateLastActivitySend marks when the last message was sent on the channel.
// This is used for monitoring idle connections and timing them out.
func (c *Connection) updateLastActivitySend(frame *Frame) {
// Pings are ignored for last activity.
switch frame.Header.messageType {
case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError:
c.lastActivitySend.Store(c.timeNow().UnixNano())
}
}

Expand Down Expand Up @@ -874,11 +886,18 @@ func (c *Connection) closeNetwork() {
}
}

// getLastActivityTime returns the timestamp of the last frame read or written,
// getLastActivityRecvTime returns the timestamp of the last frame read or written,
// excluding pings. If no frames were transmitted yet, it will return the time
// this connection was created.
func (c *Connection) getLastActivityRecvTime() time.Time {
return time.Unix(0, c.lastActivityRecv.Load())
}

// getLastActivitySendTime returns the timestamp of the last frame read or written,
// excluding pings. If no frames were transmitted yet, it will return the time
// this connection was created.
func (c *Connection) getLastActivityTime() time.Time {
return time.Unix(0, c.lastActivity.Load())
func (c *Connection) getLastActivitySendTime() time.Time {
return time.Unix(0, c.lastActivitySend.Load())
}

func (c *Connection) sendBufSize() (sendBufUsage int, sendBufSize int, _ error) {
Expand Down
42 changes: 29 additions & 13 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,20 +1165,28 @@ func TestLastActivityTime(t *testing.T) {
responseReceived := make(chan struct{})

// Helper function that checks the last activity time on client, server and relay.
validateLastActivityTime := func(expected time.Time) {
validateLastActivityTime := func(expectedReq time.Time, expectedResp time.Time) {
clientConn := getConnection(t, client, outbound)
serverConn := getConnection(t, server, inbound)
now := expected.UnixNano()
reqTime := expectedReq.UnixNano()
respTime := expectedResp.UnixNano()

assert.Equal(t, now, clientConn.LastActivity)
assert.Equal(t, now, serverConn.LastActivity)
assert.Equal(t, reqTime, clientConn.LastActivitySend)
assert.Equal(t, reqTime, serverConn.LastActivityRecv)

assert.Equal(t, respTime, clientConn.LastActivityRecv)
assert.Equal(t, respTime, serverConn.LastActivitySend)

// Relays should act like both clients and servers.
if ts.HasRelay() {
relayInbound := getConnection(t, ts.Relay(), inbound)
relayOutbound := getConnection(t, ts.Relay(), outbound)
assert.Equal(t, now, relayInbound.LastActivity)
assert.Equal(t, now, relayOutbound.LastActivity)

assert.Equal(t, reqTime, relayInbound.LastActivityRecv)
assert.Equal(t, reqTime, relayOutbound.LastActivitySend)

assert.Equal(t, respTime, relayInbound.LastActivitySend)
assert.Equal(t, respTime, relayOutbound.LastActivityRecv)
}
}

Expand All @@ -1191,6 +1199,7 @@ func TestLastActivityTime(t *testing.T) {
clock.Elapse(1 * time.Second)
})

initTime := clock.Now()
// Run the test twice, because the first call will also establish a connection.
for i := 0; i < 2; i++ {
beforeCallSent := clock.Now()
Expand All @@ -1202,15 +1211,19 @@ func TestLastActivityTime(t *testing.T) {

// Verify that the last activity time was updated before a response is received.
<-callReceived
validateLastActivityTime(beforeCallSent)
validateLastActivityTime(beforeCallSent, initTime)

// Let the server respond.
blockResponse <- struct{}{}

// After a response was received, time should be +1s. Validate again that
// the last activity time was updated.
// After a response was received, time of the response should be +1s,
// without a change to the requet time. Validate again that the last
// activity time was updated.
<-responseReceived
validateLastActivityTime(beforeCallSent.Add(1 * time.Second))
validateLastActivityTime(beforeCallSent, beforeCallSent.Add(1*time.Second))

// Set the initTime as the time of the last response.
initTime = beforeCallSent.Add(1 * time.Second)

// Elapse the clock for our next iteration.
clock.Elapse(1 * time.Minute)
Expand Down Expand Up @@ -1244,16 +1257,19 @@ func TestLastActivityTimePings(t *testing.T) {

// Verify last activity time.
clientConn := getConnection(t, client, outbound)
assert.Equal(t, timeAtStart, clientConn.LastActivity)
assert.Equal(t, timeAtStart, clientConn.LastActivityRecv)
assert.Equal(t, timeAtStart, clientConn.LastActivitySend)

// Relays do not pass pings on to the server.
if ts.HasRelay() {
relayInbound := getConnection(t, ts.Relay(), inbound)
assert.Equal(t, timeAtStart, relayInbound.LastActivity)
assert.Equal(t, timeAtStart, relayInbound.LastActivityRecv)
assert.Equal(t, timeAtStart, relayInbound.LastActivitySend)
}

serverConn := getConnection(t, ts.Server(), inbound)
assert.Equal(t, timeAtStart, serverConn.LastActivity)
assert.Equal(t, timeAtStart, serverConn.LastActivityRecv)
assert.Equal(t, timeAtStart, serverConn.LastActivitySend)

clock.Elapse(1 * time.Second)
}
Expand Down
10 changes: 8 additions & 2 deletions idle_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func (is *idleSweep) checkIdleConnections() {
idleConnections := make([]*Connection, 0, 10)
is.ch.mutable.RLock()
for _, conn := range is.ch.mutable.conns {
if idleTime := now.Sub(conn.getLastActivityTime()); idleTime >= is.maxIdleTime {
lastActivityTime := conn.getLastActivityRecvTime()
if sendActivityTime := conn.getLastActivitySendTime(); lastActivityTime.Before(sendActivityTime) {
lastActivityTime = sendActivityTime
}

if idleTime := now.Sub(lastActivityTime); idleTime >= is.maxIdleTime {
idleConnections = append(idleConnections, conn)
}
}
Expand All @@ -116,7 +121,8 @@ func (is *idleSweep) checkIdleConnections() {

is.ch.log.WithFields(
LogField{"remotePeer", conn.remotePeerInfo},
LogField{"lastActivityTime", conn.getLastActivityTime()},
LogField{"lastActivityTimeRecv", conn.getLastActivityRecvTime()},
LogField{"lastActivityTimeSend", conn.getLastActivitySendTime()},
).Info("Closing idle inbound connection.")
conn.close(LogField{"reason", "Idle connection closed"})
}
Expand Down
6 changes: 4 additions & 2 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ type ConnectionRuntimeState struct {
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Relayer RelayerRuntimeState `json:"relayer"`
HealthChecks []bool `json:"healthChecks,omitempty"`
LastActivity int64 `json:"lastActivity"`
LastActivityRecv int64 `json:"lastActivityRecv"`
LastActivitySend int64 `json:"lastActivitySend"`
SendChQueued int `json:"sendChQueued"`
SendChCapacity int `json:"sendChCapacity"`
SendBufferUsage int `json:"sendBufferUsage"`
Expand Down Expand Up @@ -370,7 +371,8 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
HealthChecks: c.healthCheckHistory.asBools(),
LastActivity: c.lastActivity.Load(),
LastActivityRecv: c.lastActivityRecv.Load(),
LastActivitySend: c.lastActivitySend.Load(),
SendChQueued: len(c.sendCh),
SendChCapacity: cap(c.sendCh),
SendBufferUsage: sendBufUsage,
Expand Down

0 comments on commit 6489da6

Please sign in to comment.