Skip to content

Commit

Permalink
Merge pull request #5218 from onflow/leo/rate-limit-transaction-by-payer
Browse files Browse the repository at this point in the history
[Collection] Rate limiting transaction by payer
  • Loading branch information
zhangchiqing authored Jan 30, 2024
2 parents 08ca433 + e090e15 commit cc807f3
Show file tree
Hide file tree
Showing 12 changed files with 579 additions and 6 deletions.
8 changes: 8 additions & 0 deletions access/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,11 @@ type InvalidTxByteSizeError struct {
func (e InvalidTxByteSizeError) Error() string {
return fmt.Sprintf("transaction byte size (%d) exceeds the maximum byte size allowed for a transaction (%d)", e.Actual, e.Maximum)
}

type InvalidTxRateLimitedError struct {
Payer flow.Address
}

func (e InvalidTxRateLimitedError) Error() string {
return fmt.Sprintf("transaction rate limited for payer (%s)", e.Payer)
}
54 changes: 54 additions & 0 deletions access/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ func (b *ProtocolStateBlocks) FinalizedHeader() (*flow.Header, error) {
return b.state.Final().Head()
}

// RateLimiter is an interface for checking if an address is rate limited.
// By convention, the address used is the payer field of a transaction.
// This rate limiter is applied when a transaction is first received by a
// node, meaning that if a transaction is rate-limited it will be dropped.
type RateLimiter interface {
// IsRateLimited returns true if the address is rate limited
IsRateLimited(address flow.Address) bool
}

type NoopLimiter struct{}

func NewNoopLimiter() *NoopLimiter {
return &NoopLimiter{}
}

func (l *NoopLimiter) IsRateLimited(address flow.Address) bool {
return false
}

type TransactionValidationOptions struct {
Expiry uint
ExpiryBuffer uint
Expand All @@ -58,6 +77,7 @@ type TransactionValidator struct {
chain flow.Chain // for checking validity of addresses
options TransactionValidationOptions
serviceAccountAddress flow.Address
limiter RateLimiter
}

func NewTransactionValidator(
Expand All @@ -70,10 +90,35 @@ func NewTransactionValidator(
chain: chain,
options: options,
serviceAccountAddress: chain.ServiceAddress(),
limiter: NewNoopLimiter(),
}
}

func NewTransactionValidatorWithLimiter(
blocks Blocks,
chain flow.Chain,
options TransactionValidationOptions,
rateLimiter RateLimiter,
) *TransactionValidator {
return &TransactionValidator{
blocks: blocks,
chain: chain,
options: options,
serviceAccountAddress: chain.ServiceAddress(),
limiter: rateLimiter,
}
}

func (v *TransactionValidator) Validate(tx *flow.TransactionBody) (err error) {
// rate limit transactions for specific payers.
// a short term solution to prevent attacks that send too many failed transactions
// if a transaction is from a payer that should be rate limited, all the following
// checks will be skipped
err = v.checkRateLimitPayer(tx)
if err != nil {
return err
}

err = v.checkTxSizeLimit(tx)
if err != nil {
return err
Expand Down Expand Up @@ -119,6 +164,15 @@ func (v *TransactionValidator) Validate(tx *flow.TransactionBody) (err error) {
return nil
}

func (v *TransactionValidator) checkRateLimitPayer(tx *flow.TransactionBody) error {
if v.limiter.IsRateLimited(tx.Payer) {
return InvalidTxRateLimitedError{
Payer: tx.Payer,
}
}
return nil
}

func (v *TransactionValidator) checkTxSizeLimit(tx *flow.TransactionBody) error {
txSize := uint64(tx.ByteSize())
// first check compatibility to collection byte size
Expand Down
9 changes: 9 additions & 0 deletions admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "stop-at-height", "data": { "height": 1111, "crash": false }}'
```

### Add/Remove/Get address to rate limit a payer from adding transactions to collection nodes' mempool
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "add", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "remove", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get_config" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "set_config", "limit": 1, "burst": 1 }}'
```
122 changes: 122 additions & 0 deletions admin/commands/collection/tx_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package collection

import (
"context"
"fmt"

"github.com/rs/zerolog/log"
"golang.org/x/time/rate"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/engine/collection/ingest"
)

var _ commands.AdminCommand = (*TxRateLimitCommand)(nil)

// TxRateLimitCommand will adjust the transaction ingest rate limiter.
type TxRateLimitCommand struct {
limiter *ingest.AddressRateLimiter
}

type TxRateLimitCommandAddress struct {
Addresses []string
}

func NewTxRateLimitCommand(limiter *ingest.AddressRateLimiter) *TxRateLimitCommand {
return &TxRateLimitCommand{
limiter: limiter,
}
}

func (s *TxRateLimitCommand) Handler(_ context.Context, req *admin.CommandRequest) (interface{}, error) {
input, ok := req.Data.(map[string]interface{})
if !ok {
return admin.NewInvalidAdminReqFormatError("expected { \"command\": \"add|remove|get|get_config|set_config\", \"addresses\": \"addresses\""), nil
}

command, ok := input["command"]
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"command\" field is empty, must be one of add|remove|get|get_config|set_config"), nil
}

cmd, ok := command.(string)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"command\" field is not string, must be one of add|remove|get|get_config|set_config"), nil
}

if cmd == "get" {
list := s.limiter.GetAddresses()
return fmt.Sprintf("rate limited list contains a total of %d addresses: %v", len(list), list), nil
}

if cmd == "add" || cmd == "remove" {
result, ok := input["addresses"]
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is empty, must be hex formated addresses, can be splitted by \",\""), nil
}
addresses, ok := result.(string)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is not string, must be hex formated addresses, can be splitted by \",\""), nil
}

