Skip to content

Commit

Permalink
idle sweeper: Don't close connections with pending calls
Browse files Browse the repository at this point in the history
As documented in #701, we currently attempt closing connections even
if they have pending calls. This can cause issues if the calls is stuck
as the connection enters start-close, so it'll reject new incoming
calls, but it doesn't close, so the remote side keeps sending calls over
the same connection.

Fixes #701.
  • Loading branch information
prashantv committed Jul 27, 2018
1 parent dfb0917 commit 6178642
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
5 changes: 5 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ func (c *Connection) closeSendCh(connID uint32) {
close(c.stopCh)
}

// hasExchanges returns whether there's any pending inbound or outbound calls on this connection.
func (c *Connection) hasExchanges() bool {
return c.inbound.count() > 0 || c.outbound.count() > 0
}

// checkExchanges is called whenever an exchange is removed, and when Close is called.
func (c *Connection) checkExchanges() {
c.callOnExchangeChange()
Expand Down
7 changes: 7 additions & 0 deletions idle_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func (is *idleSweep) checkIdleConnections() {
continue
}

// We shouldn't get to a state where we have pending calls, but the connection
// is idle. This either means the max-idle time is too low, or there's a stuck call.
if conn.hasExchanges() {
conn.log.Error("Skip closing idle Connection as it has pending calls.")
continue
}

is.ch.log.WithFields(
LogField{"remotePeer", conn.remotePeerInfo},
LogField{"lastActivityTime", conn.getLastActivityTime()},
Expand Down
67 changes: 66 additions & 1 deletion idle_sweep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/tchannel-go/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// peerStatusListener is a test tool used to wait for connections to drop by
Expand All @@ -51,6 +52,8 @@ func (pl *peerStatusListener) onStatusChange(p *Peer) {
}

func (pl *peerStatusListener) waitForZeroConnections(t *testing.T, channels ...*Channel) bool {
t.Helper()

for {
select {
case <-pl.changes:
Expand All @@ -59,7 +62,8 @@ func (pl *peerStatusListener) waitForZeroConnections(t *testing.T, channels ...*
}

case <-time.After(testutils.Timeout(500 * time.Millisecond)):
return assert.Fail(t, "Some connections are still open: %s", connectionStatus(channels))
t.Fatalf("Some connections are still open: %s", connectionStatus(channels))
return false
}
}
}
Expand Down Expand Up @@ -280,3 +284,64 @@ func TestIdleSweepMisconfiguration(t *testing.T) {
assert.Nil(t, ch, "NewChannel should not return a channel")
assert.Error(t, err, "NewChannel should fail")
}

func TestIdleSweepIgnoresConnectionsWithCalls(t *testing.T) {
ctx, cancel := NewContext(time.Second)
defer cancel()

clientTicker := testutils.NewFakeTicker()
clock := testutils.NewStubClock(time.Now())

listener := newPeerStatusListener()
serverOpts := testutils.NewOpts().
AddLogFilter("Skip closing idle Connection as it has pending calls.", 1).
SetOnPeerStatusChanged(listener.onStatusChange).
SetTimeNow(clock.Now).
SetTimeTicker(clientTicker.New).
SetMaxIdleTime(3 * time.Minute).
SetIdleCheckInterval(30 * time.Second).
NoRelay()

testutils.WithTestServer(t, serverOpts, func(ts *testutils.TestServer) {
var (
gotCall = make(chan struct{})
block = make(chan struct{})
)
testutils.RegisterEcho(ts.Server(), func() {
close(gotCall)
<-block
})

clientOpts := testutils.NewOpts().SetOnPeerStatusChanged(listener.onStatusChange)

// Client 1 will just ping, so we create a connection that should be closed.
c1 := ts.NewClient(clientOpts)
require.NoError(t, c1.Ping(ctx, ts.HostPort()), "Ping failed")

// Client 2 will make a call that will be blocked. Wait for the call to be received.
c2CallComplete := make(chan struct{})
c2 := ts.NewClient(clientOpts)
go func() {
testutils.AssertEcho(t, c2, ts.HostPort(), ts.ServiceName())
close(c2CallComplete)
}()
<-gotCall

// Ensure we have 2 connections on the server side.
assert.Equal(t, 2, ts.Server().IntrospectNumConnections(), "Expect connection to client 1 and client 2")

// Let the idle checker to close client 1's connection.
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
listener.waitForZeroConnections(t, c1)

assert.Equal(t, 1, ts.Server().IntrospectNumConnections(), "Expect connection only to client 2")

// Unblock the call.
close(block)
<-c2CallComplete
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
listener.waitForZeroConnections(t, ts.Server(), c2)
})
}

0 comments on commit 6178642

Please sign in to comment.