From b3eadd407a17b750e7bd72c42d861e3b9f9234df Mon Sep 17 00:00:00 2001 From: Vitaly Vlasov Date: Mon, 11 Mar 2024 13:15:56 +0200 Subject: [PATCH] Sub multiplexing through Filter API --- waku/v2/api/filter.go | 134 +++++++++++++++++++++ waku/v2/node/wakunode2.go | 2 +- waku/v2/peermanager/topic_event_handler.go | 3 +- waku/v2/peerstore/waku_peer_store.go | 50 +++----- waku/v2/protocol/content_filter.go | 9 ++ 5 files changed, 166 insertions(+), 32 deletions(-) create mode 100644 waku/v2/api/filter.go diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go new file mode 100644 index 000000000..893a2be12 --- /dev/null +++ b/waku/v2/api/filter.go @@ -0,0 +1,134 @@ +package filter + +import ( + "context" + "sync" + "time" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" +) + +type FilterConfig struct { + MaxPeers uint +} + +type Sub struct { + sync.RWMutex + ContentFilter protocol.ContentFilter + DataCh chan *protocol.Envelope + Config FilterConfig + subs subscription.SubscriptionSet + wf *filter.WakuFilterLightNode + ctx context.Context + cancel context.CancelFunc +} + +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig) (*Sub, error) { + sub := new(Sub) + sub.wf = wf + sub.ctx, sub.cancel = context.WithCancel(ctx) + sub.subs = make(subscription.SubscriptionSet) + sub.DataCh = make(chan *protocol.Envelope) + sub.ContentFilter = contentFilter + sub.Config = config + + err := sub.subscribe(contentFilter, sub.Config.MaxPeers) + + if err == nil { + sub.healthCheckLoop() + return sub, nil + } else { + return nil, err + } +} + +func Unsubscribe(apiSub *Sub) error { + apiSub.RLock() + defer apiSub.RUnlock() + for _, s := range apiSub.subs { + apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) + } + apiSub.cancel() + return nil +} + +func (apiSub *Sub) healthCheckLoop() { + // Health checks + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-apiSub.ctx.Done(): + return + case <-ticker.C: + // Returns a map of pubsub topics to peer counts + m := apiSub.checkAliveness() + for t, cnt := range m { + if cnt < apiSub.Config.MaxPeers { + cFilter := protocol.ContentFilter{t, apiSub.ContentFilter.ContentTopics} + apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-cnt) + } + } + } + } +} + +func (apiSub *Sub) checkAliveness() map[string]uint { + apiSub.RLock() + defer apiSub.RUnlock() + ch := make(chan string, len(apiSub.subs)) + wg := &sync.WaitGroup{} + wg.Add(len(apiSub.subs)) + for _, subDetails := range apiSub.subs { + go func(subDetails *subscription.SubscriptionDetails) { + defer wg.Done() + err := apiSub.wf.IsSubscriptionAlive(apiSub.ctx, subDetails) + + if err != nil { + subDetails.Close() + apiSub.Lock() + defer apiSub.Unlock() + delete(apiSub.subs, subDetails.ID) + } else { + ch <- subDetails.ContentFilter.PubsubTopic + } + }(subDetails) + + } + wg.Wait() + close(ch) + // Collect healthy topics + m := make(map[string]uint) + for topic := range ch { + m[topic]++ + } + + return m + +} +func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount uint) error { + // Low-level subscribe, returns a set of SubscriptionDetails + subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, filter.WithMaxPeersPerContentFilter(int(peerCount))) + if err != nil { + // TODO what if fails? + return err + } + apiSub.Lock() + defer apiSub.Unlock() + for _, s := range subs { + apiSub.subs[s.ID] = s + } + // Multiplex onto single channel + // Goroutines will exit once sub channels are closed + for _, subDetails := range subs { + go func(subDetails *subscription.SubscriptionDetails) { + for env := range subDetails.C { + apiSub.DataCh <- env + } + }(subDetails) + } + return nil + +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index dda7c0929..968b75c2c 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -831,7 +831,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) { Protocols: protocols, Connected: connected, Addrs: addrs, - PubsubTopics: topics, + PubsubTopics: maps.Keys(topics), }) } return peers, nil diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 1a39fee25..ca4c149f8 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -12,6 +12,7 @@ import ( waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" + "golang.org/x/exp/maps" ) func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error { @@ -102,7 +103,7 @@ func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) { logging.HostID("peerID", peer)) continue } - if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic { + if len(peerTopics) == 1 && maps.Keys(peerTopics)[0] == pubsubTopic { err := pm.host.Network().ClosePeer(peer) if err != nil { pm.logger.Warn("Failed to disconnect connection towards peer", diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 402d265ad..203b23103 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -8,6 +8,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + "golang.org/x/exp/maps" ) // Origin is used to determine how the peer is identified, @@ -58,7 +60,7 @@ type WakuPeerstore interface { AddPubSubTopic(p peer.ID, topic string) error RemovePubSubTopic(p peer.ID, topic string) error - PubSubTopics(p peer.ID) ([]string, error) + PubSubTopics(p peer.ID) (protocol.TopicSet, error) SetPubSubTopics(p peer.ID, topics []string) error PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice @@ -175,13 +177,12 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error { if err != nil { return err } - for _, t := range existingTopics { - if t == topic { - return nil - } + + if _, found := existingTopics[topic]; found { + return nil } - existingTopics = append(existingTopics, topic) - return ps.peerStore.Put(p, peerPubSubTopics, existingTopics) + existingTopics[topic] = struct{}{} + return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics)) } // RemovePubSubTopic removes a pubSubTopic from the peer @@ -195,14 +196,9 @@ func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error { return nil } - for i := range existingTopics { - if existingTopics[i] == topic { - existingTopics = append(existingTopics[:i], existingTopics[i+1:]...) - break - } - } + delete(existingTopics, topic) - err = ps.SetPubSubTopics(p, existingTopics) + err = ps.SetPubSubTopics(p, maps.Keys(existingTopics)) if err != nil { return err } @@ -215,16 +211,16 @@ func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error { } // PubSubTopics fetches list of pubSubTopics for a peer -func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { +func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) { result, err := ps.peerStore.Get(p, peerPubSubTopics) if err != nil { if errors.Is(err, peerstore.ErrNotFound) { - return nil, nil + return protocol.NewTopicSet(), nil } else { return nil, err } } - return result.([]string), nil + return protocol.NewTopicSet((result.([]string))...), nil } // PeersByPubSubTopic Returns list of peers that support list of pubSubTopics @@ -235,22 +231,16 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific } var result peer.IDSlice for _, p := range specificPeers { - topics, err := ps.PubSubTopics(p) + peerMatch := true + peerTopics, err := ps.PubSubTopics(p) if err == nil { - //Convoluted and crazy logic to find subset of topics - // Could not find a better way to do it? - peerTopicMap := make(map[string]struct{}) - match := true - for _, topic := range topics { - peerTopicMap[topic] = struct{}{} - } - for _, topic := range pubSubTopics { - if _, ok := peerTopicMap[topic]; !ok { - match = false + for _, t := range pubSubTopics { + if _, ok := peerTopics[t]; !ok { + peerMatch = false break } } - if match { + if peerMatch { result = append(result, p) } } //Note: skipping a peer in case of an error as there would be others available. @@ -268,7 +258,7 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeer for _, p := range specificPeers { topics, err := ps.PubSubTopics(p) if err == nil { - for _, topic := range topics { + for topic := range topics { if topic == pubSubTopic { result = append(result, p) } diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index f09cf52b3..0cff1cbc5 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -6,6 +6,15 @@ type PubsubTopicStr = string type ContentTopicStr = string type ContentTopicSet map[string]struct{} +type TopicSet map[string]struct{} + +func NewTopicSet(topics ...string) TopicSet { + s := make(TopicSet, len(topics)) + for _, t := range topics { + s[t] = struct{}{} + } + return s +} func NewContentTopicSet(contentTopics ...string) ContentTopicSet { s := make(ContentTopicSet, len(contentTopics))