Skip to content

Commit

Permalink
Merge pull request #94 from SkySoft-ATM/broadcaster_bug
Browse files Browse the repository at this point in the history
bug fix on broadcasters: client was stuck when calling unregister aft…
  • Loading branch information
matgabriel authored Jun 30, 2020
2 parents 18fe1cd + 982076a commit 51a3ed5
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 14 deletions.
22 changes: 16 additions & 6 deletions mux/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package mux

import (
"fmt"
"sync/atomic"
)

type Broadcaster struct {
Expand All @@ -21,7 +20,7 @@ type Broadcaster struct {
unreg chan unregistration
outputs map[chan<- interface{}]ConsumerConfig
*BroadcasterConfig
closed uint32
closed chan interface{}
}

// Register a new channel to receive broadcasts
Expand All @@ -40,20 +39,28 @@ func (b *Broadcaster) Register(newch chan<- interface{}, options ...ConsumerOpti
// Unregister a channel so that it no longer receives broadcasts.
func (b *Broadcaster) Unregister(newch chan<- interface{}) {
done := make(chan struct{})
b.unreg <- unregistration{newch, done}
<-done
select {
case b.unreg <- unregistration{newch, done}:
<-done
case <-b.closed:
return
}
}

// Shut this StateBroadcaster down.
func (b *Broadcaster) Close() {
atomic.StoreUint32(&b.closed, 1)
closed := make(chan struct{})
b.closeReq <- closed
<-closed
}

func (b *Broadcaster) Closed() bool {
return atomic.LoadUint32(&b.closed) > 0
select {
case <-b.closed:
return true
default:
return false
}
}

// Submit a new object to all subscribers, this call can block if the input channel is full
Expand Down Expand Up @@ -104,12 +111,14 @@ func (b *Broadcaster) run() {
case r := <-b.reg:
b.addSubscriber(r)
case closed := <-b.closeReq:
close(b.closed)
closed <- struct{}{}
return
}
} else {
select {
case closed := <-b.closeReq:
close(b.closed)
close(b.input)
if len(b.outputs) == 0 {
closed <- struct{}{}
Expand Down Expand Up @@ -164,6 +173,7 @@ func NewNonBlockingBroadcaster(bufLen int, options ...BroadcasterOptionFunc) *Br
unreg: make(chan unregistration),
outputs: make(map[chan<- interface{}]ConsumerConfig),
BroadcasterConfig: &BroadcasterConfig{eagerBroadcast: true},
closed: make(chan interface{}),
}

for _, option := range options {
Expand Down
27 changes: 26 additions & 1 deletion mux/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package mux

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func backpressureForConsumer(consumerName string, consumer chan string) func(value interface{}) {
Expand Down Expand Up @@ -210,3 +211,27 @@ func TestNoBackpressureOnProducerWithEagerBroadcast(t *testing.T) {
t.Error("Error, backpressure with eager broadcast")
}
}

func TestUnsubscribeAfterClose(t *testing.T) {
b := NewNonBlockingBroadcaster(0)
receiver := make(chan interface{})
b.Register(receiver)

b.Close()

timer := time.NewTimer(1 * time.Second)

done := make(chan interface{})

go func() {
b.Unregister(receiver)
done <- struct{}{}
}()

select {
case <-timer.C:
t.Fatalf("unable to unregister on time")
case <-done:
t.Log("Unregistered successfully")
}
}
21 changes: 15 additions & 6 deletions mux/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ the full state immediately, otherwise values will be dropped on backpressure.
package mux

import (
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -45,7 +44,7 @@ type StateBroadcaster struct {
outputs map[StateUpdateChan]ConsumerConfig
state map[interface{}]ttlValue
update chan update
closed uint32
closed chan interface{}
*BroadcasterConfig
}

Expand Down Expand Up @@ -101,18 +100,27 @@ type stateConsumer struct {
// Unregister a channel so that it no longer receives broadcasts.
func (b *StateBroadcaster) Unregister(newch StateUpdateChan) {
done := make(chan struct{})
b.unreg <- stateUnregistration{newch, done}
<-done
select {
case b.unreg <- stateUnregistration{newch, done}:
<-done
case <-b.closed:
return
}
}

// Shut this StateBroadcaster down.
func (b *StateBroadcaster) Close() {
atomic.StoreUint32(&b.closed, 1)
close(b.closed)
close(b.reg)
}

func (b *StateBroadcaster) Closed() bool {
return atomic.LoadUint32(&b.closed) > 0
select {
case <-b.closed:
return true
default:
return false
}
}

// Submit a new object to all subscribers
Expand Down Expand Up @@ -264,6 +272,7 @@ func NewNonBlockingStateBroadcaster(bufLen int, ttl time.Duration, options ...Br
outputs: make(map[StateUpdateChan]ConsumerConfig),
state: make(map[interface{}]ttlValue),
update: make(chan update, bufLen),
closed: make(chan interface{}),
BroadcasterConfig: &BroadcasterConfig{},
}
for _, option := range options {
Expand Down
24 changes: 23 additions & 1 deletion mux/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ func TestDelete(t *testing.T) {
}

func TestStateCleared(t *testing.T) {

b := NewNonBlockingStateBroadcaster(50, 0)

b.Submit("A", "A1")
Expand All @@ -215,5 +214,28 @@ func TestStateCleared(t *testing.T) {
result2 := b.GetCurrentState()

assert.Equal(t, 0, len(result2))
}

func TestUnsubscribeAfterCloseOnStateBroadcaster(t *testing.T) {
b := NewNonBlockingStateBroadcaster(50, 0)
receiver := make(chan *StateUpdate)
b.Register(receiver)

b.Close()

timer := time.NewTimer(1 * time.Second)

done := make(chan interface{})

go func() {
b.Unregister(receiver)
done <- struct{}{}
}()

select {
case <-timer.C:
t.Fatalf("unable to unregister on time")
case <-done:
t.Log("Unregistered successfully")
}
}

0 comments on commit 51a3ed5

Please sign in to comment.