Skip to content

Commit

Permalink
Add api/filter_test
Browse files Browse the repository at this point in the history
  • Loading branch information
vitvly committed Mar 28, 2024
1 parent 03044de commit 295b509
Show file tree
Hide file tree
Showing 8 changed files with 463 additions and 447 deletions.
5 changes: 3 additions & 2 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package filter
package api

import (
"context"
Expand Down Expand Up @@ -105,7 +105,8 @@ func (apiSub *Sub) checkAliveness() map[string]uint {
close(ch)
// Collect healthy topics
m := make(map[string]uint)
for t := range maps.Keys(protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)) {
topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)
for _, t := range maps.Keys(topicMap) {
m[t] = 0
}
for t := range ch {
Expand Down
47 changes: 47 additions & 0 deletions waku/v2/api/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package api

import (
"context"
"sync"
"testing"

"github.com/libp2p/go-libp2p/core/host"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"go.uber.org/zap"
)

func TestFilterApiSuite(t *testing.T) {
suite.Run(t, new(FilterApiTestSuite))
}

const defaultTestPubSubTopic = "/waku/2/go/filter/test"
const defaultTestContentTopic = "/test/10/my-app"

type FilterApiTestSuite struct {
suite.Suite

testTopic string
testContentTopic string
ctx context.Context
ctxCancel context.CancelFunc
lightNode *filter.WakuFilterLightNode
lightNodeHost host.Host
relayNode *relay.WakuRelay
relaySub *relay.Subscription
fullNode *filter.WakuFilterFullNode
fullNodeHost host.Host
wg *sync.WaitGroup
contentFilter protocol.ContentFilter
subDetails []*subscription.SubscriptionDetails
log *zap.Logger
}

type WakuMsg struct {
pubSubTopic string
contentTopic string
payload string
}
44 changes: 12 additions & 32 deletions waku/v2/protocol/filter/filter_proto_ident_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,31 @@ import (
func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)

}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, ""}, s.subDetails[0].C)
}

func (s *FilterTestSuite) TestModifySubscription() {

// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)

}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, ""}, s.subDetails[0].C)

// Subscribe to another content_topic
newContentTopic := "Topic_modified"
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())

s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)

}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, newContentTopic, ""}, s.subDetails[0].C)
}

func (s *FilterTestSuite) TestMultipleMessages() {

// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)

}, s.subDetails[0].C)

s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "first"}, s.subDetails[0].C)

}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "second"}, s.subDetails[0].C)
}

func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
Expand Down Expand Up @@ -222,7 +202,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi

func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}

// Create test context
Expand All @@ -231,9 +211,9 @@ func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() {
s.testTopic = defaultTestPubSubTopic
s.testContentTopic = defaultTestContentTopic

s.lightNode = s.makeWakuFilterLightNode(true, true)
s.lightNode = s.StartNode(s.MakeWakuFilterLightNode())

s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.relayNode, s.fullNode = s.MakeWakuFilterFullNode(s.testTopic, false)

//Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
Expand All @@ -259,7 +239,7 @@ func (wf *WakuFilterLightNode) startWithIncorrectPushProto() error {

func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}

// Create test context
Expand All @@ -270,9 +250,9 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
s.testTopic = defaultTestPubSubTopic
s.testContentTopic = defaultTestContentTopic

s.lightNode = s.makeWakuFilterLightNode(false, true)
s.lightNode = s.MakeWakuFilterLightNode()

s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.relayNode, s.fullNode = s.MakeWakuFilterFullNode(s.testTopic, false)

// Re-start light node with unsupported prefix for match func
s.lightNode.Stop()
Expand All @@ -298,7 +278,7 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
// Message should never arrive -> exit after timeout
select {
case msg := <-s.subDetails[0].C:
s.log.Info("Light node received a msg")
s.Log.Info("Light node received a msg")
s.Require().Nil(msg)
case <-time.After(1 * time.Second):
s.Require().True(true)
Expand Down
7 changes: 4 additions & 3 deletions waku/v2/protocol/filter/filter_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package filter

import (
"context"
"strconv"
"time"

"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"strconv"
"time"
)

func (s *FilterTestSuite) TestValidPayloadsASCII() {
Expand Down Expand Up @@ -131,7 +132,7 @@ func (s *FilterTestSuite) TestLargePayloadsUTF8() {
// Generate large string
for i := range messages {
messages[i].payload, _ = tests.GenerateRandomUTF8String(153600)
s.log.Info("Generated payload with ", zap.String("length", strconv.Itoa(len(messages[i].payload))))
s.Log.Info("Generated payload with ", zap.String("length", strconv.Itoa(len(messages[i].payload))))
}

// All messages should be received
Expand Down
52 changes: 18 additions & 34 deletions waku/v2/protocol/filter/filter_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,16 @@ func (s *FilterTestSuite) TestWakuFilter() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "first"}, s.subDetails[0].C)

// Wrong content topic
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.testTopic, "TopicB", "second"}, s.subDetails[0].C)

