diff --git a/access/errors.go b/access/errors.go index e23c7a7347b..d4103784919 100644 --- a/access/errors.go +++ b/access/errors.go @@ -90,3 +90,11 @@ type InvalidTxByteSizeError struct { func (e InvalidTxByteSizeError) Error() string { return fmt.Sprintf("transaction byte size (%d) exceeds the maximum byte size allowed for a transaction (%d)", e.Actual, e.Maximum) } + +type InvalidTxRateLimitedError struct { + Payer flow.Address +} + +func (e InvalidTxRateLimitedError) Error() string { + return fmt.Sprintf("transaction rate limited for payer (%s)", e.Payer) +} diff --git a/access/validator.go b/access/validator.go index b59a1539b95..f85881c3414 100644 --- a/access/validator.go +++ b/access/validator.go @@ -42,6 +42,25 @@ func (b *ProtocolStateBlocks) FinalizedHeader() (*flow.Header, error) { return b.state.Final().Head() } +// RateLimiter is an interface for checking if an address is rate limited. +// By convention, the address used is the payer field of a transaction. +// This rate limiter is applied when a transaction is first received by a +// node, meaning that if a transaction is rate-limited it will be dropped. +type RateLimiter interface { + // IsRateLimited returns true if the address is rate limited + IsRateLimited(address flow.Address) bool +} + +type NoopLimiter struct{} + +func NewNoopLimiter() *NoopLimiter { + return &NoopLimiter{} +} + +func (l *NoopLimiter) IsRateLimited(address flow.Address) bool { + return false +} + type TransactionValidationOptions struct { Expiry uint ExpiryBuffer uint @@ -58,6 +77,7 @@ type TransactionValidator struct { chain flow.Chain // for checking validity of addresses options TransactionValidationOptions serviceAccountAddress flow.Address + limiter RateLimiter } func NewTransactionValidator( @@ -70,10 +90,35 @@ func NewTransactionValidator( chain: chain, options: options, serviceAccountAddress: chain.ServiceAddress(), + limiter: NewNoopLimiter(), + } +} + +func NewTransactionValidatorWithLimiter( + blocks Blocks, + chain flow.Chain, + options TransactionValidationOptions, + rateLimiter RateLimiter, +) *TransactionValidator { + return &TransactionValidator{ + blocks: blocks, + chain: chain, + options: options, + serviceAccountAddress: chain.ServiceAddress(), + limiter: rateLimiter, } } func (v *TransactionValidator) Validate(tx *flow.TransactionBody) (err error) { + // rate limit transactions for specific payers. 
+ // a short term solution to prevent attacks that send too many failed transactions + // if a transaction is from a payer that should be rate limited, all the following + // checks will be skipped + err = v.checkRateLimitPayer(tx) + if err != nil { + return err + } + err = v.checkTxSizeLimit(tx) if err != nil { return err @@ -119,6 +164,15 @@ func (v *TransactionValidator) Validate(tx *flow.TransactionBody) (err error) { return nil } +func (v *TransactionValidator) checkRateLimitPayer(tx *flow.TransactionBody) error { + if v.limiter.IsRateLimited(tx.Payer) { + return InvalidTxRateLimitedError{ + Payer: tx.Payer, + } + } + return nil +} + func (v *TransactionValidator) checkTxSizeLimit(tx *flow.TransactionBody) error { txSize := uint64(tx.ByteSize()) // first check compatibility to collection byte size diff --git a/admin/README.md b/admin/README.md index 7e4e5831aa2..1694be1f765 100644 --- a/admin/README.md +++ b/admin/README.md @@ -95,3 +95,12 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{" ``` curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "stop-at-height", "data": { "height": 1111, "crash": false }}' ``` + +### Add/Remove/Get address to rate limit a payer from adding transactions to collection nodes' mempool +``` +curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "add", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}' +curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "remove", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}' +curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get" }}' +curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get_config" }}' +curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "set_config", "limit": 1, "burst": 1 }}' +``` diff --git a/admin/commands/collection/tx_rate_limiter.go b/admin/commands/collection/tx_rate_limiter.go new file mode 100644 index 00000000000..c767f080156 --- /dev/null +++ b/admin/commands/collection/tx_rate_limiter.go @@ -0,0 +1,122 @@ +package collection + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + "golang.org/x/time/rate" + + "github.com/onflow/flow-go/admin" + "github.com/onflow/flow-go/admin/commands" + "github.com/onflow/flow-go/engine/collection/ingest" +) + +var _ commands.AdminCommand = (*TxRateLimitCommand)(nil) + +// TxRateLimitCommand will adjust the transaction ingest rate limiter. 
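The new `access.RateLimiter` interface above is deliberately small: anything that can answer `IsRateLimited(payer)` can be handed to `NewTransactionValidatorWithLimiter`. As a rough illustration (not part of this change), a static block-list satisfies the contract just as well as the token-bucket `AddressRateLimiter` introduced below; the `staticLimiter` type is purely hypothetical:

```go
package example

import (
	"github.com/onflow/flow-go/access"
	"github.com/onflow/flow-go/model/flow"
)

// staticLimiter is a hypothetical access.RateLimiter that unconditionally
// rejects a fixed set of payer addresses. Unlike AddressRateLimiter it keeps
// no token bucket: listed payers are always reported as rate limited.
type staticLimiter struct {
	blocked map[flow.Address]struct{}
}

// IsRateLimited reports whether the payer address is on the block-list.
func (s *staticLimiter) IsRateLimited(address flow.Address) bool {
	_, ok := s.blocked[address]
	return ok
}

// compile-time check that staticLimiter satisfies the new interface,
// mirroring the assertion the rate limiter tests below make for
// ingest.AddressRateLimiter.
var _ access.RateLimiter = (*staticLimiter)(nil)
```

Such an implementation would be passed as the fourth argument to `access.NewTransactionValidatorWithLimiter`, exactly where the ingest engine passes its `AddressRateLimiter`.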
+type TxRateLimitCommand struct {
+	limiter *ingest.AddressRateLimiter
+}
+
+type TxRateLimitCommandAddress struct {
+	Addresses []string
+}
+
+func NewTxRateLimitCommand(limiter *ingest.AddressRateLimiter) *TxRateLimitCommand {
+	return &TxRateLimitCommand{
+		limiter: limiter,
+	}
+}
+
+func (s *TxRateLimitCommand) Handler(_ context.Context, req *admin.CommandRequest) (interface{}, error) {
+	input, ok := req.Data.(map[string]interface{})
+	if !ok {
+		return admin.NewInvalidAdminReqFormatError("expected { \"command\": \"add|remove|get|get_config|set_config\", \"addresses\": \"addresses\" }"), nil
+	}
+
+	command, ok := input["command"]
+	if !ok {
+		return admin.NewInvalidAdminReqErrorf("the \"command\" field is empty, must be one of add|remove|get|get_config|set_config"), nil
+	}
+
+	cmd, ok := command.(string)
+	if !ok {
+		return admin.NewInvalidAdminReqErrorf("the \"command\" field is not a string, must be one of add|remove|get|get_config|set_config"), nil
+	}
+
+	if cmd == "get" {
+		list := s.limiter.GetAddresses()
+		return fmt.Sprintf("rate limited list contains a total of %d addresses: %v", len(list), list), nil
+	}
+
+	if cmd == "add" || cmd == "remove" {
+		result, ok := input["addresses"]
+		if !ok {
+			return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is empty, must be hex formatted addresses, separated by \",\""), nil
+		}
+		addresses, ok := result.(string)
+		if !ok {
+			return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is not a string, must be hex formatted addresses, separated by \",\""), nil
+		}
+
+		log.Info().Msgf("admintool %v addresses: %v", cmd, addresses)
+
+		resp, err := s.AddOrRemove(cmd, addresses)
+		if err != nil {
+			return nil, err
+		}
+		return resp, nil
+	}
+
+	if cmd == "get_config" {
+		limit, burst := s.limiter.GetLimitConfig()
+		return fmt.Sprintf("limit: %v, burst: %v", limit, burst), nil
+	}
+
+	if cmd == "set_config" {
+		dataLimit, limit_ok := input["limit"]
+		dataBurst, burst_ok := input["burst"]
+		if !burst_ok || !limit_ok {
+			return admin.NewInvalidAdminReqErrorf("the \"limit\" or \"burst\" field is empty, must be a number"), nil
+		}
+		limit, ok := dataLimit.(float64)
+		if !ok {
+			return admin.NewInvalidAdminReqErrorf("the \"limit\" field is not a number: %v", dataLimit), nil
+		}
+
+		burst, ok := dataBurst.(float64)
+		if !ok {
+			return admin.NewInvalidAdminReqErrorf("the \"burst\" field is not a number: %v", dataBurst), nil
+		}
+
+		oldLimit, oldBurst := s.limiter.GetLimitConfig()
+		log.Info().Msgf("admintool set_config limit: %v, burst: %v, old limit: %v, old burst: %v", limit, burst, oldLimit, oldBurst)
+		s.limiter.SetLimitConfig(rate.Limit(limit), int(burst))
+		return fmt.Sprintf("successfully set limit %v, burst %v", limit, burst), nil
+	}
+
+	return fmt.Sprintf(
+		"invalid command field (%s), must be either \"add\" or \"remove\" or \"get\" or \"get_config\" or \"set_config\"",
+		cmd), nil
+}
+
+func (s *TxRateLimitCommand) Validator(req *admin.CommandRequest) error {
+	return nil
+}
+
+func (s *TxRateLimitCommand) AddOrRemove(command string, addresses string) (string, error) {
+	addrList, err := ingest.ParseAddresses(addresses)
+	if err != nil {
+		return "", err
+	}
+
+	if command == "add" {
+		ingest.AddAddresses(s.limiter, addrList)
+		return fmt.Sprintf("added %d addresses", len(addrList)), nil
+	}
+
+	// command == "remove"
+	ingest.RemoveAddresses(s.limiter, addrList)
+	return fmt.Sprintf("removed %d addresses", len(addrList)), nil
+}
diff --git a/cmd/collection/main.go b/cmd/collection/main.go
index f867661a711..eda6cf67c32 100644
--- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -5,10 +5,12 @@ import ( "time" "github.com/spf13/pflag" + "golang.org/x/time/rate" client "github.com/onflow/flow-go-sdk/access/grpc" sdkcrypto "github.com/onflow/flow-go-sdk/crypto" "github.com/onflow/flow-go/admin/commands" + collectionCommands "github.com/onflow/flow-go/admin/commands/collection" storageCommands "github.com/onflow/flow-go/admin/commands/storage" "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/cmd/util/cmd/common" @@ -83,6 +85,7 @@ func main() { pools *epochpool.TransactionPools // epoch-scoped transaction pools followerDistributor *pubsub.FollowerDistributor + addressRateLimiter *ingest.AddressRateLimiter push *pusher.Engine ing *ingest.Engine @@ -99,6 +102,9 @@ func main() { accessNodeIDS []string apiRatelimits map[string]int apiBurstlimits map[string]int + txRatelimits float64 + txBurstlimits int + txRatelimitPayers string ) var deprecatedFlagBlockRateDelay time.Duration @@ -159,6 +165,17 @@ func main() { flags.StringToIntVar(&apiRatelimits, "api-rate-limits", map[string]int{}, "per second rate limits for GRPC API methods e.g. Ping=300,SendTransaction=500 etc. note limits apply globally to all clients.") flags.StringToIntVar(&apiBurstlimits, "api-burst-limits", map[string]int{}, "burst limits for gRPC API methods e.g. Ping=100,SendTransaction=100 etc. note limits apply globally to all clients.") + // rate limiting for accounts, default is 2 transactions every 2.5 seconds + // Note: The rate limit configured for each node may differ from the effective network-wide rate limit + // for a given payer. In particular, the number of clusters and the message propagation factor will + // influence how the individual rate limit translates to a network-wide rate limit. + // For example, suppose we have 5 collection clusters and configure each Collection Node with a rate + // limit of 1 message per second. Then, the effective network-wide rate limit for a payer address would + // be *at least* 5 messages per second. + flags.Float64Var(&txRatelimits, "ingest-tx-rate-limits", 2.5, "per second rate limits for processing transactions for limited account") + flags.IntVar(&txBurstlimits, "ingest-tx-burst-limits", 2, "burst limits for processing transactions for limited account") + flags.StringVar(&txRatelimitPayers, "ingest-tx-rate-limit-payers", "", "comma separated list of accounts to apply rate limiting to") + // deprecated flags flags.DurationVar(&deprecatedFlagBlockRateDelay, "block-rate-delay", 0, "the delay to broadcast block proposal in order to control block production rate") }).ValidateFlags(func() error { @@ -181,6 +198,21 @@ func main() { nodeBuilder. PreInit(cmd.DynamicStartPreInit). + Module("transaction rate limiter", func(node *cmd.NodeConfig) error { + // To be managed by admin tool, and used by ingestion engine + addressRateLimiter = ingest.NewAddressRateLimiter(rate.Limit(txRatelimits), txBurstlimits) + // read the rate limit addresses from flag and add to the rate limiter + addrs, err := ingest.ParseAddresses(txRatelimitPayers) + if err != nil { + return fmt.Errorf("could not parse rate limit addresses: %w", err) + } + ingest.AddAddresses(addressRateLimiter, addrs) + + return nil + }). + AdminCommand("ingest-tx-rate-limit", func(node *cmd.NodeConfig) commands.AdminCommand { + return collectionCommands.NewTxRateLimitCommand(addressRateLimiter) + }). 
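The flag defaults registered above (limit 2.5, burst 2) and the cluster note can be made concrete with a short sketch. The five-cluster figure is the illustrative number from the comment above, not a protocol constant, and the conversion rules mirror the `AddressRateLimiter` documentation below:

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"

	"github.com/onflow/flow-go/engine/collection/ingest"
)

func main() {
	// Defaults from the flags above: a sustained 2.5 tx/s with a burst of 2,
	// enforced independently by every collection node that ingests for a payer.
	perNodeLimit := rate.Limit(2.5) // --ingest-tx-rate-limits
	perNodeBurst := 2               // --ingest-tx-burst-limits
	limiter := ingest.NewAddressRateLimiter(perNodeLimit, perNodeBurst)
	_ = limiter

	// To express "at most 1 transaction per 100ms", convert to a per-second
	// rate first: rate.Limit(10) with burst 1. rate.Limit(0.1) would instead
	// mean 1 transaction per 10 seconds.
	perHundredMillis := rate.Limit(10)
	_ = perHundredMillis

	// Illustrative assumption: 5 collection clusters. Because each cluster
	// rate-limits the payer independently, the network-wide throughput for
	// that payer is at least the per-node limit times the cluster count.
	clusters := 5.0
	fmt.Printf("network-wide floor: %.1f tx/s\n", float64(perNodeLimit)*clusters)
}
```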
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand { clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB) headers, ok := conf.Storage.Headers.(*badger.Headers) @@ -391,6 +423,7 @@ func main() { node.RootChainID.Chain(), pools, ingestConf, + addressRateLimiter, ) return ing, err }). diff --git a/engine/collection/ingest/engine.go b/engine/collection/ingest/engine.go index d635332ba27..8a9edce3728 100644 --- a/engine/collection/ingest/engine.go +++ b/engine/collection/ingest/engine.go @@ -55,11 +55,12 @@ func New( chain flow.Chain, pools *epochs.TransactionPools, config Config, + limiter *AddressRateLimiter, ) (*Engine, error) { logger := log.With().Str("engine", "ingest").Logger() - transactionValidator := access.NewTransactionValidator( + transactionValidator := access.NewTransactionValidatorWithLimiter( access.NewProtocolStateBlocks(state), chain, access.TransactionValidationOptions{ @@ -70,6 +71,7 @@ func New( MaxTransactionByteSize: config.MaxTransactionByteSize, MaxCollectionByteSize: config.MaxCollectionByteSize, }, + limiter, ) // FIFO queue for transactions diff --git a/engine/collection/ingest/engine_test.go b/engine/collection/ingest/engine_test.go index cdaa33eb7db..fdbd923e506 100644 --- a/engine/collection/ingest/engine_test.go +++ b/engine/collection/ingest/engine_test.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "golang.org/x/time/rate" "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine" @@ -125,7 +126,7 @@ func (suite *Suite) SetupTest() { suite.conf = DefaultConfig() chain := flow.Testnet.Chain() - suite.engine, err = New(log, net, suite.state, metrics, metrics, metrics, suite.me, chain, suite.pools, suite.conf) + suite.engine, err = New(log, net, suite.state, metrics, metrics, metrics, suite.me, chain, suite.pools, suite.conf, NewAddressRateLimiter(rate.Limit(1), 1)) suite.Require().NoError(err) } diff --git a/engine/collection/ingest/rate_limiter.go b/engine/collection/ingest/rate_limiter.go new file mode 100644 index 00000000000..66733ae03cc --- /dev/null +++ b/engine/collection/ingest/rate_limiter.go @@ -0,0 +1,147 @@ +package ingest + +import ( + "strings" + "sync" + + "golang.org/x/time/rate" + + "github.com/onflow/flow-go/model/flow" +) + +// AddressRateLimiter limits the rate of ingested transactions with a given payer address. +type AddressRateLimiter struct { + mu sync.RWMutex + limiters map[flow.Address]*rate.Limiter + limit rate.Limit // X messages allowed per second + burst int // X messages allowed at one time +} + +// AddressRateLimiter limits the rate of ingested transactions with a given payer address. +// It allows the given "limit" amount messages per second with a "burst" amount of messages to be sent at once +// +// for example, +// To config 1 message per 100 milliseconds, convert to per second first, which is 10 message per second, +// so limit is 10 ( rate.Limit(10) ), and burst is 1. +// Note: rate.Limit(0.1), burst = 1 means 1 message per 10 seconds, instead of 1 message per 100 milliseconds. +// +// To config 3 message per minute, the per-second-basis is 0.05 (3/60), so the limit should be rate.Limit(0.05), +// and burst is 3. +// +// Note: The rate limit configured for each node may differ from the effective network-wide rate limit +// for a given payer. 
In particular, the number of clusters and the message propagation factor will +// influence how the individual rate limit translates to a network-wide rate limit. +// For example, suppose we have 5 collection clusters and configure each Collection Node with a rate +// limit of 1 message per second. Then, the effective network-wide rate limit for a payer address would +// be *at least* 5 messages per second. +func NewAddressRateLimiter(limit rate.Limit, burst int) *AddressRateLimiter { + return &AddressRateLimiter{ + limiters: make(map[flow.Address]*rate.Limiter), + limit: limit, + burst: burst, + } +} + +// Allow returns whether the given address should be allowed (not rate limited) +func (r *AddressRateLimiter) Allow(address flow.Address) bool { + return !r.IsRateLimited(address) +} + +// IsRateLimited returns whether the given address should be rate limited +func (r *AddressRateLimiter) IsRateLimited(address flow.Address) bool { + r.mu.RLock() + limiter, ok := r.limiters[address] + r.mu.RUnlock() + + if !ok { + return false + } + + rateLimited := !limiter.Allow() + return rateLimited +} + +// AddAddress add an address to be rate limited +func (r *AddressRateLimiter) AddAddress(address flow.Address) { + r.mu.Lock() + defer r.mu.Unlock() + + _, ok := r.limiters[address] + if ok { + return + } + + r.limiters[address] = rate.NewLimiter(r.limit, r.burst) +} + +// RemoveAddress remove an address for being rate limited +func (r *AddressRateLimiter) RemoveAddress(address flow.Address) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.limiters, address) +} + +// GetAddresses get the list of rate limited address +func (r *AddressRateLimiter) GetAddresses() []flow.Address { + r.mu.RLock() + defer r.mu.RUnlock() + + addresses := make([]flow.Address, 0, len(r.limiters)) + for address := range r.limiters { + addresses = append(addresses, address) + } + + return addresses +} + +// GetLimitConfig get the limit config +func (r *AddressRateLimiter) GetLimitConfig() (rate.Limit, int) { + r.mu.RLock() + defer r.mu.RUnlock() + return r.limit, r.burst +} + +// SetLimitConfig update the limit config +// Note all the existing limiters will be updated, and reset +func (r *AddressRateLimiter) SetLimitConfig(limit rate.Limit, burst int) { + r.mu.Lock() + defer r.mu.Unlock() + + for address := range r.limiters { + r.limiters[address] = rate.NewLimiter(limit, burst) + } + + r.limit = limit + r.burst = burst +} + +// Util functions +func AddAddresses(r *AddressRateLimiter, addresses []flow.Address) { + for _, address := range addresses { + r.AddAddress(address) + } +} + +func RemoveAddresses(r *AddressRateLimiter, addresses []flow.Address) { + for _, address := range addresses { + r.RemoveAddress(address) + } +} + +// parse addresses string into a list of flow addresses +func ParseAddresses(addresses string) ([]flow.Address, error) { + addressList := make([]flow.Address, 0) + for _, addr := range strings.Split(addresses, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + flowAddr, err := flow.StringToAddress(addr) + if err != nil { + return nil, err + } + addressList = append(addressList, flowAddr) + } + return addressList, nil +} diff --git a/engine/collection/ingest/rate_limiter_test.go b/engine/collection/ingest/rate_limiter_test.go new file mode 100644 index 00000000000..20609f59bf6 --- /dev/null +++ b/engine/collection/ingest/rate_limiter_test.go @@ -0,0 +1,185 @@ +package ingest_test + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + 
"go.uber.org/atomic" + "golang.org/x/time/rate" + + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/collection/ingest" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +var _ access.RateLimiter = (*ingest.AddressRateLimiter)(nil) + +func TestLimiterAddRemoveAddress(t *testing.T) { + t.Parallel() + + good1 := unittest.RandomAddressFixture() + limited1 := unittest.RandomAddressFixture() + limited2 := unittest.RandomAddressFixture() + + numPerSec := rate.Limit(1) + burst := 1 + l := ingest.NewAddressRateLimiter(numPerSec, burst) + + require.False(t, l.IsRateLimited(good1)) + require.False(t, l.IsRateLimited(good1)) // address are not limited + + l.AddAddress(limited1) + require.Equal(t, []flow.Address{limited1}, l.GetAddresses()) + + require.False(t, l.IsRateLimited(limited1)) // address 1 is not limited on the first call + require.True(t, l.IsRateLimited(limited1)) // limited on the second call immediately + require.True(t, l.IsRateLimited(limited1)) // limited on the second call immediately + + require.False(t, l.IsRateLimited(good1)) + require.False(t, l.IsRateLimited(good1)) // address are not limited + + l.AddAddress(limited2) + list := l.GetAddresses() + require.Len(t, list, 2) + require.Contains(t, list, limited1, limited2) + + require.False(t, l.IsRateLimited(limited2)) // address 2 is not limited on the first call + require.True(t, l.IsRateLimited(limited2)) // limited on the second call immediately + require.True(t, l.IsRateLimited(limited2)) // limited on the second call immediately + + l.RemoveAddress(limited1) // after remove the limit, it no longer limited + require.False(t, l.IsRateLimited(limited1)) + require.False(t, l.IsRateLimited(limited1)) + + // but limit2 is still limited + require.True(t, l.IsRateLimited(limited2)) +} + +func TestLimiterBurst(t *testing.T) { + t.Parallel() + + limited1 := unittest.RandomAddressFixture() + + numPerSec := rate.Limit(1) + burst := 3 + l := ingest.NewAddressRateLimiter(numPerSec, burst) + + l.AddAddress(limited1) + for i := 0; i < burst; i++ { + require.False(t, l.IsRateLimited(limited1), fmt.Sprintf("%v-nth call", i)) + } + + require.True(t, l.IsRateLimited(limited1)) // limited + require.True(t, l.IsRateLimited(limited1)) // limited +} + +// verify that if wait long enough after rate limited +func TestLimiterWaitLongEnough(t *testing.T) { + t.Parallel() + + addr1 := unittest.RandomAddressFixture() + + // with limit set to 10, it means we allow 10 messages per second, + // and with burst set to 1, it means we only allow 1 message at a time, + // so the limit is 1 message per 100 milliseconds. + // Note rate.Limit(0.1) is not to set 1 message per 100 milliseconds, but + // 1 message per 10 seconds. 
+ numPerSec := rate.Limit(10) + burst := 1 + l := ingest.NewAddressRateLimiter(numPerSec, burst) + + l.AddAddress(addr1) + require.False(t, l.IsRateLimited(addr1)) + require.True(t, l.IsRateLimited(addr1)) + + // check every 10 Millisecond then after 100 Millisecond it should be allowed + require.Eventually(t, func() bool { + return l.Allow(addr1) + }, 110*time.Millisecond, 10*time.Millisecond) + + // block again until another 100 ms + require.True(t, l.IsRateLimited(addr1)) + + // block until another 100 ms + require.Eventually(t, func() bool { + return l.Allow(addr1) + }, 110*time.Millisecond, 10*time.Millisecond) +} + +func TestLimiterConcurrentSafe(t *testing.T) { + t.Parallel() + good1 := unittest.RandomAddressFixture() + limited1 := unittest.RandomAddressFixture() + + numPerSec := rate.Limit(1) + burst := 1 + l := ingest.NewAddressRateLimiter(numPerSec, burst) + + l.AddAddress(limited1) + + wg := sync.WaitGroup{} + wg.Add(2) + + succeed := atomic.NewUint64(0) + go func(wg *sync.WaitGroup) { + defer wg.Done() + ok := l.IsRateLimited(limited1) + if ok { + succeed.Add(1) + } + require.False(t, l.IsRateLimited(good1)) // never limited + }(&wg) + + go func(wg *sync.WaitGroup) { + defer wg.Done() + ok := l.IsRateLimited(limited1) + if ok { + succeed.Add(1) + } + require.False(t, l.IsRateLimited(good1)) // never limited + }(&wg) + + wg.Wait() + require.Equal(t, uint64(1), succeed.Load()) // should only succeed once +} + +func TestLimiterGetSetConfig(t *testing.T) { + t.Parallel() + + addr1 := unittest.RandomAddressFixture() + + // with limit set to 10, it means we allow 10 messages per second, + // and with burst set to 1, it means we only allow 1 message at a time, + // so the limit is 1 message per 100 milliseconds. + // Note rate.Limit(0.1) is not to set 1 message per 100 milliseconds, but + // 1 message per 10 seconds. 
+ numPerSec := rate.Limit(10) + burst := 1 + l := ingest.NewAddressRateLimiter(numPerSec, burst) + + l.AddAddress(addr1) + require.False(t, l.IsRateLimited(addr1)) + require.True(t, l.IsRateLimited(addr1)) + + limitConfig, burstConfig := l.GetLimitConfig() + require.Equal(t, numPerSec, limitConfig) + require.Equal(t, burst, burstConfig) + + // change from 1 message per 100 ms to 4 messages per 200 ms + l.SetLimitConfig(rate.Limit(20), 4) + + // verify the quota is reset, and the new limit is applied + for i := 0; i < 4; i++ { + require.False(t, l.IsRateLimited(addr1), fmt.Sprintf("fail at %v-th call", i)) + } + require.True(t, l.IsRateLimited(addr1)) + + // check every 10 Millisecond then after 100 Millisecond it should be allowed + require.Eventually(t, func() bool { + return l.Allow(addr1) + }, 210*time.Millisecond, 10*time.Millisecond) +} diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index cdbd6526181..1f5e8b65864 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "golang.org/x/time/rate" "github.com/onflow/flow-go/cmd/build" "github.com/onflow/flow-go/consensus" @@ -30,6 +31,7 @@ import ( "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/collection/epochmgr" "github.com/onflow/flow-go/engine/collection/epochmgr/factories" + "github.com/onflow/flow-go/engine/collection/ingest" collectioningest "github.com/onflow/flow-go/engine/collection/ingest" mockcollection "github.com/onflow/flow-go/engine/collection/mock" "github.com/onflow/flow-go/engine/collection/pusher" @@ -303,7 +305,8 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro collections := storage.NewCollections(node.PublicDB, transactions) clusterPayloads := storage.NewClusterPayloads(node.Metrics, node.PublicDB) - ingestionEngine, err := collectioningest.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, node.ChainID.Chain(), pools, collectioningest.DefaultConfig()) + ingestionEngine, err := collectioningest.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, node.ChainID.Chain(), pools, collectioningest.DefaultConfig(), + ingest.NewAddressRateLimiter(rate.Limit(1), 10)) // 10 tps require.NoError(t, err) selector := filter.HasRole(flow.RoleAccess, flow.RoleVerification) diff --git a/model/flow/address.go b/model/flow/address.go index a0b054f28fb..17ae1946c97 100644 --- a/model/flow/address.go +++ b/model/flow/address.go @@ -25,12 +25,21 @@ func ConvertAddress(b [AddressLength]byte) Address { // HexToAddress converts a hex string to an Address. func HexToAddress(h string) Address { + addr, _ := StringToAddress(h) + return addr +} + +// StringToAddress converts a string to an Address and return an error if the string is malformed +func StringToAddress(h string) (Address, error) { trimmed := strings.TrimPrefix(h, "0x") if len(trimmed)%2 == 1 { trimmed = "0" + trimmed } - b, _ := hex.DecodeString(trimmed) - return BytesToAddress(b) + b, err := hex.DecodeString(trimmed) + if err != nil { + return EmptyAddress, fmt.Errorf("can not decode hex string (%v) to address: %w", h, err) + } + return BytesToAddress(b), nil } // BytesToAddress returns Address with value b. 
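A quick sketch of how the new `StringToAddress` differs from the legacy `HexToAddress`, and how `ParseAddresses` consumes the same comma-separated format used by the `--ingest-tx-rate-limit-payers` flag and the admin command. The sample addresses are borrowed from the README examples above, and the printed error wording is abbreviated:

```go
package main

import (
	"fmt"

	"github.com/onflow/flow-go/engine/collection/ingest"
	"github.com/onflow/flow-go/model/flow"
)

func main() {
	// StringToAddress surfaces malformed input as an error.
	addr, err := flow.StringToAddress("0xa08d349e8037d6e5")
	fmt.Println(addr, err) // valid 8-byte address, nil error

	// HexToAddress keeps its old permissive signature and simply drops the
	// error, returning the empty address for input that cannot be decoded.
	fmt.Println(flow.HexToAddress("not-hex"))

	// ParseAddresses accepts the comma-separated, hex-encoded list used by
	// the --ingest-tx-rate-limit-payers flag and the admin command; entries
	// are trimmed and empty entries are skipped.
	addrs, err := ingest.ParseAddresses("a08d349e8037d6e5, e6765c6113547fb7")
	fmt.Println(addrs, err)
}
```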
diff --git a/module/jobqueue/README.md b/module/jobqueue/README.md
index 15562a89703..e36bc060144 100644
--- a/module/jobqueue/README.md
+++ b/module/jobqueue/README.md
@@ -37,7 +37,7 @@
 Job consumer provides the `Check` method for users to notify new jobs available. Once called, job consumer will iterate through each height with the `AtIndex` method. It stops when one of the following conditions is true:
 1. no job was found at an index
-2. no more workers to work on them, which is limitted by the config item `maxProcessing`
+2. no more workers to work on them, which is limited by the config item `maxProcessing`
 `Check` method is concurrent safe, meaning even if job consumer is notified concurrently about new jobs available, job consumer will check at most once to find new jobs.