Allow blackholing requests #681
Codecov Report
@@ Coverage Diff @@
## dev #681 +/- ##
==========================================
+ Coverage 86.68% 86.87% +0.18%
==========================================
Files 39 39
Lines 3959 3961 +2
==========================================
+ Hits 3432 3441 +9
+ Misses 403 399 -4
+ Partials 124 121 -3
I'd rather avoid a new channel allocation in this path. I think we can reuse response.cancel as-is right now. Just call that method, and the current code will hit the ctx.Done() case, and will expire the inbound.
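The suggestion above can be sketched in isolation. This is a minimal model, not tchannel-go code: `inboundResponse`, `waitForCompletion`, and `blackholeOutcome` are hypothetical names, and the only assumption carried over from the comment is that the response holds the request context's cancel function and that some goroutine already selects on `ctx.Done()`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// inboundResponse is a hypothetical stand-in for a server-side response
// object that keeps the cancel function of its request context.
type inboundResponse struct {
	cancel context.CancelFunc
}

// Blackhole sends nothing back; it only cancels the request context, so no
// extra channel has to be allocated to signal "no response is coming".
func (r *inboundResponse) Blackhole() { r.cancel() }

// waitForCompletion models the code that watches one inbound call. It reports
// "expired" if the context ends first and "responded" if a response is sent.
func waitForCompletion(ctx context.Context, responded <-chan struct{}) string {
	select {
	case <-ctx.Done():
		return "expired"
	case <-responded:
		return "responded"
	}
}

// blackholeOutcome blackholes a call that has a one-hour deadline, so the
// deadline itself cannot be what ends the wait.
func blackholeOutcome() string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	resp := &inboundResponse{cancel: cancel}
	responded := make(chan struct{}) // never closed: the handler never responds

	resp.Blackhole()
	return waitForCompletion(ctx, responded)
}

func main() {
	fmt.Println(blackholeOutcome())
}
```

The existing `ctx.Done()` branch does all the work here, which is why no new signalling path is needed in this model.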
No strong opinions on this, but instead of a new method, should this be a new error that we can detect in SendSystemError?
I used […]. I'd rather use this method than create a new error with […]. I've also added a test, but it relies on the context timing out, so it's essentially a sleep for 10 milliseconds. I'm not sure what your thoughts on this are @prashantv, but I think that this is the most reasonable way for me to test the change.
Suggest eyes from other reviewers as well :)
mex.go
@@ -390,7 +390,7 @@ func (mexset *messageExchangeSet) removeExchange(msgID uint32) {
// will write to the send channel.
func (mexset *messageExchangeSet) expireExchange(msgID uint32) {
mexset.log.Debugf(
- "Removing %s message exchange %d due to timeout or cancelation",
+ "Removing %s message exchange %d due to timeout, cancelation or blackhole",
cancellation*
Indeed. Corroboration for consistency (since technically both spellings are accepted):
tchannel-go ❯ git grep -i cancellation
CHANGELOG.md:* Fix context cancellation not cancelling Dial attempts (#541)
inbound.go: // TODO(prashant): This is an expensive way to check for cancellation. Use a heap for timeouts.
inbound.go: call.log.Debugf("Wait for timeout/cancellation interrupted by error: %v", call.mex.errCh.err)
mex.go:// It returns any existing errors (timeout, cancellation, connection errors).
mex.go: // 1. Timeouts/cancellation (mex.ctx errors)
mex.go: // 1. Timeouts/cancellation (mex.ctx errors)
tchannel-go ❯ git grep -i cancelation
mex.go: "Removing %s message exchange %d due to timeout or cancelation",
inbound.go
@@ -361,6 +361,12 @@ func (response *InboundCallResponse) SetApplicationError() error {
return nil
}

// Blackhole indicates that there should be no response and provides
// an opportunity to clean up resources.
func (response *InboundCallResponse) Blackhole() {
Simpler than I thought it was going to be.
inbound_test.go
clientCh := tServer.NewClient(testutils.NewOpts())
defer clientCh.Close()

ctx, cancel := NewContext(time.Millisecond * 10)
nit: use testutils.Timeout here
inbound_test.go
testutils.WithTestServer(t, nil, func(tServer *testutils.TestServer) {
serviceName := tServer.ServiceName()

server := tServer.NewServer(testutils.NewOpts())
Why are you creating a new server here? The tServer can have methods registered directly.
Yup, my bad. I overlooked the register function completely.
inbound_test.go
inbound.Response().Blackhole()
}))

clientCh := tServer.NewClient(testutils.NewOpts())
nit: pass in nil for default options to be consistent with other tests
inbound_test.go
subCh := server.GetSubChannel(serviceName)
subCh.SetHandler(HandlerFunc(func(ctx context.Context, inbound *InboundCall) {
// blackhole all requests
inbound.Response().Blackhole()
If you replace the implementation of Blackhole with a time.Sleep, the test will still pass.
diff --git a/inbound.go b/inbound.go
index a7f7739b..605bc52f 100644
--- a/inbound.go
+++ b/inbound.go
@@ -364,7 +364,8 @@ func (response *InboundCallResponse) SetApplicationError() error {
// Blackhole indicates that there should be no response and provides
// an opportunity to clean up resources.
func (response *InboundCallResponse) Blackhole() {
- response.cancel()
+ time.Sleep(200 * time.Millisecond)
+ //response.cancel()
}
> go test -v -run TestBlackhole
=== RUN TestBlackhole
--- PASS: TestBlackhole (0.42s)
PASS
ok github.com/uber/tchannel-go 0.439s
That means the test isn't covering that the blackhole implementation is more efficient than a non-blackhole implementation.
Some suggestions on how the test can actually validate it:
- Before the Blackhole call, if you introspect, you should see that there's a pending message exchange: https://godoc.org/github.com/uber/tchannel-go#ExchangeSetRuntimeState
- After the Blackhole call, there should be no exchanges.
Instead of using a short timeout, you could actually:
- make a call with a huge timeout (e.g., 1 hour), so we know the exchange isn't being cleared due to timeout
- after the Blackhole command, we can cancel the client context, so the client returns
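The testing approach described above can be modelled without the real library. The sketch below is an assumption-laden stand-in: `exchangeSet`, `add`, `blackhole`, `count`, and `countsAroundBlackhole` are hypothetical names, and the background cleanup goroutine only imitates the behaviour discussed in this thread. It shows the shape of the check: one pending exchange before the blackhole, none after, with a one-hour timeout so expiry cannot explain the result.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// exchangeSet is a hypothetical, simplified model of a server's set of
// pending message exchanges. It is not tchannel-go's implementation.
type exchangeSet struct {
	mu      sync.Mutex
	pending map[uint32]context.CancelFunc
}

// add registers an exchange and starts a goroutine that removes it once its
// context ends. The returned channel is closed after the removal.
func (s *exchangeSet) add(parent context.Context, id uint32) <-chan struct{} {
	ctx, cancel := context.WithCancel(parent)
	removed := make(chan struct{})
	s.mu.Lock()
	s.pending[id] = cancel
	s.mu.Unlock()
	go func() {
		<-ctx.Done()
		s.mu.Lock()
		delete(s.pending, id)
		s.mu.Unlock()
		close(removed)
	}()
	return removed
}

// blackhole cancels the exchange's context without sending any response.
func (s *exchangeSet) blackhole(id uint32) {
	s.mu.Lock()
	cancel := s.pending[id]
	s.mu.Unlock()
	if cancel != nil {
		cancel()
	}
}

// count reports how many exchanges are still pending.
func (s *exchangeSet) count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.pending)
}

// countsAroundBlackhole returns the number of pending exchanges before and
// after a blackhole, using a one-hour timeout so expiry cannot be the cause.
func countsAroundBlackhole() (before, after int) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	set := &exchangeSet{pending: make(map[uint32]context.CancelFunc)}
	removed := set.add(ctx, 1)
	before = set.count()

	set.blackhole(1)
	select {
	case <-removed:
	case <-time.After(time.Second): // cleanup never ran; the count stays non-zero
	}
	return before, set.count()
}

func main() {
	before, after := countsAroundBlackhole()
	fmt.Println(before, after)
}
```

If `blackhole` were replaced with a sleep, the second count would stay at one, which is the failure the original short-timeout test could not detect.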
inbound.go
@@ -361,6 +361,12 @@ func (response *InboundCallResponse) SetApplicationError() error {
return nil
}

// Blackhole indicates that there should be no response and provides
more context here,
// Blackhole indicates no response will be sent, and cleans up any resources associated with this request. This allows for services to trigger a timeout in clients without holding on to any goroutines on the server.
This enables us to blackhole requests and release all long-lived goroutines. This needs more testing, but I've confirmed with pprof + YARPC that the only goroutines left running are per connection. Relates to yarpc/yarpc-go#1436.
c4f282a to 46248c9
Updated with your suggestions @prashantv. Since the message exchange cleanup happens in a separate goroutine, I still found the need to add […]
inbound_test.go
inbound.Response().Blackhole()

// give time for exchange to cleanup
time.Sleep(10 * time.Millisecond)
Instead of sleep, take a look at testutils.WaitFor -- it's a lot less flaky for testing since it'll sleep a small amount initially, check whether what we're waiting for is true, if not, sleep a little longer, and repeat.
The issue with a single time.Sleep is that it could finish in 1ms, in which case you're wasting 9ms, or it could take 11ms, in which case the test becomes flaky and fails. The only way to avoid flakiness is to use a huge value like a second or more, but then you're likely wasting 900ms for each run just doing nothing.
WaitFor is a compromise that lets you sleep (since some things can't be tested without waiting for some background work to be scheduled), without making the test slower or flaky.
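The polling pattern described above can be sketched as follows. This is an illustrative helper written for this thread, not the source of testutils.WaitFor: the name `waitFor`, the 1ms starting delay, and the 50ms cap are all assumptions.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitFor is a hypothetical polling helper. It checks ready, sleeps briefly,
// doubles the sleep on each miss, and gives up once timeout has elapsed.
// It returns true as soon as ready reports true.
func waitFor(timeout time.Duration, ready func() bool) bool {
	deadline := time.Now().Add(timeout)
	delay := time.Millisecond
	for {
		if ready() {
			return true
		}
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return false
		}
		if delay > remaining {
			delay = remaining // never sleep past the deadline
		}
		time.Sleep(delay)
		if delay < 50*time.Millisecond {
			delay *= 2
		}
	}
}

func main() {
	// Simulate background cleanup that finishes at an unpredictable time.
	var cleaned int32
	go func() {
		time.Sleep(5 * time.Millisecond)
		atomic.StoreInt32(&cleaned, 1)
	}()

	ok := waitFor(time.Second, func() bool {
		return atomic.LoadInt32(&cleaned) == 1
	})
	fmt.Println("cleanup observed:", ok)
}
```

The generous one-second bound only matters when the condition never becomes true; in the common case the helper returns shortly after the background work finishes.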
inbound_test.go
defer clientCh.Close()

_, _, _, err := raw.Call(ctx, clientCh, server.HostPort(), serviceName, handlerName, nil, nil)
require.Error(t, err, "expected to timeout")
I think you're actually expecting a cancelled error, since you call cancel.
Can you also add an assert.Equal(t, ErrCodeCancelled, GetSystemErrorCode(err)) (or similar) to make sure we're getting an error for the right reason.
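The point about checking the reason for the error, and not just its presence, can be illustrated with the standard library's context errors. This is an analogy under stated assumptions: `call`, `cancelledCall`, `timedOutCall`, and `isCancelled` are hypothetical helpers, and `context.Canceled` stands in for the library's cancelled error code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// call is a hypothetical stand-in for an RPC that never gets a response:
// it blocks until the caller's context ends and returns the context error.
func call(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// cancelledCall uses a long deadline and ends the call by cancelling it.
func cancelledCall() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	cancel()
	return call(ctx)
}

// timedOutCall lets a very short deadline expire instead.
func timedOutCall() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	return call(ctx)
}

// isCancelled reports whether err was caused by cancellation specifically.
func isCancelled(err error) bool {
	return errors.Is(err, context.Canceled)
}

func main() {
	for _, err := range []error{cancelledCall(), timedOutCall()} {
		// Both errors are non-nil, so a bare "expect an error" check
		// cannot tell a cancellation from a timeout.
		fmt.Println(err != nil, isCancelled(err))
	}
}
```

A test that only asserts a non-nil error would pass in both cases, while the cause-specific check passes only for the cancelled call.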
Pleasantly surprised at how small this change turned out to be!
inbound_test.go
require.Error(t, err, "expected call error")

errCode := GetSystemErrorCode(err)
assert.Equal(t, ErrCodeCancelled, errCode, "expected cancelled error code, got: %q", errCode)
no need for "got %v", assert will print the value it expected and what it got. can just make error message something like "got unexpected error code"
As discussed in person, this would output the byte value (e.g. 0x02) since SystemErrCode is an alias of byte. I'll update with a comment to indicate why I wrote it out this way.
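The formatting difference behind this exchange can be reproduced with a small stand-in type. Everything here is hypothetical: `errCode`, its constants, and its `String` method are illustrative, and the sketch assumes the assertion library renders mismatched values in Go-syntax form (the `%#v` verb), which is what produces a bare hex byte for a byte-based type.

```go
package main

import "fmt"

// errCode is a hypothetical byte-based code type, standing in for a type
// like SystemErrCode. The names and values are illustrative only.
type errCode byte

const (
	codeTimeout   errCode = 0x01
	codeCancelled errCode = 0x02
)

// String gives the code a readable name. The %q and %v verbs use it.
func (c errCode) String() string {
	switch c {
	case codeTimeout:
		return "timeout"
	case codeCancelled:
		return "cancelled"
	default:
		return fmt.Sprintf("errCode(%d)", byte(c))
	}
}

// render formats c twice: with %q, which goes through String, and with %#v,
// which ignores String and prints the underlying unsigned value in hex.
func render(c errCode) (quoted, goSyntax string) {
	return fmt.Sprintf("%q", c), fmt.Sprintf("%#v", c)
}

func main() {
	quoted, goSyntax := render(codeCancelled)
	fmt.Println("custom message form:", quoted)
	fmt.Println("Go-syntax form:     ", goSyntax)
}
```

Under these assumptions, keeping the explicit `%q` argument in the assertion message is what puts the readable name into the failure output.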
This enables us to blackhole requests and release unnecessary long-lived goroutines.
Confirmed with pprof and YARPC that the only goroutines left running are for each connection.
Relates to yarpc/yarpc-go#1436.