Skip to content

Commit

Permalink
[FIXED] Data race between processMsg() and Stats()
Browse files Browse the repository at this point in the history
Resolves #520

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Sep 16, 2019
1 parent 6ffcfbc commit dee505b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 8 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ install:
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet ./...
- misspell -error -locale US .
- find . -type f -name "*.go" | xargs misspell -error -locale US
- staticcheck ./...
script:
- go test -i -race ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race ./...; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
11 changes: 9 additions & 2 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3884,15 +3884,22 @@ func (nc *Conn) Stats() Statistics {
// Stats are updated either under connection's mu or subsMu mutexes.
// Lock both to safely get them.
nc.mu.Lock()
nc.subsMu.RLock()
// Note: It would be more logical to get subsMu's read lock here since we are
// "reading" the stats. However, in processMsg() we don't lock the connection
// (unless we detect a slow consumer) but we do get subsMu read lock to fetch
// the subscription the incoming message is for. We do update InMsgs/Bytes under
// that read lock. Swithching processMsg()'s subsMu to write lock would decrease
// performance, so instead, to avoid the data race, we simply get subsMu's write
// lock here.
nc.subsMu.Lock()
stats := Statistics{
InMsgs: nc.InMsgs,
InBytes: nc.InBytes,
OutMsgs: nc.OutMsgs,
OutBytes: nc.OutBytes,
Reconnects: nc.Reconnects,
}
nc.subsMu.RUnlock()
nc.subsMu.Unlock()
nc.mu.Unlock()
return stats
}
Expand Down
36 changes: 36 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,3 +2181,39 @@ func TestAuthErrorOnReconnect(t *testing.T) {
t.Fatalf("Wrong status: %d\n", nc.Status())
}
}

func TestStatsRace(t *testing.T) {
o := natsserver.DefaultTestOptions
o.Port = -1
s := RunServerWithOptions(&o)
defer s.Shutdown()

nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan bool)
go func() {
defer wg.Done()
for {
select {
case <-ch:
return
default:
nc.Stats()
}
}
}()

nc.Subscribe("foo", func(_ *Msg) {})
for i := 0; i < 1000; i++ {
nc.Publish("foo", []byte("hello"))
}

close(ch)
wg.Wait()
}
8 changes: 4 additions & 4 deletions scripts/cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

rm -rf ./cov
mkdir cov
go test -v -race -covermode=atomic -coverprofile=./cov/nats.out
go test -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test
go test -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin
go test -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/nats.out
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin
go test --failfast -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

Expand Down

0 comments on commit dee505b

Please sign in to comment.