diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 6f629b06d..915ce75fd 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -19,8 +19,8 @@ import ( "go.uber.org/zap" ) -func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error { - params := new(PeerExchangeParameters) +func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...RequestOption) error { + params := new(PeerExchangeRequestParameters) params.host = wakuPX.h params.log = wakuPX.log params.pm = wakuPX.pm @@ -103,7 +103,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts return wakuPX.handleResponse(ctx, responseRPC.Response, params) } -func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeParameters) error { +func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeRequestParameters) error { var discoveredPeers []struct { addrInfo peer.AddrInfo enr *enode.Node diff --git a/waku/v2/protocol/peer_exchange/metrics.go b/waku/v2/protocol/peer_exchange/metrics.go index 1a98949a9..4d55db1d4 100644 --- a/waku/v2/protocol/peer_exchange/metrics.go +++ b/waku/v2/protocol/peer_exchange/metrics.go @@ -39,6 +39,7 @@ var ( decodeRPCFailure metricsErrCategory = "decode_rpc_failure" pxFailure metricsErrCategory = "px_failure" dialFailure metricsErrCategory = "dial_failure" + rateLimitFailure metricsErrCategory = "ratelimit_failure" ) // RecordError increases the counter for different error types diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 3f4a15739..a70b34f61 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -20,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" + "golang.org/x/time/rate" ) // PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier @@ -47,12 +48,13 @@ type WakuPeerExchange struct { peerConnector PeerConnector enrCache *enrCache + limiter *rate.Limiter } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct // Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) (*WakuPeerExchange, error) { +func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) { wakuPX := new(WakuPeerExchange) wakuPX.disc = disc wakuPX.metrics = newMetrics(reg) @@ -62,6 +64,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.pm = pm wakuPX.CommonService = service.NewCommonService() + params := &PeerExchangeParameters{} + for _, opt := range opts { + opt(params) + } + + wakuPX.limiter = params.limiter return wakuPX, nil } @@ -87,6 +95,14 @@ func (wakuPX *WakuPeerExchange) start() error { func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { return func(stream network.Stream) { logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + + if wakuPX.limiter != nil && !wakuPX.limiter.Allow() { + wakuPX.metrics.RecordError(rateLimitFailure) + wakuPX.log.Error("exceeds the rate limit") + // TODO: peer exchange protocol should contain an err field + return + } + requestRPC := &pb.PeerExchangeRPC{} reader := pbio.NewDelimitedReader(stream, math.MaxInt32) err := reader.ReadMsg(requestRPC) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 220bd4f98..c08988091 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -8,9 +8,23 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "go.uber.org/zap" + "golang.org/x/time/rate" ) type PeerExchangeParameters struct { + limiter *rate.Limiter +} + +type Option func(*PeerExchangeParameters) + +// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol +func WithRateLimiter(r rate.Limit, b int) Option { + return func(params *PeerExchangeParameters) { + params.limiter = rate.NewLimiter(r, b) + } +} + +type PeerExchangeRequestParameters struct { host host.Host selectedPeer peer.ID peerAddr multiaddr.Multiaddr @@ -22,11 +36,11 @@ type PeerExchangeParameters struct { clusterID int } -type PeerExchangeOption func(*PeerExchangeParameters) error +type RequestOption func(*PeerExchangeRequestParameters) error // WithPeer is an option used to specify the peerID to fetch peers from -func WithPeer(p peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { +func WithPeer(p peer.ID) RequestOption { + return func(params *PeerExchangeRequestParameters) error { params.selectedPeer = p if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -38,8 +52,8 @@ func WithPeer(p peer.ID) PeerExchangeOption { // WithPeerAddr is an option used to specify a peerAddress to fetch peers from // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { +func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { + return func(params *PeerExchangeRequestParameters) error { params.peerAddr = pAddr if params.selectedPeer != "" { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -53,8 +67,8 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore // Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager -func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption { + return func(params *PeerExchangeRequestParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers return nil @@ -65,8 +79,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption { + return func(params *PeerExchangeRequestParameters) error { params.peerSelectionType = peermanager.LowestRTT params.preferredPeers = fromThesePeers return nil @@ -74,15 +88,15 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { } // DefaultOptions are the default options to be used when using the lightpush protocol -func DefaultOptions(host host.Host) []PeerExchangeOption { - return []PeerExchangeOption{ +func DefaultOptions(host host.Host) []RequestOption { + return []RequestOption{ WithAutomaticPeerSelection(), } } // Use this if you want to filter peers by specific shards -func FilterByShard(clusterID int, shard int) PeerExchangeOption { - return func(params *PeerExchangeParameters) error { +func FilterByShard(clusterID int, shard int) RequestOption { + return func(params *PeerExchangeRequestParameters) error { params.shard = shard params.clusterID = clusterID return nil