Skip to content

Commit

Permalink
allow limiter to be configured
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Jan 20, 2024
1 parent bfd2f02 commit 1469927
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 14 deletions.
34 changes: 21 additions & 13 deletions admin/commands/collection/tx_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package collection
import (
"context"
"fmt"
"strconv"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
Expand All @@ -25,25 +24,34 @@ func NewTxRateLimitCommand(limiter *ingest.AddressRateLimiter) *TxRateLimitComma
}

func (s *TxRateLimitCommand) Handler(_ context.Context, req *admin.CommandRequest) (interface{}, error) {
input, ok := req.Data.(map[string]string)
input, ok := req.Data.(map[string]interface{})
if !ok {
return admin.NewInvalidAdminReqFormatError("expected { \"command\": \"add|remove|get|get_config|set_config\", \"addresses\": \"addresses\""), nil
}

cmd, ok := input["command"]
command, ok := input["command"]
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"command\" field is empty, must be either \"add\" or \"remove\" or \"get\""), nil
return admin.NewInvalidAdminReqErrorf("the \"command\" field is empty, must be one of add|remove|get|get_config|set_config"), nil
}

cmd, ok := command.(string)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"command\" field is not string, must be one of add|remove|get|get_config|set_config"), nil
}

if cmd == "get" {
return s.limiter.GetAddresses(), nil
}

if cmd == "add" || cmd == "remove" {
addresses, ok := input["addresses"]
result, ok := input["addresses"]
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is empty, must be hex formated addresses, can be splitted by \",\""), nil
}
addresses, ok := result.(string)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is not string, must be hex formated addresses, can be splitted by \",\""), nil
}

resp, err := s.AddOrRemove(cmd, addresses)
if err != nil {
Expand All @@ -58,19 +66,19 @@ func (s *TxRateLimitCommand) Handler(_ context.Context, req *admin.CommandReques
}

if cmd == "set_config" {
strLimit, limit_ok := input["limit"]
strBurst, burst_ok := input["burst"]
dataLimit, limit_ok := input["limit"]
dataBurst, burst_ok := input["burst"]
if !limit_ok || !burst_ok {
return admin.NewInvalidAdminReqErrorf("the \"limit\" or \"burst\" field is empty, must be number"), nil
}
limit, err := strconv.ParseFloat(strLimit, 64)
if err == nil {
return admin.NewInvalidAdminReqErrorf("the \"limit\" field is not number: %v", strLimit), nil
limit, ok := dataLimit.(float64)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"limit\" field is not number: %v", dataLimit), nil
}

burst, err := strconv.Atoi(strBurst)
if err == nil {
return admin.NewInvalidAdminReqErrorf("the \"burst\" field is not number: %v", strBurst), nil
burst, ok := dataBurst.(int)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"burst\" field is not number: %v", dataBurst), nil
}

s.limiter.SetLimitConfig(rate.Limit(limit), burst)
Expand Down
44 changes: 43 additions & 1 deletion engine/collection/ingest/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,13 @@ func TestLimiterWaitLongEnough(t *testing.T) {
return l.Allow(addr1)
}, 110*time.Millisecond, 10*time.Millisecond)

// block again until another 100 ms
require.True(t, l.IsRateLimited(addr1))

// block until another 100 ms
require.False(t, l.Allow(addr1))
require.Eventually(t, func() bool {
return l.Allow(addr1)
}, 110*time.Millisecond, 10*time.Millisecond)
}

func TestLimiterConcurrentSafe(t *testing.T) {
Expand Down Expand Up @@ -141,3 +146,40 @@ func TestLimiterConcurrentSafe(t *testing.T) {
wg.Wait()
require.Equal(t, uint64(1), succeed.Load()) // should only succeed once
}

func TestLimiterGetSetConfig(t *testing.T) {
t.Parallel()

addr1 := unittest.RandomAddressFixture()

// with limit set to 10, it means we allow 10 messages per second,
// and with burst set to 1, it means we only allow 1 message at a time,
// so the limit is 1 message per 100 milliseconds.
// Note rate.Limit(0.1) is not to set 1 message per 100 milliseconds, but
// 1 message per 10 seconds.
numPerSec := rate.Limit(10)
burst := 1
l := ingest.NewAddressRateLimiter(numPerSec, burst)

l.AddAddress(addr1)
require.False(t, l.IsRateLimited(addr1))
require.True(t, l.IsRateLimited(addr1))

limitConfig, burstConfig := l.GetLimitConfig()
require.Equal(t, numPerSec, limitConfig)
require.Equal(t, burst, burstConfig)

// change from 1 message per 100 ms to 4 messages per 200 ms
l.SetLimitConfig(rate.Limit(20), 4)

// verify the quota is reset, and the new limit is applied
for i := 0; i < 4; i++ {
require.False(t, l.IsRateLimited(addr1), fmt.Sprintf("fail at %v-th call", i))
}
require.True(t, l.IsRateLimited(addr1))

// check every 10 Millisecond then after 100 Millisecond it should be allowed
require.Eventually(t, func() bool {
return l.Allow(addr1)
}, 210*time.Millisecond, 10*time.Millisecond)
}

0 comments on commit 1469927

Please sign in to comment.