Skip to content

Commit

Permalink
[FIXED] Data race between processMsg() and Stats()
Browse files Browse the repository at this point in the history
Resolves #520

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Sep 17, 2019
1 parent 6ffcfbc commit b4437f1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 14 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ install:
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet ./...
- misspell -error -locale US .
- find . -type f -name "*.go" | xargs misspell -error -locale US
- staticcheck ./...
script:
- go test -i -race ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race ./...; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
14 changes: 6 additions & 8 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2164,8 +2164,8 @@ func (nc *Conn) processMsg(data []byte) {
nc.subsMu.RLock()

// Stats
nc.InMsgs++
nc.InBytes += uint64(len(data))
atomic.AddUint64(&nc.InMsgs, 1)
atomic.AddUint64(&nc.InBytes, uint64(len(data)))

sub := nc.subs[nc.ps.ma.sid]
if sub == nil {
Expand Down Expand Up @@ -3881,18 +3881,16 @@ func (nc *Conn) isDrainingPubs() bool {

// Stats will return a race safe copy of the Statistics section for the connection.
func (nc *Conn) Stats() Statistics {
// Stats are updated either under connection's mu or subsMu mutexes.
// Lock both to safely get them.
// Stats are updated either under connection's mu or with atomic operations
// for inbound stats in processMsg().
nc.mu.Lock()
nc.subsMu.RLock()
stats := Statistics{
InMsgs: nc.InMsgs,
InBytes: nc.InBytes,
InMsgs: atomic.LoadUint64(&nc.InMsgs),
InBytes: atomic.LoadUint64(&nc.InBytes),
OutMsgs: nc.OutMsgs,
OutBytes: nc.OutBytes,
Reconnects: nc.Reconnects,
}
nc.subsMu.RUnlock()
nc.mu.Unlock()
return stats
}
Expand Down
36 changes: 36 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,3 +2181,39 @@ func TestAuthErrorOnReconnect(t *testing.T) {
t.Fatalf("Wrong status: %d\n", nc.Status())
}
}

func TestStatsRace(t *testing.T) {
o := natsserver.DefaultTestOptions
o.Port = -1
s := RunServerWithOptions(&o)
defer s.Shutdown()

nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan bool)
go func() {
defer wg.Done()
for {
select {
case <-ch:
return
default:
nc.Stats()
}
}
}()

nc.Subscribe("foo", func(_ *Msg) {})
for i := 0; i < 1000; i++ {
nc.Publish("foo", []byte("hello"))
}

close(ch)
wg.Wait()
}
8 changes: 4 additions & 4 deletions scripts/cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

rm -rf ./cov
mkdir cov
go test -v -race -covermode=atomic -coverprofile=./cov/nats.out
go test -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test
go test -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin
go test -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/nats.out
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

Expand Down

0 comments on commit b4437f1

Please sign in to comment.