log.Info().Msgf("admintool %v addresses: %v", cmd, addresses)

resp, err := s.AddOrRemove(cmd, addresses)
if err != nil {
return nil, err
}
return resp, nil
}

if cmd == "get_config" {
limit, burst := s.limiter.GetLimitConfig()
return fmt.Sprintf("limit: %v, burst: %v", limit, burst), nil
}

if cmd == "set_config" {
dataLimit, limit_ok := input["limit"]
dataBurst, burst_ok := input["burst"]
if !burst_ok || !limit_ok {
return admin.NewInvalidAdminReqErrorf("the \"limit\" or \"burst\" field is empty, must be number"), nil
}
limit, ok := dataLimit.(float64)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"limit\" field is not number: %v", dataLimit), nil
}

burst, ok := dataBurst.(float64)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"burst\" field is not number: %v", dataBurst), nil
}

oldLimit, oldBurst := s.limiter.GetLimitConfig()
log.Info().Msgf("admintool set_config limit: %v, burst: %v, old limit: %v, old burst: %v", limit, burst, oldLimit, oldBurst)
s.limiter.SetLimitConfig(rate.Limit(limit), int(burst))
return fmt.Sprintf("succesfully set limit %v, burst %v", limit, burst), nil
}

return fmt.Sprintf(
"invalid command field (%s), must be either \"add\" or \"remove\" or \"get\" or \"get_config\" or \"set_config\"",
cmd), nil
}

func (s *TxRateLimitCommand) Validator(req *admin.CommandRequest) error {
return nil
}

func (s *TxRateLimitCommand) AddOrRemove(command string, addresses string) (string, error) {
addrList, err := ingest.ParseAddresses(addresses)
if err != nil {
return "", err
}

if command == "add" {
ingest.AddAddresses(s.limiter, addrList)
return fmt.Sprintf("added %d addresses", len(addrList)), nil
}

// command == "remove"
ingest.RemoveAddresses(s.limiter, addrList)
return fmt.Sprintf("removed %d addresses", len(addrList)), nil
}
33 changes: 33 additions & 0 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"time"

"github.com/spf13/pflag"
"golang.org/x/time/rate"

