From 86fba1730eb9f14f8bdaa90dbc3a849e809c3376 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Fri, 25 Feb 2022 11:35:27 +0800 Subject: [PATCH] fix a bug that could cause "Helium.dispatch" to panic --- discovery/helium/helium.go | 13 +++++++++++++ discovery/helium/helium_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/discovery/helium/helium.go b/discovery/helium/helium.go index d643da50f..4b3656e07 100644 --- a/discovery/helium/helium.go +++ b/discovery/helium/helium.go @@ -15,6 +15,7 @@ import ( // Helium . type Helium struct { sync.Once + lock *sync.RWMutex config types.GRPCConfig stor store.Store subs sync.Map @@ -25,6 +26,7 @@ func New(config types.GRPCConfig, stor store.Store) *Helium { h := &Helium{} h.config = config h.stor = stor + h.lock = &sync.RWMutex{} h.Do(func() { h.start(context.TODO()) // TODO rewrite ctx here, because this will run only once! }) @@ -33,6 +35,8 @@ func New(config types.GRPCConfig, stor store.Store) *Helium { // Subscribe . func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID { + h.lock.Lock() + defer h.lock.Unlock() id := uuid.New() _, _ = h.subs.LoadOrStore(id, ch) return id @@ -40,6 +44,8 @@ func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID { // Unsubscribe . func (h *Helium) Unsubscribe(id uuid.UUID) { + h.lock.Lock() + defer h.lock.Unlock() h.subs.Delete(id) } @@ -77,7 +83,14 @@ func (h *Helium) start(ctx context.Context) { } func (h *Helium) dispatch(status types.ServiceStatus) { + h.lock.RLock() + defer h.lock.RUnlock() h.subs.Range(func(k, v interface{}) bool { + defer func() { + if err := recover(); err != nil { + log.Errorf(context.TODO(), "[dispatch] dispatch %s failed, err: %v", k, err) + } + }() c, ok := v.(chan<- types.ServiceStatus) if !ok { log.Error("[WatchServiceStatus] failed to cast channel from map") diff --git a/discovery/helium/helium_test.go b/discovery/helium/helium_test.go index 4d79af122..1b1f68101 100644 --- a/discovery/helium/helium_test.go +++ b/discovery/helium/helium_test.go @@ -47,3 +47,33 @@ func TestHelium(t *testing.T) { close(chAddr) close(chStatus) } + +func TestPanic(t *testing.T) { + chAddr := make(chan []string) + + store := &storemocks.Store{} + store.On("ServiceStatusStream", mock.Anything).Return(chAddr, nil) + + grpcConfig := types.GRPCConfig{ + ServiceDiscoveryPushInterval: time.Duration(1) * time.Second, + } + service := New(grpcConfig, store) + + for i := 0; i < 1000; i++ { + go func() { + chStatus := make(chan types.ServiceStatus) + uuid := service.Subscribe(chStatus) + time.Sleep(time.Second) + service.Unsubscribe(uuid) + close(chStatus) + }() + } + + go func() { + for i := 0; i < 1000; i++ { + chAddr <- []string{"hhh", "hhh2"} + } + }() + + time.Sleep(5 * time.Second) +}