Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More protocol friendly request pattern. Fix for #294. #295

Merged
merged 10 commits into from
May 27, 2017
33 changes: 26 additions & 7 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,18 +1968,20 @@ func (nc *Conn) respHandler(m *Msg) {
rt := respToken(m.Subject)

nc.mu.Lock()
// Just return if closed, let Request timeout.
if nc.isClosed() {
nc.mu.Unlock()
return
}

// Grab mch
mch := nc.respMap[rt]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for nil here? We remove the channel from the map next line, so if the responder was to send 2 responses, the second response would get a nil channel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending to a nil channel will block forever, so select below will do the right thing.

// Delete the key regardless, one response only.
// FIXME(dlc) - should we track responses past 1
// just statistics wise?
delete(nc.respMap, rt)

// Just return if closed, kick out Request by
// closing channel.
if nc.isClosed() {
nc.mu.Unlock()
close(mch)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is close of a nil channel valid? (remember, mch could be nil here...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Believe so..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no, but removed it anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will panic, so this needs to be fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@derekcollison Seems that you missed above comment: closing a nil channel will panic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted back to just returning.

return
}
nc.mu.Unlock()

// Don't block, let Request timeout instead, mch is
Expand Down Expand Up @@ -2068,7 +2070,7 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
return nil, ErrConnectionClosed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to see a test where we spawn a go routine that will close the connection after a delay, and issue a request in while there is no responder running, and make sure that Request() returns ErrConnectionClosed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, but hard to make timing work correctly I think. I did consider it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about something like this:

...
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
    defer wg.Done()
    time.Sleep(250 * time.Millisecond)
    nc.Close()
}()
if _, err := nc.Request("foo", []byte("help"), 3*time.Second); err != nats.ErrInvalidConnection || err != nats.ErrConnectionClosed {
    t.Fatalf("Unexpected error: %v", err)
}
wg.Wait()

The test for nats.ErrInvalidConnection would be to avoid a flapping test where the connection would be closed in the go routine before the Request() call has a chance to execute.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That may work..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I quickly tried that and looks like the Request is not kicked out...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this always gives us a timeout error, by design.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we would never get this ErrConnectionClosed for a closed channel, since this channel is never closed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if you look at Close(), we do go through the list of subscriptions and close the mch or signal in case of async. So NextMsg() (in old Request style) would get kicked out and will get ErrConnectionClosed. You are changing the behavior in that the Request will now return ErrConnectionClosed but only after waiting for the whole timeout.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, fixed.

}
case <-t.C:
if nc.isClosed() {
if nc.IsClosed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not put that here. It is trued that there is possibly a race that the connection is closed while the request just timed-out, but we don't do that in NextMsg, etc..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok fixed.

return nil, ErrConnectionClosed
}
return nil, ErrTimeout
Expand Down Expand Up @@ -2746,6 +2748,20 @@ func (nc *Conn) clearPendingFlushCalls() {
nc.pongs = nil
}

// This will clear any pending Request calls.
// Lock is assumed to be held by the caller.
func (nc *Conn) clearPendingRequestCalls() {
if nc.respMap == nil {
return
}
for _, ch := range nc.respMap {
if ch != nil {
close(ch)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should empty the map. In a test, you got this:

github.com/nats-io/go-nats.(*Conn).respHandler(0xc420406a00, 0xc420ac21e0)
	/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:1982 +0x23a
github.com/nats-io/go-nats.(*Conn).(github.com/nats-io/go-nats.respHandler)-fm(0xc420ac21e0)
	/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:2004 +0x4c
github.com/nats-io/go-nats.(*Conn).waitForMsgs(0xc420406a00, 0xc4200f6f00)
	/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:1559 +0x1f9
created by github.com/nats-io/go-nats.(*Conn).subscribe
	/home/travis/gopath/src/github.com/nats-io/go-nats/nats.go:2223 +0x3d9
FAIL	github.com/nats-io/go-nats/test	6.891s
?   	github.com/nats-io/go-nats/util	[no test files]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because the channel was closed, but left in the map, then respHandler tried to close again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
nc.pongs = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you meant nc.respMap here?

}

// Low level close call that will do correct cleanup and set
// desired status. Also controls whether user defined callbacks
// will be triggered. The lock should not be held entering this
Expand All @@ -2768,6 +2784,9 @@ func (nc *Conn) close(status Status, doCBs bool) {
// Clear any queued pongs, e.g. pending flush calls.
nc.clearPendingFlushCalls()

// Clear any queued and blocking Requests.
nc.clearPendingRequestCalls()

if nc.ptmr != nil {
nc.ptmr.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func TestRequestClose(t *testing.T) {
time.Sleep(100 * time.Millisecond)
nc.Close()
}()
if _, err := nc.Request("foo", []byte("help"), 500*time.Millisecond); err != nats.ErrInvalidConnection && err != nats.ErrConnectionClosed {
if _, err := nc.Request("foo", []byte("help"), 2*time.Second); err != nats.ErrInvalidConnection && err != nats.ErrConnectionClosed {
t.Fatalf("Expected connection error: got %v", err)
}
wg.Wait()
Expand Down