client "github.com/onflow/flow-go-sdk/access/grpc"
sdkcrypto "github.com/onflow/flow-go-sdk/crypto"
"github.com/onflow/flow-go/admin/commands"
collectionCommands "github.com/onflow/flow-go/admin/commands/collection"
storageCommands "github.com/onflow/flow-go/admin/commands/storage"
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/cmd/util/cmd/common"
Expand Down Expand Up @@ -83,6 +85,7 @@ func main() {

pools *epochpool.TransactionPools // epoch-scoped transaction pools
followerDistributor *pubsub.FollowerDistributor
addressRateLimiter *ingest.AddressRateLimiter

push *pusher.Engine
ing *ingest.Engine
Expand All @@ -99,6 +102,9 @@ func main() {
accessNodeIDS []string
apiRatelimits map[string]int
apiBurstlimits map[string]int
txRatelimits float64
txBurstlimits int
txRatelimitPayers string
)
var deprecatedFlagBlockRateDelay time.Duration

Expand Down Expand Up @@ -159,6 +165,17 @@ func main() {
flags.StringToIntVar(&apiRatelimits, "api-rate-limits", map[string]int{}, "per second rate limits for GRPC API methods e.g. Ping=300,SendTransaction=500 etc. note limits apply globally to all clients.")
flags.StringToIntVar(&apiBurstlimits, "api-burst-limits", map[string]int{}, "burst limits for gRPC API methods e.g. Ping=100,SendTransaction=100 etc. note limits apply globally to all clients.")

// rate limiting for accounts, default is 2 transactions every 2.5 seconds
// Note: The rate limit configured for each node may differ from the effective network-wide rate limit
// for a given payer. In particular, the number of clusters and the message propagation factor will
// influence how the individual rate limit translates to a network-wide rate limit.
// For example, suppose we have 5 collection clusters and configure each Collection Node with a rate
// limit of 1 message per second. Then, the effective network-wide rate limit for a payer address would
// be *at least* 5 messages per second.
flags.Float64Var(&txRatelimits, "ingest-tx-rate-limits", 2.5, "per second rate limits for processing transactions for limited account")
flags.IntVar(&txBurstlimits, "ingest-tx-burst-limits", 2, "burst limits for processing transactions for limited account")
flags.StringVar(&txRatelimitPayers, "ingest-tx-rate-limit-payers", "", "comma separated list of accounts to apply rate limiting to")

// deprecated flags
flags.DurationVar(&deprecatedFlagBlockRateDelay, "block-rate-delay", 0, "the delay to broadcast block proposal in order to control block production rate")
}).ValidateFlags(func() error {
Expand All @@ -181,6 +198,21 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
Module("transaction rate limiter", func(node *cmd.NodeConfig) error {
// To be managed by admin tool, and used by ingestion engine
addressRateLimiter = ingest.NewAddressRateLimiter(rate.Limit(txRatelimits), txBurstlimits)
// read the rate limit addresses from flag and add to the rate limiter
addrs, err := ingest.ParseAddresses(txRatelimitPayers)
if err != nil {
return fmt.Errorf("could not parse rate limit addresses: %w", err)
}
ingest.AddAddresses(addressRateLimiter, addrs)

return nil
}).
AdminCommand("ingest-tx-rate-limit", func(node *cmd.NodeConfig) commands.AdminCommand {
return collectionCommands.NewTxRateLimitCommand(addressRateLimiter)
}).
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand {
clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*badger.Headers)
Expand Down Expand Up @@ -391,6 +423,7 @@ func main() {
node.RootChainID.Chain(),
pools,
ingestConf,
addressRateLimiter,
)
return ing, err
}).
Expand Down
4 changes: 3 additions & 1 deletion engine/collection/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func New(
chain flow.Chain,
pools *epochs.TransactionPools,
config Config,
limiter *AddressRateLimiter,
) (*Engine, error) {

logger := log.With().Str("engine", "ingest").Logger()

transactionValidator := access.NewTransactionValidator(
transactionValidator := access.NewTransactionValidatorWithLimiter(
access.NewProtocolStateBlocks(state),
chain,
access.TransactionValidationOptions{
Expand All @@ -70,6 +71,7 @@ func New(
MaxTransactionByteSize: config.MaxTransactionByteSize,
MaxCollectionByteSize: config.MaxCollectionByteSize,
},
limiter,
)

// FIFO queue for transactions
Expand Down
3 changes: 2 additions & 1 deletion engine/collection/ingest/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (suite *Suite) SetupTest() {

suite.conf = DefaultConfig()
chain := flow.Testnet.Chain()
suite.engine, err = New(log, net, suite.state, metrics, metrics, metrics, suite.me, chain, suite.pools, suite.conf)
suite.engine, err = New(log, net, suite.state, metrics, metrics, metrics, suite.me, chain, suite.pools, suite.conf, NewAddressRateLimiter(rate.Limit(1), 1))
suite.Require().NoError(err)
}

Expand Down
Loading

0 comments on commit cc807f3

Please sign in to comment.