Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idle sweeper: Don't close connections with pending calls #712

Merged
merged 2 commits into from
Jul 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ func (c *Connection) closeSendCh(connID uint32) {
close(c.stopCh)
}

// hasExchanges returns whether there's any pending inbound or outbound calls on this connection.
func (c *Connection) hasExchanges() bool {
return c.inbound.count() > 0 || c.outbound.count() > 0
}

// checkExchanges is called whenever an exchange is removed, and when Close is called.
func (c *Connection) checkExchanges() {
c.callOnExchangeChange()
Expand Down
7 changes: 7 additions & 0 deletions idle_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func (is *idleSweep) checkIdleConnections() {
continue
}

// We shouldn't get to a state where we have pending calls, but the connection
// is idle. This either means the max-idle time is too low, or there's a stuck call.
if conn.hasExchanges() {
conn.log.Error("Skip closing idle Connection as it has pending calls.")
continue
}

is.ch.log.WithFields(
LogField{"remotePeer", conn.remotePeerInfo},
LogField{"lastActivityTime", conn.getLastActivityTime()},
Expand Down
66 changes: 65 additions & 1 deletion idle_sweep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/tchannel-go/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// peerStatusListener is a test tool used to wait for connections to drop by
Expand Down Expand Up @@ -59,7 +60,8 @@ func (pl *peerStatusListener) waitForZeroConnections(t *testing.T, channels ...*
}

case <-time.After(testutils.Timeout(500 * time.Millisecond)):
return assert.Fail(t, "Some connections are still open: %s", connectionStatus(channels))
t.Fatalf("Some connections are still open: %s", connectionStatus(channels))
return false
}
}
}
Expand Down Expand Up @@ -280,3 +282,65 @@ func TestIdleSweepMisconfiguration(t *testing.T) {
assert.Nil(t, ch, "NewChannel should not return a channel")
assert.Error(t, err, "NewChannel should fail")
}

func TestIdleSweepIgnoresConnectionsWithCalls(t *testing.T) {
ctx, cancel := NewContext(time.Second)
defer cancel()

clientTicker := testutils.NewFakeTicker()
clock := testutils.NewStubClock(time.Now())

listener := newPeerStatusListener()
// TODO: Log filtering doesn't require the message to be seen.
serverOpts := testutils.NewOpts().
AddLogFilter("Skip closing idle Connection as it has pending calls.", 1).
SetOnPeerStatusChanged(listener.onStatusChange).
SetTimeNow(clock.Now).
SetTimeTicker(clientTicker.New).
SetMaxIdleTime(3 * time.Minute).
SetIdleCheckInterval(30 * time.Second).
NoRelay()

testutils.WithTestServer(t, serverOpts, func(ts *testutils.TestServer) {
var (
gotCall = make(chan struct{})
block = make(chan struct{})
)
testutils.RegisterEcho(ts.Server(), func() {
close(gotCall)
<-block
})

clientOpts := testutils.NewOpts().SetOnPeerStatusChanged(listener.onStatusChange)

// Client 1 will just ping, so we create a connection that should be closed.
c1 := ts.NewClient(clientOpts)
require.NoError(t, c1.Ping(ctx, ts.HostPort()), "Ping failed")

// Client 2 will make a call that will be blocked. Wait for the call to be received.
c2CallComplete := make(chan struct{})
c2 := ts.NewClient(clientOpts)
go func() {
testutils.AssertEcho(t, c2, ts.HostPort(), ts.ServiceName())
close(c2CallComplete)
}()
<-gotCall

// Ensure we have 2 connections on the server side.
assert.Equal(t, 2, ts.Server().IntrospectNumConnections(), "Expect connection to client 1 and client 2")

// Let the idle checker to close client 1's connection.
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
listener.waitForZeroConnections(t, c1)

assert.Equal(t, 1, ts.Server().IntrospectNumConnections(), "Expect connection only to client 2")

// Unblock the call.
close(block)
<-c2CallComplete
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
listener.waitForZeroConnections(t, ts.Server(), c2)
})
}