-
Notifications
You must be signed in to change notification settings - Fork 700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
More protocol friendly request pattern. Fix for #294. #295
Conversation
This request pattern is sematically the same but utilizes a single wildcard subscription and the last token context for the response subject binding to each request. Since NATS floods interest (subscriptions), the old requestor pattern would create a new inbox subscription for each request. It would then auto-unsubscribe and delete the subscription after the response was received, causing quite a bit of protocol traffic. So although this does have a performance gain as can be seen in the benchmarks, it's implementation is specifically designed to be more friendly for NATS clusters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments. Also, what about the new WithContext mode?
nats.go
Outdated
// DisconnectedCB sets the disconnected handler called whenever we | ||
// are disconnected. | ||
// DisconnectedCB sets the disconnected handler that is called | ||
// whenever we are disconnected. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we replaced "we are" by "the client is"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
nats.go
Outdated
Dialer *net.Dialer | ||
|
||
// UseOldRequestStyle force older method of Requests that utilize a new Inbox |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that you have changed some of the verbs for other comments, so should it be "forces" here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
nats.go
Outdated
@@ -1941,10 +1961,122 @@ func (nc *Conn) publish(subj, reply string, data []byte) error { | |||
return nil | |||
} | |||
|
|||
// Request will create an Inbox and perform a Request() call | |||
// respHandler is the global respnse handler. It will look up | |||
// the apprioriate channel based on the last token and place |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typos: "response handler" and "appropriate"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, wonder why misspell did not catch?
} | ||
|
||
// Grab mch | ||
mch := nc.respMap[rt] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check for nil
here? We remove the channel from the map next line, so if the responder was to send 2 responses, the second response would get a nil channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sending to a nil channel will block forever, so select below will do the right thing.
// Don't block, let Request timeout instead, mch is | ||
// buffered and we should delete the key before a | ||
// second response is processed. | ||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we allow only 1 response per request, then why make the backing channel more than size 1, and again, since there is only 1 response (we remove the channel from the map), we should be sure to be able to put the message in the channel and not need the select, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above.
nats.go
Outdated
// We could be racing here. So will we double check | ||
// respMux here and discard the new one if set. | ||
nc.mu.Lock() | ||
defer nc.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use defer
here because otherwise you will call s.Unsubscribe()
under nc.mu.Lock()
which will deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
nc.respMap = make(map[string]chan *Msg) | ||
} else { | ||
// Discard duplicate, don't set others. | ||
defer s.Unsubscribe() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to explicitly release lock before calling s.Unsubscribe()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, your test of simultaneous requests did expose that (I was lucky that I got it the first time, but sometimes it would pass, but most times it would not on my machine).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes my slow laptop on the plane always passed but my office machine shows it. Good catch. Will fix later tonight when the schedule frees up..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And travis is even slower so they all passed too, but once on a fast machine you see it right away.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least I did write the test.. ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} | ||
} | ||
// Create literal Inbox and map to a chan msg. | ||
mch := make(chan *Msg, RequestChanLen) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just size 1 here since we handle only 1 response in response handler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
paranoia around blocking channels, prefer buffered ones, even if small for safety. I also believe it changes behavior of the Go scheduler to some degree when unbuffered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not say unbuffered, since we would lose the message or block. I am saying why not size 1 (RequestChanLen is 8 I believe).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe 1 or 8 is same underneath the covers memory wise.. But I could be wrong, I noticed I used 8 elsewhere and figured I picked it for a reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine, I just wanted to make sure that I understood the code correctly and that only one message should ever be put into the channel.
I will take a look. Thought we had spell checks, etc. Good call on context, will take a look to make sure we have the appropriate tests in place, etc. |
nats.go
Outdated
return nil | ||
} | ||
|
||
// New style request that will mux on a single Subscription. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not true if UseOldRequestStyle
is set.
Since this is public, seems like the docstring should describe what this does/is for rather than impl details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, is there a reason we need to expose the old style? There shouldn't be any compatibility issues with the switch, right? New style vs. old style is confusing from an API "UX" perspective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point on comment for public doc, will update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is such a big change, I want to make sure clients who need to go revert back to the old way. Adding option was simple and will not affect any code unless the client is stuck for some unknown reason and wants the old way. Only situation I can think of is use of circuit breaker with 1:N where all N respond and N is large.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update doc, keeping ability to set option to force old style semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only comments are related to code coverage, otherwise, LGTM.
@@ -16,7 +16,58 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte | |||
if ctx == nil { | |||
return nil, ErrInvalidContext | |||
} | |||
if nc == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code coverage wise, this is not tested.
nc.mu.Unlock() | ||
|
||
// If user wants the old style. | ||
if useOldRequestStyle { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code coverage wise, this is not tested.
|
||
// oldRequestWithContext utilizes inbox and subscription per request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole function shows up as not covered, which brings the code coverage of this file down to 71%.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix..
// Request will send a request payload and deliver the response message, | ||
// or an error, including a timeout if no message was received properly. | ||
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { | ||
if nc == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test for this?
select { | ||
case msg, ok = <-mch: | ||
if !ok { | ||
return nil, ErrConnectionClosed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would like to see a test where we spawn a go routine that will close the connection after a delay, and issue a request in while there is no responder running, and make sure that Request() returns ErrConnectionClosed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, but hard to make timing work correctly I think. I did consider it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about something like this:
...
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(250 * time.Millisecond)
nc.Close()
}()
if _, err := nc.Request("foo", []byte("help"), 3*time.Second); err != nats.ErrInvalidConnection || err != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
}
wg.Wait()
The test for nats.ErrInvalidConnection
would be to avoid a flapping test where the connection would be closed in the go routine before the Request()
call has a chance to execute.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That may work..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I quickly tried that and looks like the Request is not kicked out...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently this always gives us a timeout error, by design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we would never get this ErrConnectionClosed for a closed channel, since this channel is never closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, if you look at Close(), we do go through the list of subscriptions and close the mch or signal in case of async. So NextMsg() (in old Request style) would get kicked out and will get ErrConnectionClosed. You are changing the behavior in that the Request will now return ErrConnectionClosed but only after waiting for the whole timeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, fixed.
@kozlovic @tylertreat @wallyqs Take a look now, should be good. |
I will add some tests for RequestContext and switch to old style, and check for nil. |
Am I good for a merge? /cc @kozlovic @tylertreat @wallyqs |
lgtm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should make an effort to close all channels from respMux in connection close.
nats.go
Outdated
return nil, ErrConnectionClosed | ||
} | ||
case <-t.C: | ||
if nc.isClosed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is right (see previous comment). Also, this version of is closed is not locked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And for the test I described above (doing the close in go routine), I would also take the time before the request and after it returns and make sure that it is less than the timeout given to the request call, to make sure that the request is really kicked out ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, fixed.
nats.go
Outdated
close(ch) | ||
} | ||
} | ||
nc.pongs = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you meant nc.respMap here?
nats.go
Outdated
@@ -2068,7 +2070,7 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, | |||
return nil, ErrConnectionClosed | |||
} | |||
case <-t.C: | |||
if nc.isClosed() { | |||
if nc.IsClosed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not put that here. It is trued that there is possibly a race that the connection is closed while the request just timed-out, but we don't do that in NextMsg, etc..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok fixed.
nats.go
Outdated
// closing channel. | ||
if nc.isClosed() { | ||
nc.mu.Unlock() | ||
close(mch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is close of a nil channel valid? (remember, mch could be nil here...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Believe so..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually no, but removed it anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it will panic, so this needs to be fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@derekcollison Seems that you missed above comment: closing a nil channel will panic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted back to just returning.
nats.go
Outdated
for _, ch := range nc.respMap { | ||
if ch != nil { | ||
close(ch) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should empty the map. In a test, you got this:
github.com/nats-io/go-nats.(*Conn).respHandler(0xc420406a00, 0xc420ac21e0)
/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:1982 +0x23a
github.com/nats-io/go-nats.(*Conn).(github.com/nats-io/go-nats.respHandler)-fm(0xc420ac21e0)
/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:2004 +0x4c
github.com/nats-io/go-nats.(*Conn).waitForMsgs(0xc420406a00, 0xc4200f6f00)
/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:1559 +0x1f9
created by github.com/nats-io/go-nats.(*Conn).subscribe
/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:2223 +0x3d9
FAIL github.com/nats-io/go-nats/test 6.891s
? github.com/nats-io/go-nats/util [no test files]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's because the channel was closed, but left in the map, then respHandler tried to close again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
LGTM! |
Sorry that was so much a PITA.. old and rusty, uug. |
This request pattern is sematically the same but utilizes a single
wildcard subscription and the last token context for the response
subject binding to each request. Since NATS floods interest (subscriptions),
the old requestor pattern would create a new inbox subscription for each request.
It would then auto-unsubscribe and delete the subscription after the
response was received, causing quite a bit of protocol traffic.
So although this does have a performance gain as can be seen in the benchmarks,
it's implementation is specifically designed to be more friendly for NATS
clusters.