Skip to content

Commit

Permalink
Add p2p sdk (#1799)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
joshua-kim and StephenButtolph authored Aug 2, 2023
1 parent 6b87540 commit 6026279
Show file tree
Hide file tree
Showing 12 changed files with 1,430 additions and 9 deletions.
176 changes: 176 additions & 0 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"errors"
"fmt"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/set"
)

var (
ErrAppRequestFailed = errors.New("app request failed")
ErrRequestPending = errors.New("request pending")
ErrNoPeers = errors.New("no peers")
)

// AppResponseCallback is called upon receiving an AppResponse for an AppRequest
// issued by Client.
// Callers should check [err] to see whether the AppRequest failed or not.
type AppResponseCallback func(
nodeID ids.NodeID,
responseBytes []byte,
err error,
)

// CrossChainAppResponseCallback is called upon receiving an
// CrossChainAppResponse for a CrossChainAppRequest issued by Client.
// Callers should check [err] to see whether the AppRequest failed or not.
type CrossChainAppResponseCallback func(
chainID ids.ID,
responseBytes []byte,
err error,
)

type Client struct {
handlerPrefix []byte
router *Router
sender common.AppSender
}

// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
// If a specific node needs to be requested, use AppRequest instead.
// See AppRequest for more docs.
func (c *Client) AppRequestAny(
ctx context.Context,
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
c.router.lock.RLock()
peers := c.router.peers.Sample(1)
c.router.lock.RUnlock()

if len(peers) != 1 {
return ErrNoPeers
}

nodeIDs := set.Set[ids.NodeID]{
peers[0]: struct{}{},
}
return c.AppRequest(ctx, nodeIDs, appRequestBytes, onResponse)
}

// AppRequest issues an arbitrary request to a node.
// [onResponse] is invoked upon an error or a response.
func (c *Client) AppRequest(
ctx context.Context,
nodeIDs set.Set[ids.NodeID],
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
c.router.lock.Lock()
defer c.router.lock.Unlock()

appRequestBytes = c.prefixMessage(appRequestBytes)
for nodeID := range nodeIDs {
requestID := c.router.requestID
if _, ok := c.router.pendingAppRequests[requestID]; ok {
return fmt.Errorf(
"failed to issue request with request id %d: %w",
requestID,
ErrRequestPending,
)
}

if err := c.sender.SendAppRequest(
ctx,
set.Set[ids.NodeID]{nodeID: struct{}{}},
requestID,
appRequestBytes,
); err != nil {
return err
}

c.router.pendingAppRequests[requestID] = onResponse
c.router.requestID++
}

return nil
}

// AppGossip sends a gossip message to a random set of peers.
func (c *Client) AppGossip(
ctx context.Context,
appGossipBytes []byte,
) error {
return c.sender.SendAppGossip(
ctx,
c.prefixMessage(appGossipBytes),
)
}

// AppGossipSpecific sends a gossip message to a predetermined set of peers.
func (c *Client) AppGossipSpecific(
ctx context.Context,
nodeIDs set.Set[ids.NodeID],
appGossipBytes []byte,
) error {
return c.sender.SendAppGossipSpecific(
ctx,
nodeIDs,
c.prefixMessage(appGossipBytes),
)
}

// CrossChainAppRequest sends a cross chain app request to another vm.
// [onResponse] is invoked upon an error or a response.
func (c *Client) CrossChainAppRequest(
ctx context.Context,
chainID ids.ID,
appRequestBytes []byte,
onResponse CrossChainAppResponseCallback,
) error {
c.router.lock.Lock()
defer c.router.lock.Unlock()

requestID := c.router.requestID
if _, ok := c.router.pendingCrossChainAppRequests[requestID]; ok {
return fmt.Errorf(
"failed to issue request with request id %d: %w",
requestID,
ErrRequestPending,
)
}

if err := c.sender.SendCrossChainAppRequest(
ctx,
chainID,
c.router.requestID,
c.prefixMessage(appRequestBytes),
); err != nil {
return err
}

c.router.pendingCrossChainAppRequests[requestID] = onResponse
c.router.requestID++

return nil
}

// prefixMessage prefixes the original message with the handler identifier
// corresponding to this client.
//
// Only gossip and request messages need to be prefixed.
// Response messages don't need to be prefixed because request ids are tracked
// which map to the expected response handler.
func (c *Client) prefixMessage(src []byte) []byte {
messageBytes := make([]byte, len(c.handlerPrefix)+len(src))
copy(messageBytes, c.handlerPrefix)
copy(messageBytes[len(c.handlerPrefix):], src)
return messageBytes
}
98 changes: 98 additions & 0 deletions network/p2p/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"time"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
)

// Handler is the server-side logic for virtual machine application protocols.
type Handler interface {
// AppGossip is called when handling an AppGossip message.
AppGossip(
ctx context.Context,
nodeID ids.NodeID,
gossipBytes []byte,
) error
// AppRequest is called when handling an AppRequest message.
// Returns the bytes for the response corresponding to [requestBytes]
AppRequest(
ctx context.Context,
nodeID ids.NodeID,
deadline time.Time,
requestBytes []byte,
) ([]byte, error)
// CrossChainAppRequest is called when handling a CrossChainAppRequest
// message.
// Returns the bytes for the response corresponding to [requestBytes]
CrossChainAppRequest(
ctx context.Context,
chainID ids.ID,
deadline time.Time,
requestBytes []byte,
) ([]byte, error)
}

// responder automatically sends the response for a given request
type responder struct {
handlerID uint64
handler Handler
log logging.Logger
sender common.AppSender
}

func (r *responder) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error {
appResponse, err := r.handler.AppRequest(ctx, nodeID, deadline, request)
if err != nil {
r.log.Debug("failed to handle message",
zap.Stringer("messageOp", message.AppRequestOp),
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Time("deadline", deadline),
zap.Uint64("handlerID", r.handlerID),
zap.Binary("message", request),
)
return nil
}

return r.sender.SendAppResponse(ctx, nodeID, requestID, appResponse)
}

func (r *responder) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
err := r.handler.AppGossip(ctx, nodeID, msg)
if err != nil {
r.log.Debug("failed to handle message",
zap.Stringer("messageOp", message.AppGossipOp),
zap.Stringer("nodeID", nodeID),
zap.Uint64("handlerID", r.handlerID),
zap.Binary("message", msg),
)
}
return nil
}

func (r *responder) CrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
appResponse, err := r.handler.CrossChainAppRequest(ctx, chainID, deadline, request)
if err != nil {
r.log.Debug("failed to handle message",
zap.Stringer("messageOp", message.CrossChainAppRequestOp),
zap.Stringer("chainID", chainID),
zap.Uint32("requestID", requestID),
zap.Time("deadline", deadline),
zap.Uint64("handlerID", r.handlerID),
zap.Binary("message", request),
)
return nil
}

return r.sender.SendCrossChainAppResponse(ctx, chainID, requestID, appResponse)
}
84 changes: 84 additions & 0 deletions network/p2p/mocks/mock_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6026279

Please sign in to comment.