Skip to content

Commit

Permalink
Remove locks/waitgroups
Browse files Browse the repository at this point in the history
  • Loading branch information
vitvly committed Apr 26, 2024
1 parent d57c9a3 commit 194bd7f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 43 deletions.
94 changes: 53 additions & 41 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -19,7 +18,6 @@ type FilterConfig struct {
}

type Sub struct {
sync.RWMutex
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
Config FilterConfig
Expand All @@ -40,9 +38,10 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.Config = config
sub.log = func() *zap.Logger { log, _ := zap.NewDevelopment(); return log }().Named("filterv2-api")

err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)

if err == nil {
sub.multiplex(subs)
sub.log.Info("go sub.healthCheckLoop()")
go sub.healthCheckLoop()
return sub, nil
Expand All @@ -51,15 +50,13 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
}
}

func (apiSub *Sub) Unsubscribe() error {
apiSub.Lock()
defer apiSub.Unlock()
func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()
for _, s := range apiSub.subs {
apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
}
apiSub.cancel()
close(apiSub.DataCh)
return nil

}

func (apiSub *Sub) healthCheckLoop() {
Expand All @@ -76,12 +73,11 @@ func (apiSub *Sub) healthCheckLoop() {
apiSub.checkAliveness()
}
}

}

func (apiSub *Sub) checkAliveness() {
apiSub.log.Info("ENTER checkAliveness()")
apiSub.RLock()
defer apiSub.RUnlock()

// Buffered chan for sub aliveness results
type CheckResult struct {
Expand All @@ -90,54 +86,69 @@ func (apiSub *Sub) checkAliveness() {
}
ch := make(chan CheckResult, len(apiSub.subs))

wg := &sync.WaitGroup{}

// Run pings asynchronously
wg.Add(len(apiSub.subs))
for _, subDetails := range apiSub.subs {
go func(subDetails *subscription.SubscriptionDetails) {
defer wg.Done()
for _, s := range apiSub.subs {
go func() {
ctx, _ := context.WithTimeout(apiSub.ctx, 5*time.Second)
err := apiSub.wf.IsSubscriptionAlive(ctx, subDetails)

ch <- CheckResult{subDetails, err == nil}
}(subDetails)
err := apiSub.wf.IsSubscriptionAlive(ctx, s)

ch <- CheckResult{s, err == nil}
}()
}
wg.Wait()
close(ch)
// Collect healthy topics
m := make(map[string]int)

// Collect healthy topic counts
topicCounts := make(map[string]int)

topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)
for _, t := range maps.Keys(topicMap) {
m[t] = 0
topicCounts[t] = 0
}
// Close inactive subs
cnt := 0
for s := range ch {
cnt++
if !s.alive {
s.sub.Close()
delete(apiSub.subs, s.sub.ID)
} else {
m[s.sub.ContentFilter.PubsubTopic]++
topicCounts[s.sub.ContentFilter.PubsubTopic]++
}

if cnt == len(apiSub.subs) {
// All values received
break
}
}
close(ch)
for t, cnt := range topicCounts {
if cnt == apiSub.Config.MaxPeers {
delete(topicCounts, t)
}
}
// Re-subscribe asynchronously
for t, cnt := range m {
if cnt < apiSub.Config.MaxPeers {
wg.Add(1)
cFilter := protocol.ContentFilter{t, apiSub.ContentFilter.ContentTopics}
go func() {
defer wg.Done()
apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-cnt)
}()
newSubs := make(chan []*subscription.SubscriptionDetails)
for t, cnt := range topicCounts {
cFilter := protocol.ContentFilter{t, apiSub.ContentFilter.ContentTopics}
go func() {
subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-cnt)
if err != nil {
newSubs <- subs
}
}()
}

for subs := range newSubs {
apiSub.multiplex(subs)
if cnt == len(topicCounts) {
break
}
}
wg.Wait()
close(newSubs)

apiSub.log.Info("EXIT checkAliveness()")
}

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) error {
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
Expand All @@ -148,10 +159,13 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int

if err != nil {
// TODO what if fails?
return err
return nil, err
}
apiSub.Lock()
defer apiSub.Unlock()

return subs, nil
}

func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
for _, s := range subs {
apiSub.subs[s.ID] = s
}
Expand All @@ -165,6 +179,4 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
}
}(subDetails)
}
return nil

}
2 changes: 0 additions & 2 deletions waku/v2/api/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -67,7 +66,6 @@ func (s *FilterApiTestSuite) TestSubscribe() {
}
s.Require().Equal(cnt, 1)

time.Sleep(10 * time.Second)
apiSub.Unsubscribe()
for _ = range apiSub.DataCh {
}
Expand Down

0 comments on commit 194bd7f

Please sign in to comment.