Skip to content

Commit

Permalink
Migrate x/sync to p2p (#3106)
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <[email protected]>
Co-authored-by: Stephen Buttolph <[email protected]>
Co-authored-by: Darioush Jalali <[email protected]>
  • Loading branch information
3 people authored Sep 25, 2024
1 parent 558f32a commit 0a46687
Show file tree
Hide file tree
Showing 17 changed files with 1,616 additions and 3,436 deletions.
90 changes: 90 additions & 0 deletions network/p2p/p2ptest/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2ptest

import (
"context"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
)

// NewClient generates a client-server pair and returns the client used to
// communicate with a server with the specified handler
func NewClient(
t *testing.T,
ctx context.Context,
handler p2p.Handler,
clientNodeID ids.NodeID,
serverNodeID ids.NodeID,
) *p2p.Client {
clientSender := &enginetest.Sender{}
serverSender := &enginetest.Sender{}

clientNetwork, err := p2p.NewNetwork(logging.NoLog{}, clientSender, prometheus.NewRegistry(), "")
require.NoError(t, err)

serverNetwork, err := p2p.NewNetwork(logging.NoLog{}, serverSender, prometheus.NewRegistry(), "")
require.NoError(t, err)

clientSender.SendAppGossipF = func(ctx context.Context, _ common.SendConfig, gossipBytes []byte) error {
// Send the request asynchronously to avoid deadlock when the server
// sends the response back to the client
go func() {
require.NoError(t, serverNetwork.AppGossip(ctx, clientNodeID, gossipBytes))
}()

return nil
}

clientSender.SendAppRequestF = func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error {
// Send the request asynchronously to avoid deadlock when the server
// sends the response back to the client
go func() {
require.NoError(t, serverNetwork.AppRequest(ctx, clientNodeID, requestID, time.Time{}, requestBytes))
}()

return nil
}

serverSender.SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error {
// Send the request asynchronously to avoid deadlock when the server
// sends the response back to the client
go func() {
require.NoError(t, clientNetwork.AppResponse(ctx, serverNodeID, requestID, responseBytes))
}()

return nil
}

serverSender.SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
// Send the request asynchronously to avoid deadlock when the server
// sends the response back to the client
go func() {
require.NoError(t, clientNetwork.AppRequestFailed(ctx, serverNodeID, requestID, &common.AppError{
Code: errorCode,
Message: errorMessage,
}))
}()

return nil
}

require.NoError(t, clientNetwork.Connected(ctx, clientNodeID, nil))
require.NoError(t, clientNetwork.Connected(ctx, serverNodeID, nil))
require.NoError(t, serverNetwork.Connected(ctx, clientNodeID, nil))
require.NoError(t, serverNetwork.Connected(ctx, serverNodeID, nil))

require.NoError(t, serverNetwork.AddHandler(0, handler))
return clientNetwork.NewClient(0)
}
110 changes: 110 additions & 0 deletions network/p2p/p2ptest/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2ptest

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/set"
)

func TestNewClient_AppGossip(t *testing.T) {
require := require.New(t)
ctx := context.Background()

appGossipChan := make(chan struct{})
testHandler := p2p.TestHandler{
AppGossipF: func(context.Context, ids.NodeID, []byte) {
close(appGossipChan)
},
}

client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID())
require.NoError(client.AppGossip(ctx, common.SendConfig{}, []byte("foobar")))
<-appGossipChan
}

func TestNewClient_AppRequest(t *testing.T) {
tests := []struct {
name string
appResponse []byte
appErr error
appRequestF func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error
}{
{
name: "AppRequest - response",
appResponse: []byte("foobar"),
appRequestF: func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error {
return client.AppRequest(ctx, set.Of(ids.GenerateTestNodeID()), []byte("foo"), onResponse)
},
},
{
name: "AppRequest - error",
appErr: &common.AppError{
Code: 123,
Message: "foobar",
},
appRequestF: func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error {
return client.AppRequest(ctx, set.Of(ids.GenerateTestNodeID()), []byte("foo"), onResponse)
},
},
{
name: "AppRequestAny - response",
appResponse: []byte("foobar"),
appRequestF: func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error {
return client.AppRequestAny(ctx, []byte("foo"), onResponse)
},
},
{
name: "AppRequestAny - error",
appErr: &common.AppError{
Code: 123,
Message: "foobar",
},
appRequestF: func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error {
return client.AppRequestAny(ctx, []byte("foo"), onResponse)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctx := context.Background()

appRequestChan := make(chan struct{})
testHandler := p2p.TestHandler{
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
if tt.appErr != nil {
return nil, &common.AppError{
Code: 123,
Message: tt.appErr.Error(),
}
}

return tt.appResponse, nil
},
}

client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID())
require.NoError(tt.appRequestF(
ctx,
client,
func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
require.ErrorIs(err, tt.appErr)
require.Equal(tt.appResponse, responseBytes)
close(appRequestChan)
},
))
<-appRequestChan
})
}
}
Loading

0 comments on commit 0a46687

Please sign in to comment.