diff --git a/.travis.yml b/.travis.yml index 9e73bb2dd..2594b74ea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,8 +16,8 @@ install: before_script: - $(exit $(go fmt ./... | wc -l)) - go vet ./... -- misspell -error -locale US . +- find . -type f -name "*.go" | xargs misspell -error -locale US - staticcheck ./... script: - go test -i -race ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race ./...; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi diff --git a/nats.go b/nats.go index b3a87449a..4a7826c77 100644 --- a/nats.go +++ b/nats.go @@ -3884,7 +3884,14 @@ func (nc *Conn) Stats() Statistics { // Stats are updated either under connection's mu or subsMu mutexes. // Lock both to safely get them. nc.mu.Lock() - nc.subsMu.RLock() + // Note: It would be more logical to get subsMu's read lock here since we are + // "reading" the stats. However, in processMsg() we don't lock the connection + // (unless we detect a slow consumer) but we do get subsMu read lock to fetch + // the subscription the incoming message is for. We do update InMsgs/Bytes under + // that read lock. Swithching processMsg()'s subsMu to write lock would decrease + // performance, so instead, to avoid the data race, we simply get subsMu's write + // lock here. + nc.subsMu.Lock() stats := Statistics{ InMsgs: nc.InMsgs, InBytes: nc.InBytes, @@ -3892,7 +3899,7 @@ func (nc *Conn) Stats() Statistics { OutBytes: nc.OutBytes, Reconnects: nc.Reconnects, } - nc.subsMu.RUnlock() + nc.subsMu.Unlock() nc.mu.Unlock() return stats } diff --git a/nats_test.go b/nats_test.go index f62f16f2c..0eb649e30 100644 --- a/nats_test.go +++ b/nats_test.go @@ -2181,3 +2181,39 @@ func TestAuthErrorOnReconnect(t *testing.T) { t.Fatalf("Wrong status: %d\n", nc.Status()) } } + +func TestStatsRace(t *testing.T) { + o := natsserver.DefaultTestOptions + o.Port = -1 + s := RunServerWithOptions(&o) + defer s.Shutdown() + + nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + ch := make(chan bool) + go func() { + defer wg.Done() + for { + select { + case <-ch: + return + default: + nc.Stats() + } + } + }() + + nc.Subscribe("foo", func(_ *Msg) {}) + for i := 0; i < 1000; i++ { + nc.Publish("foo", []byte("hello")) + } + + close(ch) + wg.Wait() +} diff --git a/scripts/cov.sh b/scripts/cov.sh index 398b4657d..760e42f06 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -3,10 +3,10 @@ rm -rf ./cov mkdir cov -go test -v -race -covermode=atomic -coverprofile=./cov/nats.out -go test -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -go test -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -go test -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/nats.out +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto gocovmerge ./cov/*.out > acc.out rm -rf ./cov