Skip to content

Commit

Permalink
Fix races in m3msg consumer test code (#2589)
Browse files Browse the repository at this point in the history
Fixes various data races in the m3msg consumer and producer tests (follow-up to #2583, updates #2540).
Consumers share a message object pool, which triggers false positives in the Go race detector; as a workaround, pooling is simply disabled in the src/msg/integration test suite.
Also cleaned up some unnecessary indirection in the message structs, which should reduce GC overhead overall.
  • Loading branch information
vdarulis authored Sep 4, 2020
1 parent 98039ce commit fb5b4a9
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 26 deletions.
16 changes: 8 additions & 8 deletions src/msg/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestSharedConsumer(t *testing.T) {
}
}

func TestReplicatedConsumerx(t *testing.T) {
func TestReplicatedConsumer(t *testing.T) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
}
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestSharedConsumerWithDeadInstance(t *testing.T) {
s.Run(t, ctrl)
s.VerifyConsumers(t)
testConsumers := s.consumerServices[0].testConsumers
require.True(t, testConsumers[len(testConsumers)-1].consumed <= s.TotalMessages()*10/100)
require.True(t, testConsumers[len(testConsumers)-1].numConsumed() <= s.TotalMessages()*10/100)
testConsumers = s.consumerServices[1].testConsumers
require.True(t, testConsumers[len(testConsumers)-1].consumed <= s.TotalMessages()*20/100)
require.True(t, testConsumers[len(testConsumers)-1].numConsumed() <= s.TotalMessages()*20/100)
}
}

Expand Down Expand Up @@ -546,8 +546,8 @@ func TestRemoveConsumerService(t *testing.T) {
)
s.Run(t, ctrl)
s.VerifyConsumers(t)
require.Equal(t, msgPerShard*numberOfShards, len(s.consumerServices[0].consumed))
require.Equal(t, msgPerShard*numberOfShards, len(s.consumerServices[1].consumed))
require.Equal(t, msgPerShard*numberOfShards, s.consumerServices[0].numConsumed())
require.Equal(t, msgPerShard*numberOfShards, s.consumerServices[1].numConsumed())
}
}

