Skip to content

Commit

Permalink
chore: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed May 13, 2024
1 parent 9e20c16 commit e6da6d8
Showing 1 changed file with 18 additions and 28 deletions.
46 changes: 18 additions & 28 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -12,6 +13,8 @@ import (
"golang.org/x/exp/maps"
)

const FilterPingTimeout = 5

type FilterConfig struct {
MaxPeers int
Peers []peer.ID
Expand Down Expand Up @@ -44,14 +47,13 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte

subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)

if err == nil {
sub.multiplex(subs)
sub.log.Info("go sub.healthCheckLoop()")
go sub.healthCheckLoop()
return sub, nil
} else {
if err != nil {
return nil, err
}
sub.multiplex(subs)
sub.log.Info("go sub.healthCheckLoop()")
go sub.healthCheckLoop()
return sub, nil
}

func (apiSub *Sub) Unsubscribe() {
Expand All @@ -61,7 +63,7 @@ func (apiSub *Sub) Unsubscribe() {

func (apiSub *Sub) healthCheckLoop() {
// Health checks
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(FilterPingTimeout * time.Second)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -97,22 +99,20 @@ func (apiSub *Sub) cleanup() {

// Returns active sub counts for each pubsub topic
func (apiSub *Sub) getTopicCounts() map[string]int {
apiSub.log.Info("ENTER getTopicCounts()")
defer func() {
apiSub.log.Info("EXIT getTopicCounts()")
}()

// Buffered chan for sub aliveness results
type CheckResult struct {
sub *subscription.SubscriptionDetails
alive bool
}
checkResults := make(chan CheckResult, len(apiSub.subs))
var wg sync.WaitGroup

// Run pings asynchronously
for _, s := range apiSub.subs {
wg.Add(1)
go func(sub *subscription.SubscriptionDetails) {
ctx, cancelFunc := context.WithTimeout(apiSub.ctx, 5*time.Second)
defer wg.Done()
ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout*time.Second)
defer cancelFunc()
err := apiSub.wf.IsSubscriptionAlive(ctx, sub)

Expand All @@ -129,7 +129,8 @@ func (apiSub *Sub) getTopicCounts() map[string]int {
topicCounts[t] = 0
}
cnt := 0
subLen := len(apiSub.subs)
wg.Wait()
close(checkResults)
for s := range checkResults {
cnt++
if !s.alive {
Expand All @@ -139,23 +140,13 @@ func (apiSub *Sub) getTopicCounts() map[string]int {
} else {
topicCounts[s.sub.ContentFilter.PubsubTopic]++
}

if cnt == subLen {
// All values received
break
}
}

close(checkResults)
return topicCounts
}

// Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(topicCounts map[string]int) {
apiSub.log.Info("ENTER resubscribe()")
defer func() {
apiSub.log.Info("EXIT resubscribe()")
}()

// Delete healthy topics
for t, cnt := range topicCounts {
Expand All @@ -181,7 +172,7 @@ func (apiSub *Sub) resubscribe(topicCounts map[string]int) {
}

cnt := 0
apiSub.log.Info("resubscribe(): before range newSubs")
apiSub.log.Debug("resubscribe(): before range newSubs")
for subs := range newSubs {
cnt++
if subs != nil {
Expand Down Expand Up @@ -214,12 +205,11 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
}

func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
for _, s := range subs {
apiSub.subs[s.ID] = s
}

// Multiplex onto single channel
// Goroutines will exit once sub channels are closed
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
go func(subDetails *subscription.SubscriptionDetails) {
apiSub.log.Info("New multiplex", zap.String("subID", subDetails.ID))
for env := range subDetails.C {
Expand Down

0 comments on commit e6da6d8

Please sign in to comment.