Skip to content

Commit

Permalink
feat(peer-exchange): rate limit (#1043)
Browse files Browse the repository at this point in the history
Co-authored-by: Prem Chaitanya Prathi <[email protected]>
  • Loading branch information
richard-ramos and chaitanyaprem authored Mar 11, 2024
1 parent d4bda12 commit 4d828bd
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
6 changes: 3 additions & 3 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"go.uber.org/zap"
)

func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
params := new(PeerExchangeParameters)
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...RequestOption) error {
params := new(PeerExchangeRequestParameters)
params.host = wakuPX.h
params.log = wakuPX.log
params.pm = wakuPX.pm
Expand Down Expand Up @@ -103,7 +103,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
return wakuPX.handleResponse(ctx, responseRPC.Response, params)
}

func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeParameters) error {
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeRequestParameters) error {
var discoveredPeers []struct {
addrInfo peer.AddrInfo
enr *enode.Node
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/peer_exchange/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
decodeRPCFailure metricsErrCategory = "decode_rpc_failure"
pxFailure metricsErrCategory = "px_failure"
dialFailure metricsErrCategory = "dial_failure"
rateLimitFailure metricsErrCategory = "ratelimit_failure"
)

// RecordError increases the counter for different error types
Expand Down
18 changes: 17 additions & 1 deletion waku/v2/protocol/peer_exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
Expand Down Expand Up @@ -47,12 +48,13 @@ type WakuPeerExchange struct {

peerConnector PeerConnector
enrCache *enrCache
limiter *rate.Limiter
}

// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) (*WakuPeerExchange, error) {
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange)
wakuPX.disc = disc
wakuPX.metrics = newMetrics(reg)
Expand All @@ -62,6 +64,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.pm = pm
wakuPX.CommonService = service.NewCommonService()

params := &PeerExchangeParameters{}
for _, opt := range opts {
opt(params)
}

wakuPX.limiter = params.limiter
return wakuPX, nil
}

Expand All @@ -87,6 +95,14 @@ func (wakuPX *WakuPeerExchange) start() error {
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
return func(stream network.Stream) {
logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))

if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
wakuPX.metrics.RecordError(rateLimitFailure)
wakuPX.log.Error("exceeds the rate limit")
// TODO: peer exchange protocol should contain an err field
return
}

requestRPC := &pb.PeerExchangeRPC{}
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err := reader.ReadMsg(requestRPC)
Expand Down
40 changes: 27 additions & 13 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,23 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type PeerExchangeParameters struct {
limiter *rate.Limiter
}

type Option func(*PeerExchangeParameters)

// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
func WithRateLimiter(r rate.Limit, b int) Option {
return func(params *PeerExchangeParameters) {
params.limiter = rate.NewLimiter(r, b)
}
}

type PeerExchangeRequestParameters struct {
host host.Host
selectedPeer peer.ID
peerAddr multiaddr.Multiaddr
Expand All @@ -22,11 +36,11 @@ type PeerExchangeParameters struct {
clusterID int
}

type PeerExchangeOption func(*PeerExchangeParameters) error
type RequestOption func(*PeerExchangeRequestParameters) error

// WithPeer is an option used to specify the peerID to fetch peers from
func WithPeer(p peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithPeer(p peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.selectedPeer = p
if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive")
Expand All @@ -38,8 +52,8 @@ func WithPeer(p peer.ID) PeerExchangeOption {
// WithPeerAddr is an option used to specify a peerAddress to fetch peers from
// This new peer will be added to peerStore.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerAddr = pAddr
if params.selectedPeer != "" {
return errors.New("peerAddr and peerId options are mutually exclusive")
Expand All @@ -53,8 +67,8 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption {
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
return nil
Expand All @@ -65,24 +79,24 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerSelectionType = peermanager.LowestRTT
params.preferredPeers = fromThesePeers
return nil
}
}

// DefaultOptions are the default options to be used when using the lightpush protocol
func DefaultOptions(host host.Host) []PeerExchangeOption {
return []PeerExchangeOption{
func DefaultOptions(host host.Host) []RequestOption {
return []RequestOption{
WithAutomaticPeerSelection(),
}
}

// Use this if you want to filter peers by specific shards
func FilterByShard(clusterID int, shard int) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func FilterByShard(clusterID int, shard int) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.shard = shard
params.clusterID = clusterID
return nil
Expand Down

0 comments on commit 4d828bd

Please sign in to comment.