Expand All @@ -574,8 +574,8 @@ func TestAddConsumerService(t *testing.T) {
},
)
s.Run(t, ctrl)
require.Equal(t, s.ExpectedNumMessages(), len(s.consumerServices[0].consumed))
require.Equal(t, s.ExpectedNumMessages(), len(s.consumerServices[1].consumed))
require.True(t, len(s.consumerServices[2].consumed) <= s.ExpectedNumMessages()*80/100)
require.Equal(t, s.ExpectedNumMessages(), s.consumerServices[0].numConsumed())
require.Equal(t, s.ExpectedNumMessages(), s.consumerServices[1].numConsumed())
require.True(t, s.consumerServices[2].numConsumed() <= s.ExpectedNumMessages()*80/100)
}
}
20 changes: 17 additions & 3 deletions src/msg/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s *setup) Run(
func (s *setup) VerifyConsumers(t *testing.T) {
numWritesPerProducer := s.ExpectedNumMessages()
for _, cs := range s.consumerServices {
require.Equal(t, numWritesPerProducer, len(cs.consumed))
require.Equal(t, numWritesPerProducer, cs.numConsumed())
}
}

Expand Down Expand Up @@ -407,6 +407,13 @@ func (cs *testConsumerService) markConsumed(b []byte) {
cs.consumed[string(b)] = struct{}{}
}

// numConsumed reports how many entries are currently in the service's
// consumed set. The length is read while holding the service lock.
func (cs *testConsumerService) numConsumed() int {
	cs.Lock()
	total := len(cs.consumed)
	cs.Unlock()

	return total
}

func (cs *testConsumerService) Close() {
for _, c := range cs.testConsumers {
c.Close()
Expand Down Expand Up @@ -437,6 +444,13 @@ func (c *testConsumer) Close() {
close(c.doneCh)
}

// numConsumed returns this consumer's consumed counter. The value is read
// while holding the consumer lock.
func (c *testConsumer) numConsumed() int {
	c.Lock()
	count := c.consumed
	c.Unlock()

	return count
}

func newTestConsumer(t *testing.T, cs *testConsumerService) *testConsumer {
consumerListener, err := consumer.NewListener("127.0.0.1:0", testConsumerOptions(t))
require.NoError(t, err)
Expand Down Expand Up @@ -539,8 +553,8 @@ writer:
topicName: topicName
topicWatchInitTimeout: 100ms
placementWatchInitTimeout: 100ms
messagePool:
size: 100
# FIXME: Consumers sharing the same pool trigger false-positives in race detector
messagePool: ~
messageRetry:
initialBackoff: 20ms
maxBackoff: 50ms
Expand Down
26 changes: 13 additions & 13 deletions src/msg/producer/ref_counted.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ type OnFinalizeFn func(rm *RefCountedMessage)

// RefCountedMessage is a reference counted message.
type RefCountedMessage struct {
sync.RWMutex
mu sync.RWMutex
Message

size uint64
onFinalizeFn OnFinalizeFn

refCount *atomic.Int32
isDroppedOrConsumed *atomic.Bool
// RefCountedMessage must not be copied by value due to RWMutex,
// safe to store values here and not just pointers
refCount atomic.Int32
isDroppedOrConsumed atomic.Bool
}

// NewRefCountedMessage creates RefCountedMessage.
func NewRefCountedMessage(m Message, fn OnFinalizeFn) *RefCountedMessage {
return &RefCountedMessage{
Message: m,
refCount: atomic.NewInt32(0),
size: uint64(m.Size()),
onFinalizeFn: fn,
isDroppedOrConsumed: atomic.NewBool(false),
Message: m,
size: uint64(m.Size()),
onFinalizeFn: fn,
}
}

Expand Down Expand Up @@ -76,12 +76,12 @@ func (rm *RefCountedMessage) DecRef() {

// IncReads increments the reads count.
func (rm *RefCountedMessage) IncReads() {
rm.RLock()
rm.mu.RLock()
}

// DecReads decrements the reads count.
func (rm *RefCountedMessage) DecReads() {
rm.RUnlock()
rm.mu.RUnlock()
}

// NumRef returns the number of references remaining.
Expand All @@ -107,13 +107,13 @@ func (rm *RefCountedMessage) IsDroppedOrConsumed() bool {
func (rm *RefCountedMessage) finalize(r FinalizeReason) bool {
// NB: This lock prevents the message from being finalized when its still
// being read.
rm.Lock()
rm.mu.Lock()
if rm.isDroppedOrConsumed.Load() {
rm.Unlock()
rm.mu.Unlock()
return false
}
rm.isDroppedOrConsumed.Store(true)
rm.Unlock()
rm.mu.Unlock()
if rm.onFinalizeFn != nil {
rm.onFinalizeFn(rm)
}
Expand Down
4 changes: 2 additions & 2 deletions src/msg/producer/writer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ type message struct {
retried int
// NB(cw) isAcked could be accessed concurrently by the background thread
// in message writer and acked by consumer service writers.
isAcked *atomic.Bool
// Safe to store value inside struct, as message is never copied by value
isAcked atomic.Bool
}

func newMessage() *message {
return &message{
retryAtNanos: 0,
retried: 0,
isAcked: atomic.NewBool(false),
}
}

Expand Down
76 changes: 76 additions & 0 deletions src/msg/producer/writer/message_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package writer

import (
"testing"

"github.com/m3db/m3/src/msg/producer"
)

var (
// BenchMessage is a package-level sink: the benchmarks in this file store
// the message they operate on here so the compiler cannot optimize the
// measured work away.
BenchMessage *message
// BenchBool is a package-level sink for the boolean results produced inside
// the benchmark loops, for the same reason.
BenchBool bool
)

// emptyMessage is a zero-size stand-in message used by the benchmarks in this
// file; each accessor below returns its type's zero value.
type emptyMessage struct{}

// Shard always reports shard 0.
func (emptyMessage) Shard() uint32 {
	return 0
}

// Bytes always returns a nil payload.
func (emptyMessage) Bytes() []byte {
	return nil
}

// Size always reports a payload size of 0.
func (emptyMessage) Size() int {
	return 0
}

// Finalize is the producer's end-of-lifecycle hook; for emptyMessage it is a
// no-op and the finalize reason is ignored.
func (emptyMessage) Finalize(producer.FinalizeReason) {}

// BenchmarkMessageAtomics exercises the message lifecycle calls while reusing
// one ref-counted message and one writer message across all iterations. Each
// iteration runs Set, IncRef, Ack, IsAcked, Marshaler and Close in that order.
func BenchmarkMessageAtomics(b *testing.B) {
	var (
		refCounted = producer.NewRefCountedMessage(emptyMessage{}, nil)
		m          = newMessage()
	)

	for i := 0; i < b.N; i++ {
		m.Set(metadata{}, refCounted, 500)
		refCounted.IncRef()
		m.Ack()

		// Results go to package-level sinks so the calls are not optimized out.
		BenchBool = m.IsAcked()
		_, BenchBool = m.Marshaler()

		m.Close()
		BenchMessage = m
	}
}

// BenchmarkMessageAtomicsAllocs runs the same lifecycle calls as
// BenchmarkMessageAtomics, but constructs a fresh ref-counted message and a
// fresh writer message on every iteration, so construction cost is part of
// the measurement.
func BenchmarkMessageAtomicsAllocs(b *testing.B) {
	// This benchmark exists to observe allocations, so always report
	// allocs/op and B/op instead of relying on the caller passing -benchmem.
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		rm := producer.NewRefCountedMessage(emptyMessage{}, nil)
		msg := newMessage()
		msg.Set(metadata{}, rm, 500)
		rm.IncRef()
		msg.Ack()
		// Results go to package-level sinks so the calls are not optimized out.
		BenchBool = msg.IsAcked()
		_, BenchBool = msg.Marshaler()
		msg.Close()
		BenchMessage = msg
	}
}

0 comments on commit fb5b4a9

Please sign in to comment.