From 617864261f196c052051713cf9777367cf4edb2f Mon Sep 17 00:00:00 2001 From: Prashant Varanasi Date: Fri, 27 Jul 2018 14:32:34 -0700 Subject: [PATCH] idle sweeper: Don't close connections with pending calls As documented in #701, we currently attempt closing connections even if they have pending calls. This can cause issues if a call is stuck as the connection enters start-close, so it'll reject new incoming calls, but it doesn't close, so the remote side keeps sending calls over the same connection. Fixes #701. --- connection.go | 5 ++++ idle_sweep.go | 7 +++++ idle_sweep_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/connection.go b/connection.go index 1d592bd8..86a759d6 100644 --- a/connection.go +++ b/connection.go @@ -766,6 +766,11 @@ func (c *Connection) closeSendCh(connID uint32) { close(c.stopCh) } +// hasExchanges returns whether there are any pending inbound or outbound calls on this connection. +func (c *Connection) hasExchanges() bool { + return c.inbound.count() > 0 || c.outbound.count() > 0 +} + // checkExchanges is called whenever an exchange is removed, and when Close is called. func (c *Connection) checkExchanges() { c.callOnExchangeChange() diff --git a/idle_sweep.go b/idle_sweep.go index 87ad81c0..a1fcbc49 100644 --- a/idle_sweep.go +++ b/idle_sweep.go @@ -107,6 +107,13 @@ func (is *idleSweep) checkIdleConnections() { continue } + // We shouldn't get to a state where we have pending calls, but the connection + // is idle. This either means the max-idle time is too low, or there's a stuck call. 
+ if conn.hasExchanges() { + conn.log.Error("Skip closing idle Connection as it has pending calls.") + continue + } + is.ch.log.WithFields( LogField{"remotePeer", conn.remotePeerInfo}, LogField{"lastActivityTime", conn.getLastActivityTime()}, diff --git a/idle_sweep_test.go b/idle_sweep_test.go index 4dacc410..a14e4762 100644 --- a/idle_sweep_test.go +++ b/idle_sweep_test.go @@ -32,6 +32,7 @@ import ( "github.com/uber/tchannel-go/testutils" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // peerStatusListener is a test tool used to wait for connections to drop by @@ -51,6 +52,8 @@ func (pl *peerStatusListener) onStatusChange(p *Peer) { } func (pl *peerStatusListener) waitForZeroConnections(t *testing.T, channels ...*Channel) bool { + t.Helper() + for { select { case <-pl.changes: @@ -59,7 +62,8 @@ func (pl *peerStatusListener) waitForZeroConnections(t *testing.T, channels ...* } case <-time.After(testutils.Timeout(500 * time.Millisecond)): - return assert.Fail(t, "Some connections are still open: %s", connectionStatus(channels)) + t.Fatalf("Some connections are still open: %s", connectionStatus(channels)) + return false } } } @@ -280,3 +284,64 @@ func TestIdleSweepMisconfiguration(t *testing.T) { assert.Nil(t, ch, "NewChannel should not return a channel") assert.Error(t, err, "NewChannel should fail") } + +func TestIdleSweepIgnoresConnectionsWithCalls(t *testing.T) { + ctx, cancel := NewContext(time.Second) + defer cancel() + + clientTicker := testutils.NewFakeTicker() + clock := testutils.NewStubClock(time.Now()) + + listener := newPeerStatusListener() + serverOpts := testutils.NewOpts(). + AddLogFilter("Skip closing idle Connection as it has pending calls.", 1). + SetOnPeerStatusChanged(listener.onStatusChange). + SetTimeNow(clock.Now). + SetTimeTicker(clientTicker.New). + SetMaxIdleTime(3 * time.Minute). + SetIdleCheckInterval(30 * time.Second). 
+ NoRelay() + + testutils.WithTestServer(t, serverOpts, func(ts *testutils.TestServer) { + var ( + gotCall = make(chan struct{}) + block = make(chan struct{}) + ) + testutils.RegisterEcho(ts.Server(), func() { + close(gotCall) + <-block + }) + + clientOpts := testutils.NewOpts().SetOnPeerStatusChanged(listener.onStatusChange) + + // Client 1 will just ping, so we create a connection that should be closed. + c1 := ts.NewClient(clientOpts) + require.NoError(t, c1.Ping(ctx, ts.HostPort()), "Ping failed") + + // Client 2 will make a call that will be blocked. Wait for the call to be received. + c2CallComplete := make(chan struct{}) + c2 := ts.NewClient(clientOpts) + go func() { + testutils.AssertEcho(t, c2, ts.HostPort(), ts.ServiceName()) + close(c2CallComplete) + }() + <-gotCall + + // Ensure we have 2 connections on the server side. + assert.Equal(t, 2, ts.Server().IntrospectNumConnections(), "Expect connection to client 1 and client 2") + + // Let the idle checker close client 1's connection. + clock.Elapse(5 * time.Minute) + clientTicker.Tick() + listener.waitForZeroConnections(t, c1) + + assert.Equal(t, 1, ts.Server().IntrospectNumConnections(), "Expect connection only to client 2") + + // Unblock the call. + close(block) + <-c2CallComplete + clock.Elapse(5 * time.Minute) + clientTicker.Tick() + listener.waitForZeroConnections(t, ts.Server(), c2) + }) +}