Skip to content

Commit

Permalink
Merge #3564
Browse files Browse the repository at this point in the history
3564: [BFT Testing] Gossipsub spam test Framework - IHAVE r=gomisha a=gomisha

This is Framework implementation of a spam test using libp2p gossipsub IHAVE messages.

It sets up a 2 node test between a victim node and a spammer. The spammer sends a few IHAVE  control messages to the victim node without being subscribed to any of the same topics. The test then checks that the victim node received all the messages from the spammer.

- initial implementation of a general purpose Spammer that will be used for future libp2p spam testing
- uses recent pubsub improvements like the RPC ingress message inspector (libp2p/go-libp2p-pubsub#509) to detect received messages on the victim node

ref: https://github.com/dapperlabs/flow-go/issues/6455

Co-authored-by: Yahya Hassanzadeh <[email protected]>
Co-authored-by: Misha <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2023
2 parents fdb0ee4 + 1dd0ac7 commit 4a26d2c
Show file tree
Hide file tree
Showing 18 changed files with 380 additions and 304 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ docker-build-access-debug:
docker build -f cmd/Dockerfile --build-arg TARGET=./cmd/access --build-arg COMMIT=$(COMMIT) --build-arg VERSION=$(IMAGE_TAG) --build-arg GOARCH=$(GOARCH) --target debug \
-t "$(CONTAINER_REGISTRY)/access-debug:latest" -t "$(CONTAINER_REGISTRY)/access-debug:$(SHORT_COMMIT)" -t "$(CONTAINER_REGISTRY)/access-debug:$(IMAGE_TAG)" .

# build corrupted access node for BFT testing
# build corrupt access node for BFT testing
.PHONY: docker-build-access-corrupt
docker-build-access-corrupt:
#temporarily make insecure/ a non-module to allow Docker to use corrupt builders there
Expand Down
2 changes: 1 addition & 1 deletion insecure/cmd/mods_override.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cp ./go.mod ./go2.mod
cp ./go.sum ./go2.sum

# inject forked libp2p-pubsub into main module to allow building corrupt Docker images
echo "require github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.12-0.20221110181155-60457b3ef6d5" >> ./go.mod
echo "require github.com/yhassanzadeh13/go-libp2p-pubsub v0.6.2-0.20221208234712-b44d9133e4ee" >> ./go.mod

# update go.sum since added new dependency
go mod tidy
117 changes: 117 additions & 0 deletions insecure/corruptlibp2p/gossipsub_spammer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package corruptlibp2p

import (
"sync"
"testing"
"time"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
corrupt "github.com/yhassanzadeh13/go-libp2p-pubsub"

"github.com/onflow/flow-go/insecure/internal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network/p2p"
p2ptest "github.com/onflow/flow-go/network/p2p/test"
)

// GossipSubRouterSpammer is a wrapper around the GossipSubRouter that allows us to
// spam the victim with junk control messages.
type GossipSubRouterSpammer struct {
router *atomicRouter
SpammerNode p2p.LibP2PNode
}

// NewGossipSubRouterSpammer is the main method tests call for spamming attacks.
func NewGossipSubRouterSpammer(t *testing.T, sporkId flow.Identifier, role flow.Role) *GossipSubRouterSpammer {
spammerNode, router := createSpammerNode(t, sporkId, role)
return &GossipSubRouterSpammer{
router: router,
SpammerNode: spammerNode,
}
}

// SpamIHave spams the victim with junk iHave messages.
// ctlMessages is the list of spam messages to send to the victim node.
func (s *GossipSubRouterSpammer) SpamIHave(t *testing.T, victim p2p.LibP2PNode, ctlMessages []pb.ControlMessage) {
for _, ctlMessage := range ctlMessages {
require.True(t, s.router.Get().SendControl(victim.Host().ID(), &ctlMessage))
}
}

// GenerateIHaveCtlMessages generates IHAVE control messages before they are sent so the test can prepare
// to expect receiving them before they are sent by the spammer.
func (s *GossipSubRouterSpammer) GenerateIHaveCtlMessages(t *testing.T, msgCount, msgSize int) []pb.ControlMessage {
var iHaveCtlMsgs []pb.ControlMessage
for i := 0; i < msgCount; i++ {
iHaveCtlMsg := GossipSubCtrlFixture(WithIHave(msgCount, msgSize))

iHaves := iHaveCtlMsg.GetIhave()
require.Equal(t, msgCount, len(iHaves))
iHaveCtlMsgs = append(iHaveCtlMsgs, *iHaveCtlMsg)
}
return iHaveCtlMsgs
}

// Start starts the spammer and waits until it is fully initialized before returning.
func (s *GossipSubRouterSpammer) Start(t *testing.T) {
require.Eventuallyf(t, func() bool {
// ensuring the spammer router has been initialized.
// this is needed because the router is initialized asynchronously.
return s.router.Get() != nil
}, 1*time.Second, 100*time.Millisecond, "spammer router not set")
s.router.set(s.router.Get())
}

func createSpammerNode(t *testing.T, sporkId flow.Identifier, role flow.Role) (p2p.LibP2PNode, *atomicRouter) {
router := newAtomicRouter()
spammerNode, _ := p2ptest.NodeFixture(
t,
sporkId,
t.Name(),
p2ptest.WithRole(role),
internal.WithCorruptGossipSub(CorruptGossipSubFactory(func(r *corrupt.GossipSubRouter) {
require.NotNil(t, r)
router.set(r)
}),
CorruptGossipSubConfigFactoryWithInspector(func(id peer.ID, rpc *corrupt.RPC) error {
// here we can inspect the incoming RPC message to the spammer node
return nil
})),
)
return spammerNode, router
}

// atomicRouter is a wrapper around the corrupt.GossipSubRouter that allows atomic access to the router.
// This is done to avoid race conditions when accessing the router from multiple goroutines.
type atomicRouter struct {
mu sync.Mutex
router *corrupt.GossipSubRouter
}

func newAtomicRouter() *atomicRouter {
return &atomicRouter{
mu: sync.Mutex{},
}
}

// SetRouter sets the router if it has never been set. Returns true if the router was set, false otherwise.
func (a *atomicRouter) set(router *corrupt.GossipSubRouter) bool {
a.mu.Lock()
defer a.mu.Unlock()
if a.router == nil {
a.router = router
return true
}
return false
}

// Get returns the router.
func (a *atomicRouter) Get() *corrupt.GossipSubRouter {
a.mu.Lock()
defer a.mu.Unlock()
return a.router
}

// TODO: SpamIWant, SpamGraft, SpamPrune.
11 changes: 0 additions & 11 deletions insecure/corruptlibp2p/libp2p.go

This file was deleted.

32 changes: 21 additions & 11 deletions insecure/corruptlibp2p/libp2p_node_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
corrupt "github.com/yhassanzadeh13/go-libp2p-pubsub"

"github.com/onflow/flow-go/insecure/internal"
"github.com/onflow/flow-go/network/p2p"

madns "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -40,7 +41,7 @@ func NewCorruptLibP2PNodeFactory(
) p2pbuilder.LibP2PFactoryFunc {
return func() (p2p.LibP2PNode, error) {
if chainID != flow.BftTestnet {
panic("illegal chain id for using corruptible conduit factory")
panic("illegal chain id for using corrupt libp2p node")
}

builder := p2pbuilder.DefaultNodeBuilder(
Expand All @@ -67,27 +68,36 @@ func NewCorruptLibP2PNodeFactory(
}
}

// CorruptibleGossipSubFactory returns a factory function that creates a new instance of the forked gossipsub module from
// CorruptGossipSubFactory returns a factory function that creates a new instance of the forked gossipsub module from
// github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation.
func CorruptibleGossipSubFactory() (p2pbuilder.GossipSubFactoryFunc, *internal.CorruptGossipSubRouter) {
var rt *internal.CorruptGossipSubRouter
func CorruptGossipSubFactory(routerOpts ...func(*corrupt.GossipSubRouter)) p2pbuilder.GossipSubFactoryFunc {
factory := func(ctx context.Context, logger zerolog.Logger, host host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, error) {
adapter, router, err := NewCorruptGossipSubAdapter(ctx, logger, host, cfg)
rt = router
for _, opt := range routerOpts {
opt(router)
}
return adapter, err
}
return factory, rt
return factory
}

// CorruptibleGossipSubConfigFactory returns a factory function that creates a new instance of the forked gossipsub config
// CorruptGossipSubConfigFactory returns a factory function that creates a new instance of the forked gossipsub config
// from github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation.
func CorruptibleGossipSubConfigFactory(opts ...CorruptPubSubAdapterConfigOption) p2pbuilder.GossipSubAdapterConfigFunc {
func CorruptGossipSubConfigFactory(opts ...CorruptPubSubAdapterConfigOption) p2pbuilder.GossipSubAdapterConfigFunc {
return func(base *p2p.BasePubSubAdapterConfig) p2p.PubSubAdapterConfig {
return NewCorruptPubSubAdapterConfig(base, opts...)
}
}

// CorruptGossipSubConfigFactoryWithInspector returns a factory function that creates a new instance of the forked gossipsub config
// from github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation.
func CorruptGossipSubConfigFactoryWithInspector(inspector func(peer.ID, *corrupt.RPC) error) p2pbuilder.GossipSubAdapterConfigFunc {
return func(base *p2p.BasePubSubAdapterConfig) p2p.PubSubAdapterConfig {
return NewCorruptPubSubAdapterConfig(base, WithInspector(inspector))
}
}

func overrideWithCorruptGossipSub(builder p2pbuilder.NodeBuilder, opts ...CorruptPubSubAdapterConfigOption) {
factory, _ := CorruptibleGossipSubFactory()
builder.SetGossipSubFactory(factory, CorruptibleGossipSubConfigFactory(opts...))
factory := CorruptGossipSubFactory()
builder.SetGossipSubFactory(factory, CorruptGossipSubConfigFactory(opts...))
}
13 changes: 0 additions & 13 deletions insecure/corruptlibp2p/libp2p_test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// totally separated from the rest of the codebase.
type CorruptGossipSubAdapter struct {
gossipSub *corrupt.PubSub
router *internal.CorruptGossipSubRouter
router *corrupt.GossipSubRouter
logger zerolog.Logger
}

Expand Down Expand Up @@ -101,34 +101,26 @@ func (c *CorruptGossipSubAdapter) ListPeers(topic string) []peer.ID {
return c.gossipSub.ListPeers(topic)
}

func (c *CorruptGossipSubAdapter) GetRouter() *internal.CorruptGossipSubRouter {
return c.router
}

func NewCorruptGossipSubAdapter(ctx context.Context, logger zerolog.Logger, h host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, *internal.CorruptGossipSubRouter, error) {
func NewCorruptGossipSubAdapter(ctx context.Context, logger zerolog.Logger, h host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, *corrupt.GossipSubRouter, error) {
gossipSubConfig, ok := cfg.(*CorruptPubSubAdapterConfig)
if !ok {
return nil, nil, fmt.Errorf("invalid gossipsub config type: %T", cfg)
}

// initializes a default gossipsub router and wraps it with the corrupt router.
router, err := corrupt.DefaultGossipSubRouter(h)
if err != nil {
return nil, nil, fmt.Errorf("failed to create gossipsub router: %w", err)
}
corruptRouter := internal.NewCorruptGossipSubRouter(router)
router := corrupt.DefaultGossipSubRouter(h)

// injects the corrupt router into the gossipsub constructor
gossipSub, err := corrupt.NewGossipSubWithRouter(ctx, h, corruptRouter, gossipSubConfig.Build()...)
gossipSub, err := corrupt.NewGossipSubWithRouter(ctx, h, router, gossipSubConfig.Build()...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create corrupt gossipsub: %w", err)
}

adapter := &CorruptGossipSubAdapter{
gossipSub: gossipSub,
router: corruptRouter,
router: router,
logger: logger,
}

return adapter, corruptRouter, nil
return adapter, router, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// totally separated from the rest of the codebase.
type CorruptPubSubAdapterConfig struct {
options []corrupt.Option
inspector func(peer.ID, *corrupt.RPC) error
withMessageSigning bool
withStrictSignatureVerification bool
}
Expand All @@ -44,17 +45,28 @@ func WithStrictSignatureVerification(withStrictSignatureVerification bool) Corru

var _ p2p.PubSubAdapterConfig = (*CorruptPubSubAdapterConfig)(nil)

func WithInspector(inspector func(peer.ID, *corrupt.RPC) error) func(config *CorruptPubSubAdapterConfig) {
return func(config *CorruptPubSubAdapterConfig) {
config.inspector = inspector
config.options = append(config.options, corrupt.WithAppSpecificRpcInspector(func(id peer.ID, rpc *corrupt.RPC) error {
return config.inspector(id, rpc)
}))
}
}

func NewCorruptPubSubAdapterConfig(base *p2p.BasePubSubAdapterConfig, opts ...CorruptPubSubAdapterConfigOption) *CorruptPubSubAdapterConfig {
config := &CorruptPubSubAdapterConfig{
withMessageSigning: true,
withStrictSignatureVerification: true,
options: make([]corrupt.Option, 0),
}

for _, opt := range opts {
opt(config)
}

config.options = defaultCorruptPubsubOptions(base, config.withMessageSigning, config.withStrictSignatureVerification)
// Note: we append the default options at the end to make sure that we are not overriding the options provided by the caller.
config.options = append(config.options, defaultCorruptPubsubOptions(base, config.withMessageSigning, config.withStrictSignatureVerification)...)

return config
}
Expand All @@ -68,11 +80,11 @@ func (c *CorruptPubSubAdapterConfig) WithSubscriptionFilter(filter p2p.Subscript
}

func (c *CorruptPubSubAdapterConfig) WithScoreOption(_ p2p.ScoreOptionBuilder) {
// Corrupt does not support score options. This is a no-op.
// CorruptPubSub does not support score options. This is a no-op.
}

func (c *CorruptPubSubAdapterConfig) WithAppSpecificRpcInspector(_ func(peer.ID, *pubsub.RPC) error) {
// Corrupt does not support app-specific inspector for now. This is a no-op.
// CorruptPubSub receives its inspector at a different time than the original pubsub (i.e., at creation time).
}

func (c *CorruptPubSubAdapterConfig) WithMessageIdFunction(f func([]byte) string) {
Expand Down
Loading

0 comments on commit 4a26d2c

Please sign in to comment.