Skip to content

Commit

Permalink
Merge branch 'dev' into e2e-add-bootstrap-check
Browse files Browse the repository at this point in the history
  • Loading branch information
marun authored Aug 30, 2023
2 parents a0eb69c + 3811802 commit 95edd86
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 59 deletions.
103 changes: 103 additions & 0 deletions network/p2p/throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"sync"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
)

var _ Throttler = (*SlidingWindowThrottler)(nil)

type Throttler interface {
// Handle returns true if a message from [nodeID] should be handled.
Handle(nodeID ids.NodeID) bool
}

// NewSlidingWindowThrottler returns a new instance of SlidingWindowThrottler.
// Nodes are throttled if they exceed [limit] messages during an interval of
// time over [period].
// [period] and [limit] should both be > 0.
func NewSlidingWindowThrottler(period time.Duration, limit int) *SlidingWindowThrottler {
now := time.Now()
return &SlidingWindowThrottler{
period: period,
limit: float64(limit),
windows: [2]window{
{
start: now,
hits: make(map[ids.NodeID]float64),
},
{
start: now.Add(-period),
hits: make(map[ids.NodeID]float64),
},
},
}
}

// window is used internally by SlidingWindowThrottler to represent the amount
// of hits from a node in the evaluation period beginning at [start]
type window struct {
start time.Time
hits map[ids.NodeID]float64
}

// SlidingWindowThrottler is an implementation of the sliding window throttling
// algorithm.
type SlidingWindowThrottler struct {
period time.Duration
limit float64
clock mockable.Clock

lock sync.Mutex
current int
windows [2]window
}

// Handle returns true if the amount of calls received in the last [s.period]
// time is less than [s.limit]
//
// This is calculated by adding the current period's count to a weighted count
// of the previous period.
func (s *SlidingWindowThrottler) Handle(nodeID ids.NodeID) bool {
s.lock.Lock()
defer s.lock.Unlock()

// The current window becomes the previous window if the current evaluation
// period is over
now := s.clock.Time()
sinceUpdate := now.Sub(s.windows[s.current].start)
if sinceUpdate >= 2*s.period {
s.rotate(now.Add(-s.period))
}
if sinceUpdate >= s.period {
s.rotate(now)
sinceUpdate = 0
}

currentHits := s.windows[s.current].hits
current := currentHits[nodeID]
previousFraction := float64(s.period-sinceUpdate) / float64(s.period)
previous := s.windows[1-s.current].hits[nodeID]
estimatedHits := current + previousFraction*previous
if estimatedHits >= s.limit {
// The peer has sent too many requests, drop this request.
return false
}

currentHits[nodeID]++
return true
}

func (s *SlidingWindowThrottler) rotate(t time.Time) {
s.current = 1 - s.current
s.windows[s.current] = window{
start: t,
hits: make(map[ids.NodeID]float64),
}
}
39 changes: 39 additions & 0 deletions network/p2p/throttler_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"errors"
"fmt"
"time"

"github.com/ava-labs/avalanchego/ids"
)

var (
ErrThrottled = errors.New("throttled")
_ Handler = (*ThrottlerHandler)(nil)
)

type ThrottlerHandler struct {
Handler
Throttler Throttler
}

func (t ThrottlerHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
if !t.Throttler.Handle(nodeID) {
return fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
}

return t.Handler.AppGossip(ctx, nodeID, gossipBytes)
}

func (t ThrottlerHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
if !t.Throttler.Handle(nodeID) {
return nil, fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
}

return t.Handler.AppRequest(ctx, nodeID, deadline, requestBytes)
}
74 changes: 74 additions & 0 deletions network/p2p/throttler_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
)

func TestThrottlerHandlerAppGossip(t *testing.T) {
tests := []struct {
name string
Throttler Throttler
expectedErr error
}{
{
name: "throttled",
Throttler: NewSlidingWindowThrottler(time.Second, 1),
},
{
name: "throttler errors",
Throttler: NewSlidingWindowThrottler(time.Second, 0),
expectedErr: ErrThrottled,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)

handler := ThrottlerHandler{
Handler: NoOpHandler{},
Throttler: tt.Throttler,
}
err := handler.AppGossip(context.Background(), ids.GenerateTestNodeID(), []byte("foobar"))
require.ErrorIs(err, tt.expectedErr)
})
}
}

func TestThrottlerHandlerAppRequest(t *testing.T) {
tests := []struct {
name string
Throttler Throttler
expectedErr error
}{
{
name: "throttled",
Throttler: NewSlidingWindowThrottler(time.Second, 1),
},
{
name: "throttler errors",
Throttler: NewSlidingWindowThrottler(time.Second, 0),
expectedErr: ErrThrottled,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)

handler := ThrottlerHandler{
Handler: NoOpHandler{},
Throttler: tt.Throttler,
}
_, err := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, []byte("foobar"))
require.ErrorIs(err, tt.expectedErr)
})
}
}
139 changes: 139 additions & 0 deletions network/p2p/throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
)

func TestSlidingWindowThrottlerHandle(t *testing.T) {
period := time.Minute
previousWindowStartTime := time.Time{}
currentWindowStartTime := previousWindowStartTime.Add(period)

nodeID := ids.GenerateTestNodeID()

type call struct {
time time.Time
throttled bool
}

tests := []struct {
name string
limit int
calls []call
}{
{
name: "throttled in current window",
limit: 1,
calls: []call{
{
time: currentWindowStartTime,
},
{
time: currentWindowStartTime,
throttled: true,
},
},
},
{
name: "throttled from previous window",
limit: 1,
calls: []call{
{
time: previousWindowStartTime,
},
{
time: currentWindowStartTime,
throttled: true,
},
},
},
{
name: "throttled over multiple evaluation periods",
limit: 5,
calls: []call{
{
time: currentWindowStartTime.Add(30 * time.Second),
},
{
time: currentWindowStartTime.Add(period).Add(1 * time.Second),
},
{
time: currentWindowStartTime.Add(period).Add(2 * time.Second),
},
{
time: currentWindowStartTime.Add(period).Add(3 * time.Second),
},
{
time: currentWindowStartTime.Add(period).Add(4 * time.Second),
},
{
time: currentWindowStartTime.Add(period).Add(30 * time.Second),
},
{
time: currentWindowStartTime.Add(period).Add(30 * time.Second),
throttled: true,
},
{
time: currentWindowStartTime.Add(5 * period),
},
},
},
{
name: "one hit per period",
limit: 2,
calls: []call{
{
time: currentWindowStartTime,
},
{
time: currentWindowStartTime.Add(period).Add(time.Second),
},
{
time: currentWindowStartTime.Add(2 * period).Add(time.Second),
},
{
time: currentWindowStartTime.Add(3 * period).Add(time.Second),
},
{
time: currentWindowStartTime.Add(4 * period).Add(time.Second),
},
},
},
{
// if too much time passes by, a current window might not be a
// valid previous window.
name: "current window needs to be reset",
limit: 1,
calls: []call{
{
time: currentWindowStartTime,
},
{
time: currentWindowStartTime.Add(10 * period),
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
throttler := NewSlidingWindowThrottler(period, tt.limit)
throttler.windows[throttler.current].start = currentWindowStartTime
throttler.windows[1-throttler.current].start = previousWindowStartTime

for _, call := range tt.calls {
throttler.clock.Set(call.time)
require.Equal(call.throttled, !throttler.Handle(nodeID))
}
})
}
}
Loading

0 comments on commit 95edd86

Please sign in to comment.