diff --git a/Makefile b/Makefile index 904cb39f38d..b7dd16f3221 100644 --- a/Makefile +++ b/Makefile @@ -336,7 +336,7 @@ docker-build-access-debug: docker build -f cmd/Dockerfile --build-arg TARGET=./cmd/access --build-arg COMMIT=$(COMMIT) --build-arg VERSION=$(IMAGE_TAG) --build-arg GOARCH=$(GOARCH) --target debug \ -t "$(CONTAINER_REGISTRY)/access-debug:latest" -t "$(CONTAINER_REGISTRY)/access-debug:$(SHORT_COMMIT)" -t "$(CONTAINER_REGISTRY)/access-debug:$(IMAGE_TAG)" . -# build corrupted access node for BFT testing +# build corrupt access node for BFT testing .PHONY: docker-build-access-corrupt docker-build-access-corrupt: #temporarily make insecure/ a non-module to allow Docker to use corrupt builders there diff --git a/insecure/cmd/mods_override.sh b/insecure/cmd/mods_override.sh index 6713441ad4f..6f6b4d4a6a7 100755 --- a/insecure/cmd/mods_override.sh +++ b/insecure/cmd/mods_override.sh @@ -6,7 +6,7 @@ cp ./go.mod ./go2.mod cp ./go.sum ./go2.sum # inject forked libp2p-pubsub into main module to allow building corrupt Docker images -echo "require github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5" >> ./go.mod +echo "require github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee" >> ./go.mod # update go.sum since added new dependency go mod tidy diff --git a/insecure/corruptlibp2p/gossipsub_spammer.go b/insecure/corruptlibp2p/gossipsub_spammer.go new file mode 100644 index 00000000000..70a3ef4a780 --- /dev/null +++ b/insecure/corruptlibp2p/gossipsub_spammer.go @@ -0,0 +1,117 @@ +package corruptlibp2p + +import ( + "sync" + "testing" + "time" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + corrupt "github.com/yhassanzadeh13/go-libp2p-pubsub" + + "github.com/onflow/flow-go/insecure/internal" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/network/p2p" + p2ptest "github.com/onflow/flow-go/network/p2p/test" +) + +// GossipSubRouterSpammer is a wrapper around the GossipSubRouter that allows us to +// spam the victim with junk control messages. +type GossipSubRouterSpammer struct { + router *atomicRouter + SpammerNode p2p.LibP2PNode +} + +// NewGossipSubRouterSpammer is the main method tests call for spamming attacks. +func NewGossipSubRouterSpammer(t *testing.T, sporkId flow.Identifier, role flow.Role) *GossipSubRouterSpammer { + spammerNode, router := createSpammerNode(t, sporkId, role) + return &GossipSubRouterSpammer{ + router: router, + SpammerNode: spammerNode, + } +} + +// SpamIHave spams the victim with junk iHave messages. +// ctlMessages is the list of spam messages to send to the victim node. +func (s *GossipSubRouterSpammer) SpamIHave(t *testing.T, victim p2p.LibP2PNode, ctlMessages []pb.ControlMessage) { + for _, ctlMessage := range ctlMessages { + require.True(t, s.router.Get().SendControl(victim.Host().ID(), &ctlMessage)) + } +} + +// GenerateIHaveCtlMessages generates IHAVE control messages before they are sent so the test can prepare +// to expect receiving them before they are sent by the spammer. +func (s *GossipSubRouterSpammer) GenerateIHaveCtlMessages(t *testing.T, msgCount, msgSize int) []pb.ControlMessage { + var iHaveCtlMsgs []pb.ControlMessage + for i := 0; i < msgCount; i++ { + iHaveCtlMsg := GossipSubCtrlFixture(WithIHave(msgCount, msgSize)) + + iHaves := iHaveCtlMsg.GetIhave() + require.Equal(t, msgCount, len(iHaves)) + iHaveCtlMsgs = append(iHaveCtlMsgs, *iHaveCtlMsg) + } + return iHaveCtlMsgs +} + +// Start starts the spammer and waits until it is fully initialized before returning. +func (s *GossipSubRouterSpammer) Start(t *testing.T) { + require.Eventuallyf(t, func() bool { + // ensuring the spammer router has been initialized. + // this is needed because the router is initialized asynchronously. + return s.router.Get() != nil + }, 1*time.Second, 100*time.Millisecond, "spammer router not set") + s.router.set(s.router.Get()) +} + +func createSpammerNode(t *testing.T, sporkId flow.Identifier, role flow.Role) (p2p.LibP2PNode, *atomicRouter) { + router := newAtomicRouter() + spammerNode, _ := p2ptest.NodeFixture( + t, + sporkId, + t.Name(), + p2ptest.WithRole(role), + internal.WithCorruptGossipSub(CorruptGossipSubFactory(func(r *corrupt.GossipSubRouter) { + require.NotNil(t, r) + router.set(r) + }), + CorruptGossipSubConfigFactoryWithInspector(func(id peer.ID, rpc *corrupt.RPC) error { + // here we can inspect the incoming RPC message to the spammer node + return nil + })), + ) + return spammerNode, router +} + +// atomicRouter is a wrapper around the corrupt.GossipSubRouter that allows atomic access to the router. +// This is done to avoid race conditions when accessing the router from multiple goroutines. +type atomicRouter struct { + mu sync.Mutex + router *corrupt.GossipSubRouter +} + +func newAtomicRouter() *atomicRouter { + return &atomicRouter{ + mu: sync.Mutex{}, + } +} + +// SetRouter sets the router if it has never been set. Returns true if the router was set, false otherwise. +func (a *atomicRouter) set(router *corrupt.GossipSubRouter) bool { + a.mu.Lock() + defer a.mu.Unlock() + if a.router == nil { + a.router = router + return true + } + return false +} + +// Get returns the router. +func (a *atomicRouter) Get() *corrupt.GossipSubRouter { + a.mu.Lock() + defer a.mu.Unlock() + return a.router +} + +// TODO: SpamIWant, SpamGraft, SpamPrune. diff --git a/insecure/corruptlibp2p/libp2p.go b/insecure/corruptlibp2p/libp2p.go deleted file mode 100644 index 5d06a2a899c..00000000000 --- a/insecure/corruptlibp2p/libp2p.go +++ /dev/null @@ -1,11 +0,0 @@ -package corruptlibp2p - -import ( - corruptpubsub "github.com/yhassanzadeh13/go-libp2p-pubsub" -) - -// getGossipSubParams is a small test function to test that can access forked pubsub module -func getGossipSubParams() corruptpubsub.GossipSubParams { - defaultParams := corruptpubsub.DefaultGossipSubParams() - return defaultParams -} diff --git a/insecure/corruptlibp2p/libp2p_node_factory.go b/insecure/corruptlibp2p/libp2p_node_factory.go index 32c28d1889d..67e7a35694f 100644 --- a/insecure/corruptlibp2p/libp2p_node_factory.go +++ b/insecure/corruptlibp2p/libp2p_node_factory.go @@ -5,8 +5,9 @@ import ( "time" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + corrupt "github.com/yhassanzadeh13/go-libp2p-pubsub" - "github.com/onflow/flow-go/insecure/internal" "github.com/onflow/flow-go/network/p2p" madns "github.com/multiformats/go-multiaddr-dns" @@ -40,7 +41,7 @@ func NewCorruptLibP2PNodeFactory( ) p2pbuilder.LibP2PFactoryFunc { return func() (p2p.LibP2PNode, error) { if chainID != flow.BftTestnet { - panic("illegal chain id for using corruptible conduit factory") + panic("illegal chain id for using corrupt libp2p node") } builder := p2pbuilder.DefaultNodeBuilder( @@ -67,27 +68,36 @@ func NewCorruptLibP2PNodeFactory( } } -// CorruptibleGossipSubFactory returns a factory function that creates a new instance of the forked gossipsub module from +// CorruptGossipSubFactory returns a factory function that creates a new instance of the forked gossipsub module from // github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation. -func CorruptibleGossipSubFactory() (p2pbuilder.GossipSubFactoryFunc, *internal.CorruptGossipSubRouter) { - var rt *internal.CorruptGossipSubRouter +func CorruptGossipSubFactory(routerOpts ...func(*corrupt.GossipSubRouter)) p2pbuilder.GossipSubFactoryFunc { factory := func(ctx context.Context, logger zerolog.Logger, host host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, error) { adapter, router, err := NewCorruptGossipSubAdapter(ctx, logger, host, cfg) - rt = router + for _, opt := range routerOpts { + opt(router) + } return adapter, err } - return factory, rt + return factory } -// CorruptibleGossipSubConfigFactory returns a factory function that creates a new instance of the forked gossipsub config +// CorruptGossipSubConfigFactory returns a factory function that creates a new instance of the forked gossipsub config // from github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation. -func CorruptibleGossipSubConfigFactory(opts ...CorruptPubSubAdapterConfigOption) p2pbuilder.GossipSubAdapterConfigFunc { +func CorruptGossipSubConfigFactory(opts ...CorruptPubSubAdapterConfigOption) p2pbuilder.GossipSubAdapterConfigFunc { return func(base *p2p.BasePubSubAdapterConfig) p2p.PubSubAdapterConfig { return NewCorruptPubSubAdapterConfig(base, opts...) } } +// CorruptGossipSubConfigFactoryWithInspector returns a factory function that creates a new instance of the forked gossipsub config +// from github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation. +func CorruptGossipSubConfigFactoryWithInspector(inspector func(peer.ID, *corrupt.RPC) error) p2pbuilder.GossipSubAdapterConfigFunc { + return func(base *p2p.BasePubSubAdapterConfig) p2p.PubSubAdapterConfig { + return NewCorruptPubSubAdapterConfig(base, WithInspector(inspector)) + } +} + func overrideWithCorruptGossipSub(builder p2pbuilder.NodeBuilder, opts ...CorruptPubSubAdapterConfigOption) { - factory, _ := CorruptibleGossipSubFactory() - builder.SetGossipSubFactory(factory, CorruptibleGossipSubConfigFactory(opts...)) + factory := CorruptGossipSubFactory() + builder.SetGossipSubFactory(factory, CorruptGossipSubConfigFactory(opts...)) } diff --git a/insecure/corruptlibp2p/libp2p_test.go b/insecure/corruptlibp2p/libp2p_test.go deleted file mode 100644 index a5ae4024eee..00000000000 --- a/insecure/corruptlibp2p/libp2p_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package corruptlibp2p - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGetGossipSubParams(t *testing.T) { - gossipSubParams := getGossipSubParams() - - require.Equal(t, gossipSubParams, gossipSubParams) -} diff --git a/insecure/corruptlibp2p/pubsubAdapter.go b/insecure/corruptlibp2p/pubsub_adapter.go similarity index 88% rename from insecure/corruptlibp2p/pubsubAdapter.go rename to insecure/corruptlibp2p/pubsub_adapter.go index 36606f60a8c..eb321f3492f 100644 --- a/insecure/corruptlibp2p/pubsubAdapter.go +++ b/insecure/corruptlibp2p/pubsub_adapter.go @@ -26,7 +26,7 @@ import ( // totally separated from the rest of the codebase. type CorruptGossipSubAdapter struct { gossipSub *corrupt.PubSub - router *internal.CorruptGossipSubRouter + router *corrupt.GossipSubRouter logger zerolog.Logger } @@ -101,34 +101,26 @@ func (c *CorruptGossipSubAdapter) ListPeers(topic string) []peer.ID { return c.gossipSub.ListPeers(topic) } -func (c *CorruptGossipSubAdapter) GetRouter() *internal.CorruptGossipSubRouter { - return c.router -} - -func NewCorruptGossipSubAdapter(ctx context.Context, logger zerolog.Logger, h host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, *internal.CorruptGossipSubRouter, error) { +func NewCorruptGossipSubAdapter(ctx context.Context, logger zerolog.Logger, h host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, *corrupt.GossipSubRouter, error) { gossipSubConfig, ok := cfg.(*CorruptPubSubAdapterConfig) if !ok { return nil, nil, fmt.Errorf("invalid gossipsub config type: %T", cfg) } // initializes a default gossipsub router and wraps it with the corrupt router. - router, err := corrupt.DefaultGossipSubRouter(h) - if err != nil { - return nil, nil, fmt.Errorf("failed to create gossipsub router: %w", err) - } - corruptRouter := internal.NewCorruptGossipSubRouter(router) + router := corrupt.DefaultGossipSubRouter(h) // injects the corrupt router into the gossipsub constructor - gossipSub, err := corrupt.NewGossipSubWithRouter(ctx, h, corruptRouter, gossipSubConfig.Build()...) + gossipSub, err := corrupt.NewGossipSubWithRouter(ctx, h, router, gossipSubConfig.Build()...) if err != nil { return nil, nil, fmt.Errorf("failed to create corrupt gossipsub: %w", err) } adapter := &CorruptGossipSubAdapter{ gossipSub: gossipSub, - router: corruptRouter, + router: router, logger: logger, } - return adapter, corruptRouter, nil + return adapter, router, nil } diff --git a/insecure/corruptlibp2p/pubsubAdapterConfig.go b/insecure/corruptlibp2p/pubsub_adapter_config.go similarity index 79% rename from insecure/corruptlibp2p/pubsubAdapterConfig.go rename to insecure/corruptlibp2p/pubsub_adapter_config.go index 32fb95cd4a8..f10ef335326 100644 --- a/insecure/corruptlibp2p/pubsubAdapterConfig.go +++ b/insecure/corruptlibp2p/pubsub_adapter_config.go @@ -22,6 +22,7 @@ import ( // totally separated from the rest of the codebase. type CorruptPubSubAdapterConfig struct { options []corrupt.Option + inspector func(peer.ID, *corrupt.RPC) error withMessageSigning bool withStrictSignatureVerification bool } @@ -44,17 +45,28 @@ func WithStrictSignatureVerification(withStrictSignatureVerification bool) Corru var _ p2p.PubSubAdapterConfig = (*CorruptPubSubAdapterConfig)(nil) +func WithInspector(inspector func(peer.ID, *corrupt.RPC) error) func(config *CorruptPubSubAdapterConfig) { + return func(config *CorruptPubSubAdapterConfig) { + config.inspector = inspector + config.options = append(config.options, corrupt.WithAppSpecificRpcInspector(func(id peer.ID, rpc *corrupt.RPC) error { + return config.inspector(id, rpc) + })) + } +} + func NewCorruptPubSubAdapterConfig(base *p2p.BasePubSubAdapterConfig, opts ...CorruptPubSubAdapterConfigOption) *CorruptPubSubAdapterConfig { config := &CorruptPubSubAdapterConfig{ withMessageSigning: true, withStrictSignatureVerification: true, + options: make([]corrupt.Option, 0), } for _, opt := range opts { opt(config) } - config.options = defaultCorruptPubsubOptions(base, config.withMessageSigning, config.withStrictSignatureVerification) + // Note: we append the default options at the end to make sure that we are not overriding the options provided by the caller. + config.options = append(config.options, defaultCorruptPubsubOptions(base, config.withMessageSigning, config.withStrictSignatureVerification)...) return config } @@ -68,11 +80,11 @@ func (c *CorruptPubSubAdapterConfig) WithSubscriptionFilter(filter p2p.Subscript } func (c *CorruptPubSubAdapterConfig) WithScoreOption(_ p2p.ScoreOptionBuilder) { - // Corrupt does not support score options. This is a no-op. + // CorruptPubSub does not support score options. This is a no-op. } func (c *CorruptPubSubAdapterConfig) WithAppSpecificRpcInspector(_ func(peer.ID, *pubsub.RPC) error) { - // Corrupt does not support app-specific inspector for now. This is a no-op. + // CorruptPubSub receives its inspector at a different time than the original pubsub (i.e., at creation time). } func (c *CorruptPubSubAdapterConfig) WithMessageIdFunction(f func([]byte) string) { diff --git a/insecure/corruptlibp2p/spam_test.go b/insecure/corruptlibp2p/spam_test.go new file mode 100644 index 00000000000..b3a619e0006 --- /dev/null +++ b/insecure/corruptlibp2p/spam_test.go @@ -0,0 +1,88 @@ +package corruptlibp2p_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/network/channels" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + corrupt "github.com/yhassanzadeh13/go-libp2p-pubsub" + + "github.com/onflow/flow-go/insecure/corruptlibp2p" + "github.com/onflow/flow-go/insecure/internal" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/network/p2p" + p2ptest "github.com/onflow/flow-go/network/p2p/test" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestSpam_IHave sets up a 2 node test between a victim node and a spammer. The spammer sends a few iHAVE control messages +// to the victim node without being subscribed to any of the same topics. +// The test then checks that the victim node received all the messages from the spammer. +func TestSpam_IHave(t *testing.T) { + const messagesToSpam = 3 + sporkId := unittest.IdentifierFixture() + role := flow.RoleConsensus + + gsrSpammer := corruptlibp2p.NewGossipSubRouterSpammer(t, sporkId, role) + + allSpamIHavesReceived := sync.WaitGroup{} + allSpamIHavesReceived.Add(messagesToSpam) + + var iHaveReceivedCtlMsgs []pb.ControlMessage + victimNode, _ := p2ptest.NodeFixture( + t, + sporkId, + t.Name(), + p2ptest.WithRole(role), + internal.WithCorruptGossipSub(corruptlibp2p.CorruptGossipSubFactory(), + corruptlibp2p.CorruptGossipSubConfigFactoryWithInspector(func(id peer.ID, rpc *corrupt.RPC) error { + iHaves := rpc.GetControl().GetIhave() + if len(iHaves) == 0 { + // don't inspect control messages with no iHAVE messages + return nil + } + iHaveReceivedCtlMsgs = append(iHaveReceivedCtlMsgs, *rpc.GetControl()) + allSpamIHavesReceived.Done() // acknowledge that victim received a message. + return nil + })), + ) + + // starts nodes + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + defer cancel() + nodes := []p2p.LibP2PNode{gsrSpammer.SpammerNode, victimNode} + p2ptest.StartNodes(t, signalerCtx, nodes, 5*time.Second) + defer p2ptest.StopNodes(t, nodes, cancel, 5*time.Second) + + gsrSpammer.Start(t) + + // prior to the test we should ensure that spammer and victim connect. + // this is vital as the spammer will circumvent the normal pubsub subscription mechanism and send iHAVE messages directly to the victim. + // without a prior connection established, directly spamming pubsub messages may cause a race condition in the pubsub implementation. + p2ptest.EnsureConnected(t, ctx, nodes) + p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) { + blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId) + return unittest.ProposalFixture(), blockTopic + }) + + // prepare to spam - generate iHAVE control messages + iHaveSentCtlMsgs := gsrSpammer.GenerateIHaveCtlMessages(t, messagesToSpam, 5) + + // start spamming the victim peer + gsrSpammer.SpamIHave(t, victimNode, iHaveSentCtlMsgs) + + // check that victim received all spam messages + unittest.RequireReturnsBefore(t, allSpamIHavesReceived.Wait, 1*time.Second, "victim did not receive all spam messages") + + // check contents of received messages should match what spammer sent + require.ElementsMatch(t, iHaveReceivedCtlMsgs, iHaveSentCtlMsgs) +} diff --git a/insecure/corruptlibp2p/spammerGossipSub.go b/insecure/corruptlibp2p/spammerGossipSub.go deleted file mode 100644 index 023771e0d30..00000000000 --- a/insecure/corruptlibp2p/spammerGossipSub.go +++ /dev/null @@ -1,32 +0,0 @@ -package corruptlibp2p - -import ( - "github.com/libp2p/go-libp2p/core/peer" - pubsub "github.com/yhassanzadeh13/go-libp2p-pubsub" -) - -type ControlMessage int - -// SpammerGossipSub is a wrapper around the GossipSubRouter that allows us to -// spam the victim with junk control messages. -type SpammerGossipSub struct { - router *pubsub.GossipSubRouter -} - -func NewSpammerGossipSubRouter(router *pubsub.GossipSubRouter) *SpammerGossipSub { - return &SpammerGossipSub{ - router: router, - } -} - -// SpamIHave spams the victim with junk iHave messages. -// msgCount is the number of iHave messages to send. -// msgSize is the number of messageIDs to include in each iHave message. -func (s *SpammerGossipSub) SpamIHave(victim peer.ID, msgCount int, msgSize int) { - for i := 0; i < msgCount; i++ { - ctlIHave := GossipSubCtrlFixture(WithIHave(msgCount, msgSize)) - s.router.SendControl(victim, ctlIHave) - } -} - -// TODO: SpamIWant, SpamGraft, SpamPrune. diff --git a/insecure/go.mod b/insecure/go.mod index c6aa8e2d2b1..a76c78aabf1 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -7,14 +7,14 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/ipfs/go-datastore v0.6.0 github.com/libp2p/go-libp2p v0.23.3 - github.com/libp2p/go-libp2p-pubsub v0.8.2-0.20221201175637-3d2eab35722e + github.com/libp2p/go-libp2p-pubsub v0.8.2 github.com/multiformats/go-multiaddr-dns v0.3.1 github.com/onflow/flow-go v0.0.0-00010101000000-000000000000 github.com/onflow/flow-go/crypto v0.24.4 github.com/rs/zerolog v1.28.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 - github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5 + github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee google.golang.org/grpc v1.50.1 google.golang.org/protobuf v1.28.1 ) @@ -130,6 +130,7 @@ require ( github.com/klauspost/compress v1.15.10 // indirect github.com/klauspost/cpuid/v2 v2.1.1 // indirect github.com/koron/go-ssdp v0.0.3 // indirect + github.com/libp2p/go-addr-util v0.1.0 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 8f50e2f19bc..42fc2698337 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -777,6 +777,7 @@ github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdA github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= github.com/libp2p/go-addr-util v0.1.0 h1:acKsntI33w2bTU7tC9a0SaPimJGfSI0bFKC18ChxeVI= +github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtTX406BpdQMqw= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= @@ -877,8 +878,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuD github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.8.0 h1:bzTG693TA1Ju/zKmUCQzDLSqiJnyRFVwPpuloZ/OZtI= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2-0.20221201175637-3d2eab35722e h1:phmi6mEoO5y2AQP68+4vZhNpHtZ4dum2ieFtWdmjXak= -github.com/libp2p/go-libp2p-pubsub v0.8.2-0.20221201175637-3d2eab35722e/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= +github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= @@ -1483,8 +1484,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5 h1:c6Ns5fDv1quWqDgIVw+hRRsLfCVTz7tqhSwRP5wjRRo= -github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5/go.mod h1:lqsC4XUC1754dyyPIJjQ4rhVhmv2SMAgq+1MJUoJyOU= +github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee h1:yFB2xjfswpuRh8FHagdBMKcBMltjr5u/XKzX6fkJO5E= +github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee/go.mod h1:Tylw4k1H86gbJx84i3r7qahN/mBaeMpUBvHY0Igshfw= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/insecure/internal/corruptGossipSubRouter.go b/insecure/internal/corruptGossipSubRouter.go deleted file mode 100644 index a203f71deff..00000000000 --- a/insecure/internal/corruptGossipSubRouter.go +++ /dev/null @@ -1,97 +0,0 @@ -package internal - -import ( - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - pubsub "github.com/yhassanzadeh13/go-libp2p-pubsub" -) - -// CorruptGossipSubRouter is a wrapper around GossipSubRouter that allows us to access the internal -// fields of the router for BFT testing and attack implementations. -type CorruptGossipSubRouter struct { - router *pubsub.GossipSubRouter -} - -var _ pubsub.GossipPubSubRouter = (*CorruptGossipSubRouter)(nil) - -func NewCorruptGossipSubRouter(router *pubsub.GossipSubRouter) *CorruptGossipSubRouter { - return &CorruptGossipSubRouter{ - router: router, - } -} - -func (m *CorruptGossipSubRouter) Protocols() []protocol.ID { - return m.router.Protocols() -} - -func (m *CorruptGossipSubRouter) Attach(sub *pubsub.PubSub) { - m.router.Attach(sub) -} - -func (m *CorruptGossipSubRouter) AddPeer(pid peer.ID, protocolId protocol.ID) { - m.router.AddPeer(pid, protocolId) -} - -func (m *CorruptGossipSubRouter) RemovePeer(pid peer.ID) { - m.router.RemovePeer(pid) -} - -func (m *CorruptGossipSubRouter) EnoughPeers(topic string, suggested int) bool { - return m.router.EnoughPeers(topic, suggested) -} - -func (m *CorruptGossipSubRouter) AcceptFrom(pid peer.ID) pubsub.AcceptStatus { - return m.router.AcceptFrom(pid) -} - -func (m *CorruptGossipSubRouter) HandleRPC(rpc *pubsub.RPC) { - m.router.HandleRPC(rpc) -} - -func (m *CorruptGossipSubRouter) Publish(message *pubsub.Message) { - m.router.Publish(message) -} - -func (m *CorruptGossipSubRouter) Join(topic string) { - m.router.Join(topic) -} - -func (m *CorruptGossipSubRouter) Leave(topic string) { - m.router.Leave(topic) -} - -func (m *CorruptGossipSubRouter) SetPeerScore(score *pubsub.PeerScore) { - m.router.SetPeerScore(score) -} - -func (m *CorruptGossipSubRouter) GetPeerScore() *pubsub.PeerScore { - return m.router.GetPeerScore() -} - -func (m *CorruptGossipSubRouter) SetPeerScoreThresholds(thresholds *pubsub.PeerScoreThresholds) { - m.router.SetPeerScoreThresholds(thresholds) -} - -func (m *CorruptGossipSubRouter) SetGossipTracer(tracer *pubsub.GossipTracer) { - m.router.SetGossipTracer(tracer) -} - -func (m *CorruptGossipSubRouter) GetGossipTracer() *pubsub.GossipTracer { - return m.router.GetGossipTracer() -} - -func (m *CorruptGossipSubRouter) GetTagTracer() *pubsub.TagTracer { - return m.router.GetTagTracer() -} - -func (m *CorruptGossipSubRouter) SetDirectPeers(direct map[peer.ID]struct{}) { - m.router.SetDirectPeers(direct) -} - -func (m *CorruptGossipSubRouter) SetPeerGater(gater *pubsub.PeerGater) { - m.router.SetPeerGater(gater) -} - -func (m *CorruptGossipSubRouter) GetPeerGater() *pubsub.PeerGater { - return m.router.GetPeerGater() -} diff --git a/integration/go.mod b/integration/go.mod index 6be5c5f9a5d..fa6ef4761f7 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -161,6 +161,7 @@ require ( github.com/klauspost/compress v1.15.10 // indirect github.com/klauspost/cpuid/v2 v2.1.1 // indirect github.com/koron/go-ssdp v0.0.3 // indirect + github.com/libp2p/go-addr-util v0.1.0 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect @@ -169,7 +170,7 @@ require ( github.com/libp2p/go-libp2p-core v0.20.1 // indirect github.com/libp2p/go-libp2p-kad-dht v0.18.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect - github.com/libp2p/go-libp2p-pubsub v0.8.2-0.20221201175637-3d2eab35722e // indirect + github.com/libp2p/go-libp2p-pubsub v0.8.2 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-libp2p-tls v0.4.1 // indirect github.com/libp2p/go-msgio v0.2.0 // indirect @@ -261,7 +262,7 @@ require ( github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xanzy/ssh-agent v0.3.0 // indirect - github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5 // indirect + github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect github.com/zeebo/blake3 v0.2.3 // indirect go.opencensus.io v0.23.0 // indirect diff --git a/integration/go.sum b/integration/go.sum index 8cb84644473..969a4657dd6 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -902,6 +902,7 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= github.com/libp2p/go-addr-util v0.1.0 h1:acKsntI33w2bTU7tC9a0SaPimJGfSI0bFKC18ChxeVI= +github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtTX406BpdQMqw= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= @@ -1002,8 +1003,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuD github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.8.0 h1:bzTG693TA1Ju/zKmUCQzDLSqiJnyRFVwPpuloZ/OZtI= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2-0.20221201175637-3d2eab35722e h1:phmi6mEoO5y2AQP68+4vZhNpHtZ4dum2ieFtWdmjXak= -github.com/libp2p/go-libp2p-pubsub v0.8.2-0.20221201175637-3d2eab35722e/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= +github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= @@ -1680,8 +1681,8 @@ github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5 h1:c6Ns5fDv1quWqDgIVw+hRRsLfCVTz7tqhSwRP5wjRRo= -github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5/go.mod h1:lqsC4XUC1754dyyPIJjQ4rhVhmv2SMAgq+1MJUoJyOU= +github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee h1:yFB2xjfswpuRh8FHagdBMKcBMltjr5u/XKzX6fkJO5E= +github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee/go.mod h1:Tylw4k1H86gbJx84i3r7qahN/mBaeMpUBvHY0Igshfw= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index c76e7412020..7ed95a51295 100644 --- a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/onflow/flow-go/network/message" + addrutil "github.com/libp2p/go-addr-util" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" @@ -27,7 +29,6 @@ import ( "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/internal/p2putils" "github.com/onflow/flow-go/network/internal/testutils" - "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" p2pdht "github.com/onflow/flow-go/network/p2p/dht" "github.com/onflow/flow-go/network/p2p/keyutils" @@ -147,46 +148,6 @@ func PeerIdsFixture(t *testing.T, n int) []peer.ID { return peerIDs } -// MustEncodeEvent encodes and returns the given event and fails the test if it faces any issue while encoding. -func MustEncodeEvent(t *testing.T, v interface{}, channel channels.Channel) []byte { - bz, err := unittest.NetworkCodec().Encode(v) - require.NoError(t, err) - - msg := message.Message{ - ChannelID: channel.String(), - Payload: bz, - } - data, err := msg.Marshal() - require.NoError(t, err) - - return data -} - -// SubMustReceiveMessage checks that the subscription have received the given message within the given timeout by the context. -func SubMustReceiveMessage(t *testing.T, ctx context.Context, expectedMessage []byte, sub p2p.Subscription) { - received := make(chan struct{}) - go func() { - msg, err := sub.Next(ctx) - require.NoError(t, err) - require.Equal(t, expectedMessage, msg.Data) - close(received) - }() - - select { - case <-received: - return - case <-ctx.Done(): - require.Fail(t, "timeout on receiving expected pubsub message") - } -} - -// SubsMustReceiveMessage checks that all subscriptions receive the given message within the given timeout by the context. -func SubsMustReceiveMessage(t *testing.T, ctx context.Context, expectedMessage []byte, subs []p2p.Subscription) { - for _, sub := range subs { - SubMustReceiveMessage(t, ctx, expectedMessage, sub) - } -} - // SubMustNeverReceiveAnyMessage checks that the subscription never receives any message within the given timeout by the context. func SubMustNeverReceiveAnyMessage(t *testing.T, ctx context.Context, sub p2p.Subscription) { timeouted := make(chan struct{}) @@ -249,19 +210,6 @@ func AddNodesToEachOthersPeerStore(t *testing.T, nodes []p2p.LibP2PNode, ids flo } } -// EnsureConnected ensures that the given nodes are connected to each other. -// It fails the test if any of the nodes is not connected to any other node. -func EnsureConnected(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode) { - for _, node := range nodes { - for _, other := range nodes { - if node == other { - continue - } - require.NoError(t, node.Host().Connect(ctx, other.Host().Peerstore().PeerInfo(other.Host().ID()))) - } - } -} - // EnsureNotConnected ensures that no connection exists from "from" nodes to "to" nodes. func EnsureNotConnected(t *testing.T, ctx context.Context, from []p2p.LibP2PNode, to []p2p.LibP2PNode) { for _, node := range from { @@ -282,43 +230,6 @@ func EnsureNotConnectedBetweenGroups(t *testing.T, ctx context.Context, groupA [ EnsureNotConnected(t, ctx, groupB, groupA) } -// EnsurePubsubMessageExchange ensures that the given nodes exchange the given message on the given channel through pubsub. -func EnsurePubsubMessageExchange(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode, messageFactory func() (interface{}, channels.Topic)) { - _, topic := messageFactory() - - subs := make([]p2p.Subscription, len(nodes)) - slashingViolationsConsumer := unittest.NetworkSlashingViolationsConsumer(unittest.Logger(), metrics.NewNoopCollector()) - for i, node := range nodes { - ps, err := node.Subscribe( - topic, - validator.TopicValidator( - unittest.Logger(), - unittest.NetworkCodec(), - slashingViolationsConsumer, - unittest.AllowAllPeerFilter())) - require.NoError(t, err) - subs[i] = ps - } - - // let subscriptions propagate - time.Sleep(1 * time.Second) - - channel, ok := channels.ChannelFromTopic(topic) - require.True(t, ok) - - for _, node := range nodes { - // creates a unique message to be published by the node - msg, _ := messageFactory() - data := MustEncodeEvent(t, msg, channel) - require.NoError(t, node.Publish(ctx, topic, data)) - - // wait for the message to be received by all nodes - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - SubsMustReceiveMessage(t, ctx, data, subs) - cancel() - } -} - // EnsureNoPubsubMessageExchange ensures that the no pubsub message is exchanged "from" the given nodes "to" the given nodes. func EnsureNoPubsubMessageExchange(t *testing.T, ctx context.Context, from []p2p.LibP2PNode, to []p2p.LibP2PNode, messageFactory func() (interface{}, channels.Topic)) { _, topic := messageFactory() @@ -452,21 +363,6 @@ func EnsureStreamCreation(t *testing.T, ctx context.Context, from []p2p.LibP2PNo } } -// EnsureStreamCreationInBothDirections ensure that between each pair of nodes in the given list, a stream is created in both directions. -func EnsureStreamCreationInBothDirections(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode) { - for _, this := range nodes { - for _, other := range nodes { - if this == other { - continue - } - // stream creation should pass without error - s, err := this.CreateStream(ctx, other.Host().ID()) - require.NoError(t, err) - require.NotNil(t, s) - } - } -} - // LongStringMessageFactoryFixture returns a function that creates a long unique string message. func LongStringMessageFactoryFixture(t *testing.T) func() string { return func() string { @@ -475,3 +371,43 @@ func LongStringMessageFactoryFixture(t *testing.T) func() string { return fmt.Sprintf("%s %d \n", msg, time.Now().UnixNano()) // add timestamp to make sure we don't send the same message twice } } + +// MustEncodeEvent encodes and returns the given event and fails the test if it faces any issue while encoding. +func MustEncodeEvent(t *testing.T, v interface{}, channel channels.Channel) []byte { + bz, err := unittest.NetworkCodec().Encode(v) + require.NoError(t, err) + + msg := message.Message{ + ChannelID: channel.String(), + Payload: bz, + } + data, err := msg.Marshal() + require.NoError(t, err) + + return data +} + +// SubMustReceiveMessage checks that the subscription have received the given message within the given timeout by the context. +func SubMustReceiveMessage(t *testing.T, ctx context.Context, expectedMessage []byte, sub p2p.Subscription) { + received := make(chan struct{}) + go func() { + msg, err := sub.Next(ctx) + require.NoError(t, err) + require.Equal(t, expectedMessage, msg.Data) + close(received) + }() + + select { + case <-received: + return + case <-ctx.Done(): + require.Fail(t, "timeout on receiving expected pubsub message") + } +} + +// SubsMustReceiveMessage checks that all subscriptions receive the given message within the given timeout by the context. +func SubsMustReceiveMessage(t *testing.T, ctx context.Context, expectedMessage []byte, subs []p2p.Subscription) { + for _, sub := range subs { + SubMustReceiveMessage(t, ctx, expectedMessage, sub) + } +} diff --git a/network/p2p/connection/connection_gater_test.go b/network/p2p/connection/connection_gater_test.go index 95be3671788..7634e4347df 100644 --- a/network/p2p/connection/connection_gater_test.go +++ b/network/p2p/connection/connection_gater_test.go @@ -94,7 +94,7 @@ func TestConnectionGating(t *testing.T) { node2Peers.Add(node1.Host().ID(), struct{}{}) // now both nodes should be able to connect to each other. - p2pfixtures.EnsureStreamCreationInBothDirections(t, ctx, []p2p.LibP2PNode{node1, node2}) + p2ptest.EnsureStreamCreationInBothDirections(t, ctx, []p2p.LibP2PNode{node1, node2}) }) } @@ -273,8 +273,8 @@ func ensureCommunicationSilenceAmongGroups(t *testing.T, ctx context.Context, sp // ensureCommunicationOverAllProtocols ensures that all nodes are connected to each other, and they can exchange messages over the pubsub and unicast. func ensureCommunicationOverAllProtocols(t *testing.T, ctx context.Context, sporkId flow.Identifier, nodes []p2p.LibP2PNode, inbounds []chan string) { - p2pfixtures.EnsureConnected(t, ctx, nodes) - p2pfixtures.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) { + p2ptest.EnsureConnected(t, ctx, nodes) + p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) { blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId) return unittest.ProposalFixture(), blockTopic }) diff --git a/network/p2p/test/fixtures.go b/network/p2p/test/fixtures.go index 088766031d7..6cdd751f750 100644 --- a/network/p2p/test/fixtures.go +++ b/network/p2p/test/fixtures.go @@ -21,6 +21,8 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/network/channels" + "github.com/onflow/flow-go/network/internal/p2pfixtures" "github.com/onflow/flow-go/network/internal/testutils" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/connection" @@ -29,6 +31,7 @@ import ( "github.com/onflow/flow-go/network/p2p/scoring" "github.com/onflow/flow-go/network/p2p/unicast" "github.com/onflow/flow-go/network/p2p/utils" + validator "github.com/onflow/flow-go/network/validator/pubsub" "github.com/onflow/flow-go/utils/logging" "github.com/onflow/flow-go/utils/unittest" ) @@ -304,3 +307,70 @@ func LetNodesDiscoverEachOther(t *testing.T, ctx context.Context, nodes []p2p.Li } } } + +// EnsureConnected ensures that the given nodes are connected to each other. +// It fails the test if any of the nodes is not connected to any other node. +func EnsureConnected(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode) { + for _, node := range nodes { + for _, other := range nodes { + if node == other { + continue + } + require.NoError(t, node.Host().Connect(ctx, other.Host().Peerstore().PeerInfo(other.Host().ID()))) + require.Equal(t, node.Host().Network().Connectedness(other.Host().ID()), network.Connected) + } + } +} + +// EnsureStreamCreationInBothDirections ensure that between each pair of nodes in the given list, a stream is created in both directions. +func EnsureStreamCreationInBothDirections(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode) { + for _, this := range nodes { + for _, other := range nodes { + if this == other { + continue + } + // stream creation should pass without error + s, err := this.CreateStream(ctx, other.Host().ID()) + require.NoError(t, err) + require.NotNil(t, s) + } + } +} + +// EnsurePubsubMessageExchange ensures that the given connected nodes exchange the given message on the given channel through pubsub. +// Note: EnsureConnected() must be called to connect all nodes before calling this function. +func EnsurePubsubMessageExchange(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode, messageFactory func() (interface{}, channels.Topic)) { + _, topic := messageFactory() + + subs := make([]p2p.Subscription, len(nodes)) + slashingViolationsConsumer := unittest.NetworkSlashingViolationsConsumer(unittest.Logger(), metrics.NewNoopCollector()) + for i, node := range nodes { + ps, err := node.Subscribe( + topic, + validator.TopicValidator( + unittest.Logger(), + unittest.NetworkCodec(), + slashingViolationsConsumer, + unittest.AllowAllPeerFilter())) + require.NoError(t, err) + subs[i] = ps + } + + // let subscriptions propagate + time.Sleep(1 * time.Second) + + channel, ok := channels.ChannelFromTopic(topic) + require.True(t, ok) + + for _, node := range nodes { + // creates a unique message to be published by the node + msg, _ := messageFactory() + data := p2pfixtures.MustEncodeEvent(t, msg, channel) + require.NoError(t, node.Publish(ctx, topic, data)) + + // wait for the message to be received by all nodes + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + p2pfixtures.SubsMustReceiveMessage(t, ctx, data, subs) + cancel() + } +}