_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)

// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.testTopic, s.testContentTopic, "third"}, s.subDetails[0].C)

// Two new subscriptions with same [peer, contentFilter]
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
Expand All @@ -46,13 +40,9 @@ func (s *FilterTestSuite) TestWakuFilter() {
s.Require().Equal(len(s.lightNode.Subscriptions()), 2)

// Should be received on both subscriptions
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fourth")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "fourth"}, s.subDetails[0].C)

s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fifth")
}, secondSub[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "fifth"}, secondSub[0].C)

s.waitForMsg(nil, s.subDetails[0].C)
s.waitForMsg(nil, secondSub[0].C)
Expand All @@ -62,9 +52,7 @@ func (s *FilterTestSuite) TestWakuFilter() {
s.Require().NoError(err)

// Should still receive
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "sixth")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "sixth"}, s.subDetails[0].C)

// Unsubscribe from first sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
Expand All @@ -73,19 +61,15 @@ func (s *FilterTestSuite) TestWakuFilter() {
s.Require().Equal(len(s.lightNode.Subscriptions()), 0)

// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "seventh")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.testTopic, s.testContentTopic, "seventh"}, s.subDetails[0].C)
}

func (s *FilterTestSuite) TestPubSubSingleContentTopic() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Message should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.testTopic, s.testContentTopic, "test_msg"}, s.subDetails[0].C)

_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
Expand Down Expand Up @@ -119,9 +103,9 @@ func (s *FilterTestSuite) TestMultiPubSubMultiContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds

s.lightNode = s.makeWakuFilterLightNode(true, true)
s.lightNode = s.StartNode(s.MakeWakuFilterLightNode())

s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
s.relayNode, s.fullNode = s.MakeWakuFilterFullNode(s.testTopic, true)

// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
Expand All @@ -133,14 +117,14 @@ func (s *FilterTestSuite) TestMultiPubSubMultiContentTopic() {
// Subscribe
for _, m := range messages {
s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...)
s.log.Info("Subscribing ", zap.String("PubSubTopic", m.pubSubTopic))
s.Log.Info("Subscribing ", zap.String("PubSubTopic", m.pubSubTopic))
_, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.Require().NoError(err)
}

// Debug to see subscriptions in light node
for _, sub := range s.subDetails {
s.log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0]))
s.Log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0]))
}

// All messages should be received
Expand Down Expand Up @@ -270,7 +254,7 @@ func (s *FilterTestSuite) TestSubscribeErrorHandling() {

func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}

// Create test context
Expand All @@ -279,10 +263,10 @@ func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() {
fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID()))

s.log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex)))
s.Log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex)))

// This will overwrite values with the second node info
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.relayNode, s.fullNode = s.MakeWakuFilterFullNode(s.testTopic, false)

// Connect to second full and relay node
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
Expand All @@ -292,7 +276,7 @@ func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() {
fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID()))

s.log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex)))
s.Log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex)))

// Subscribe to the second full node
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
Expand All @@ -308,7 +292,7 @@ func (s *FilterTestSuite) TestSubscribeMultipleLightNodes() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds

lightNode2 := s.makeWakuFilterLightNode(true, true)
lightNode2 := s.StartNode(s.MakeWakuFilterLightNode())

// Connect node2
lightNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
Expand Down Expand Up @@ -344,7 +328,7 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second)

_, fullNode2 := s.makeWakuFilterFullNode(testTopic, false, false)
_, fullNode2 := s.MakeWakuFilterFullNodeNoSharing(testTopic, false)

// Connect nodes
fullNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
Expand Down
Loading

0 comments on commit 295b509

Please sign in to comment.