Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
pkg/*: fix data races in PubSub and ticker test (#4155)
Browse files Browse the repository at this point in the history
Fixes a data race in PubSub singleton instantiation
where the singleton object creation is not protected
from concurrent initialization.
Fixes a data race in the ticker test where a variable
is written/read to/from without protected access from
multiple goroutines.

Signed-off-by: Shashank Ram <[email protected]>
  • Loading branch information
shashankram authored Sep 22, 2021
1 parent 23a8f5f commit 2d2be96
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pkg/k8s/events/event_pubsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package events

import (
"sync"

"github.com/cskr/pubsub"

"github.com/openservicemesh/osm/pkg/announcements"
Expand All @@ -14,6 +16,9 @@ const (
var (
// Globally accessible instance, through singleton pattern using getPubSubInstance()
pubSubInstance *pubsub.PubSub

// pubSubOnce is used to ensure PubSub object creation happens just once
pubSubOnce sync.Once
)

// Subscribe is the Subscribe implementation for PubSub
Expand Down Expand Up @@ -59,8 +64,8 @@ func Unsub(unsubChan chan interface{}) {
// Note that spawning the instance is not thread-safe. First call should happen on
// a single-routine context to avoid races.
func getPubSubInstance() *pubsub.PubSub {
if pubSubInstance == nil {
pubSubOnce.Do(func() {
pubSubInstance = pubsub.New(defaultAnnouncementChannelSize)
}
})
return pubSubInstance
}
8 changes: 8 additions & 0 deletions pkg/ticker/ticker_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ticker

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -44,14 +45,17 @@ func TestTicker(t *testing.T) {
broadcastEvents := events.Subscribe(announcements.ScheduleProxyBroadcast)
defer events.Unsub(broadcastEvents)

var counterMutex sync.Mutex
broadcastsReceived := 0
stop := make(chan struct{})
defer close(stop)
go func() {
for {
select {
case <-broadcastEvents:
counterMutex.Lock()
broadcastsReceived++
counterMutex.Unlock()
case <-stop:
return
}
Expand All @@ -73,6 +77,8 @@ func TestTicker(t *testing.T) {

// broadcast events should increase in the next few seconds
assert.Eventually(func() bool {
counterMutex.Lock()
defer counterMutex.Unlock()
return broadcastsReceived > 0
}, 3*time.Second, 500*time.Millisecond)

Expand All @@ -83,6 +89,8 @@ func TestTicker(t *testing.T) {

// Should stop increasing
assert.Eventually(func() bool {
counterMutex.Lock()
defer counterMutex.Unlock()
firstRead := broadcastsReceived
time.Sleep(1 * time.Second)
secondRead := broadcastsReceived
Expand Down

0 comments on commit 2d2be96

Please sign in to comment.