Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(peer-exchange): rate limit #1043

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"go.uber.org/zap"
)

func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
params := new(PeerExchangeParameters)
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...RequestOption) error {
params := new(PeerExchangeRequestParameters)
params.host = wakuPX.h
params.log = wakuPX.log
params.pm = wakuPX.pm
Expand Down Expand Up @@ -103,7 +103,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
return wakuPX.handleResponse(ctx, responseRPC.Response, params)
}

func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeParameters) error {
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeRequestParameters) error {
var discoveredPeers []struct {
addrInfo peer.AddrInfo
enr *enode.Node
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/peer_exchange/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
decodeRPCFailure metricsErrCategory = "decode_rpc_failure"
pxFailure metricsErrCategory = "px_failure"
dialFailure metricsErrCategory = "dial_failure"
rateLimitFailure metricsErrCategory = "ratelimit_failure"
)

// RecordError increases the counter for different error types
Expand Down
18 changes: 17 additions & 1 deletion waku/v2/protocol/peer_exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
Expand Down Expand Up @@ -47,12 +48,13 @@ type WakuPeerExchange struct {

peerConnector PeerConnector
enrCache *enrCache
limiter *rate.Limiter
}

// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) (*WakuPeerExchange, error) {
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange)
wakuPX.disc = disc
wakuPX.metrics = newMetrics(reg)
Expand All @@ -62,6 +64,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.pm = pm
wakuPX.CommonService = service.NewCommonService()

params := &PeerExchangeParameters{}
for _, opt := range opts {
opt(params)
}

wakuPX.limiter = params.limiter
return wakuPX, nil
}

Expand All @@ -87,6 +95,14 @@ func (wakuPX *WakuPeerExchange) start() error {
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
return func(stream network.Stream) {
logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))

if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
wakuPX.metrics.RecordError(rateLimitFailure)
wakuPX.log.Error("exceeds the rate limit")
// TODO: peer exchange protocol should contain an err field
return
}

requestRPC := &pb.PeerExchangeRPC{}
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err := reader.ReadMsg(requestRPC)
Expand Down
40 changes: 27 additions & 13 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,23 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type PeerExchangeParameters struct {
limiter *rate.Limiter
}

type Option func(*PeerExchangeParameters)

// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
func WithRateLimiter(r rate.Limit, b int) Option {
return func(params *PeerExchangeParameters) {
params.limiter = rate.NewLimiter(r, b)
}
}

type PeerExchangeRequestParameters struct {
host host.Host
selectedPeer peer.ID
peerAddr multiaddr.Multiaddr
Expand All @@ -22,11 +36,11 @@ type PeerExchangeParameters struct {
clusterID int
}

type PeerExchangeOption func(*PeerExchangeParameters) error
type RequestOption func(*PeerExchangeRequestParameters) error

// WithPeer is an option used to specify the peerID to fetch peers from
func WithPeer(p peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithPeer(p peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.selectedPeer = p
if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive")
Expand All @@ -38,8 +52,8 @@ func WithPeer(p peer.ID) PeerExchangeOption {
// WithPeerAddr is an option used to specify a peerAddress to fetch peers from
// This new peer will be added to peerStore.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerAddr = pAddr
if params.selectedPeer != "" {
return errors.New("peerAddr and peerId options are mutually exclusive")
Expand All @@ -53,8 +67,8 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption {
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
return nil
Expand All @@ -65,24 +79,24 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerSelectionType = peermanager.LowestRTT
params.preferredPeers = fromThesePeers
return nil
}
}

// DefaultOptions are the default options to be used when using the lightpush protocol
func DefaultOptions(host host.Host) []PeerExchangeOption {
return []PeerExchangeOption{
func DefaultOptions(host host.Host) []RequestOption {
return []RequestOption{
WithAutomaticPeerSelection(),
}
}

// Use this if you want to filter peers by specific shards
func FilterByShard(clusterID int, shard int) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func FilterByShard(clusterID int, shard int) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.shard = shard
params.clusterID = clusterID
return nil
Expand Down
Loading