Skip to content

Commit

Permalink
Allow blackholing requests (#681)
Browse files Browse the repository at this point in the history
This enables us to blackhole requests and release all long-lived
go routines.

Relates to yarpc/yarpc-go/#1436
  • Loading branch information
peats-bond authored and prashantv committed Mar 23, 2018
1 parent 75acd77 commit 0cf15e0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
7 changes: 7 additions & 0 deletions inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,13 @@ func (response *InboundCallResponse) SetApplicationError() error {
return nil
}

// Blackhole indicates no response will be sent, and cleans up any resources
// associated with this request. This allows for services to trigger a timeout in
// clients without holding on to any goroutines on the server.
func (response *InboundCallResponse) Blackhole() {
response.cancel()
}

// Arg2Writer returns a WriteCloser that can be used to write the second argument.
// The returned writer must be closed once the write is complete.
func (response *InboundCallResponse) Arg2Writer() (ArgWriter, error) {
Expand Down
43 changes: 43 additions & 0 deletions inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,46 @@ func TestInboundConnection_CallOptions(t *testing.T) {
require.NoError(t, err, "Call through proxy failed")
})
}

func TestBlackhole(t *testing.T) {
ctx, cancel := NewContext(testutils.Timeout(time.Hour))

testutils.WithTestServer(t, nil, func(server *testutils.TestServer) {
serviceName := server.ServiceName()
handlerName := "test-handler"

server.Register(HandlerFunc(func(ctx context.Context, inbound *InboundCall) {
// cancel client context in handler so the client can return after being blackholed
defer cancel()

c, _ := InboundConnection(inbound)
require.NotNil(t, c)

state := c.IntrospectState(&IntrospectionOptions{})
require.Equal(t, 1, state.InboundExchange.Count, "expected exactly one inbound exchange")

// blackhole request
inbound.Response().Blackhole()

// give time for exchange to cleanup
require.True(t, testutils.WaitFor(10*time.Millisecond, func() bool {
state = c.IntrospectState(&IntrospectionOptions{})
return state.InboundExchange.Count == 0
}),
"expected no inbound exchanges",
)

}), handlerName)

clientCh := server.NewClient(nil)
defer clientCh.Close()

_, _, _, err := raw.Call(ctx, clientCh, server.HostPort(), serviceName, handlerName, nil, nil)
require.Error(t, err, "expected call error")

errCode := GetSystemErrorCode(err)
// Providing 'got: %q' is necessary since SystemErrCode is a type alias of byte; testify's
// failed test ouput would otherwise print out hex codes.
assert.Equal(t, ErrCodeCancelled, errCode, "expected cancelled error code, got: %q", errCode)
})
}
2 changes: 1 addition & 1 deletion mex.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (mexset *messageExchangeSet) removeExchange(msgID uint32) {
// will write to the send channel.
func (mexset *messageExchangeSet) expireExchange(msgID uint32) {
mexset.log.Debugf(
"Removing %s message exchange %d due to timeout or cancelation",
"Removing %s message exchange %d due to timeout, cancellation or blackhole",
mexset.name,
msgID,
)
Expand Down

0 comments on commit 0cf15e0

Please sign in to comment.