Skip to content

Commit

Permalink
Merge pull request #392 from nats-io/set_last_err_in_drain
Browse files Browse the repository at this point in the history
[FIXED] Set connection's last error if async error occurs during Drain
  • Loading branch information
kozlovic authored Sep 6, 2018
2 parents fb0396e + cf39c28 commit 0856137
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go
sudo: false
go:
- 1.11.x
- 1.11
- 1.10.x
- 1.9.x
install:
Expand All @@ -18,4 +18,4 @@ before_script:
- megacheck -ignore "$(cat staticcheck.ignore)" ./...
script:
- go test -i -race ./...
- if [[ "$TRAVIS_GO_VERSION" == 1.11.* ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race ./...; fi
- if [[ "$TRAVIS_GO_VERSION" == 1.11 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race ./...; fi
3 changes: 2 additions & 1 deletion nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (

// Default Constants
const (
Version = "1.6.0"
Version = "1.6.1"
DefaultURL = "nats://localhost:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
Expand Down Expand Up @@ -3241,6 +3241,7 @@ func (nc *Conn) drainConnection() {
// for pushing errors with context.
pushErr := func(err error) {
nc.mu.Lock()
nc.err = err
if errCB != nil {
nc.ach.push(func() { errCB(nc, nil, err) })
}
Expand Down
49 changes: 49 additions & 0 deletions test/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package test

import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -394,3 +395,51 @@ func TestDrainConnectionAutoUnsub(t *testing.T) {
t.Fatalf("Timeout waiting for closed state for connection")
}
}

func TestDrainConnLastError(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

done := make(chan bool, 1)
closedCb := func(nc *nats.Conn) {
done <- true
}

nc, err := nats.Connect(nats.DefaultURL,
nats.ClosedHandler(closedCb),
nats.DrainTimeout(time.Millisecond))
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

wg := sync.WaitGroup{}
wg.Add(1)
if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {
// So they back up a bit in client to make drain timeout
time.Sleep(100 * time.Millisecond)
wg.Done()

}); err != nil {
t.Fatalf("Error creating subscription; %v", err)
}

if err := nc.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
if err := nc.Drain(); err != nil {
t.Fatalf("Error on drain: %v", err)
}

select {
case <-done:
if e := nc.LastError(); e == nil || e != nats.ErrDrainTimeout {
t.Fatalf("Expected last error to be set to %v, got %v", nats.ErrDrainTimeout, e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Timeout waiting for closed state for connection")
}

// Wait for subscription callback to return
wg.Wait()
}

0 comments on commit 0856137

Please sign in to comment.