Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Collection] Rate limiting transaction by payer #5218

Merged
merged 32 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a648d6f
rate limiting transaction by payer
zhangchiqing Jan 9, 2024
55a324e
add limiter
zhangchiqing Jan 9, 2024
2e0b7c2
add rate limiter
zhangchiqing Jan 10, 2024
df21354
add test cases
zhangchiqing Jan 10, 2024
4481ef5
add burst
zhangchiqing Jan 15, 2024
279a814
fix lint
zhangchiqing Jan 15, 2024
4a6ad47
add rate limiter
zhangchiqing Jan 15, 2024
f02c967
add admin tool for adding rate limiter
zhangchiqing Jan 16, 2024
eb9bc60
use default limiter
zhangchiqing Jan 16, 2024
b8d4ad3
add comments
zhangchiqing Jan 16, 2024
a770036
add flag
zhangchiqing Jan 16, 2024
afd7ad2
Apply suggestions from code review
zhangchiqing Jan 19, 2024
04202a4
limitted -> limited
zhangchiqing Jan 19, 2024
b5e73c4
speed up rate limit tests
zhangchiqing Jan 19, 2024
d7106d6
add comments
zhangchiqing Jan 19, 2024
f5aea8b
add comment
zhangchiqing Jan 19, 2024
a6261d2
admin to config rate limiter
zhangchiqing Jan 19, 2024
02038bc
allow limiter to be configured
zhangchiqing Jan 20, 2024
8501337
update comments
zhangchiqing Jan 20, 2024
2ff49dd
log
zhangchiqing Jan 20, 2024
05e6756
fix readme
zhangchiqing Jan 20, 2024
f30197b
read config
zhangchiqing Jan 20, 2024
49d9467
tmp
zhangchiqing Jan 20, 2024
ef63bdb
update readme
zhangchiqing Jan 20, 2024
1984408
update tests
zhangchiqing Jan 23, 2024
afb775b
Update engine/collection/ingest/rate_limiter.go
zhangchiqing Jan 23, 2024
fb9d5e6
Update access/validator.go
zhangchiqing Jan 23, 2024
cd2cac6
fix lint
zhangchiqing Jan 23, 2024
c4fe1c6
Apply suggestions from code review
zhangchiqing Jan 23, 2024
00598fe
Apply suggestions from code review
zhangchiqing Jan 23, 2024
a9577ed
fix lint
zhangchiqing Jan 24, 2024
e090e15
Apply suggestions from code review
zhangchiqing Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions access/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,11 @@ type InvalidTxByteSizeError struct {
func (e InvalidTxByteSizeError) Error() string {
return fmt.Sprintf("transaction byte size (%d) exceeds the maximum byte size allowed for a transaction (%d)", e.Actual, e.Maximum)
}

type InvalidTxRateLimitedError struct {
Payer flow.Address
}

func (e InvalidTxRateLimitedError) Error() string {
return fmt.Sprintf("transaction rate limited for payer (%s)", e.Payer)
}
54 changes: 54 additions & 0 deletions access/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ func (b *ProtocolStateBlocks) FinalizedHeader() (*flow.Header, error) {
return b.state.Final().Head()
}

// RateLimiter is an interface for checking if an address is rate limited.
// By convention, the address used is the payer field of a transaction.
// This rate limiter is applied when a transaction is first received by a
// node, meaning that if a transaction is rate-limited it will be dropped.
type RateLimiter interface {
jordanschalm marked this conversation as resolved.
Show resolved Hide resolved
// IsRateLimited returns true if the address is rate limited
IsRateLimited(address flow.Address) bool
}

type NoopLimiter struct{}

func NewNoopLimiter() *NoopLimiter {
return &NoopLimiter{}
}

func (l *NoopLimiter) IsRateLimited(address flow.Address) bool {
return false
}

type TransactionValidationOptions struct {
Expiry uint
ExpiryBuffer uint
Expand All @@ -58,6 +77,7 @@ type TransactionValidator struct {
chain flow.Chain // for checking validity of addresses
options TransactionValidationOptions
serviceAccountAddress flow.Address
limiter RateLimiter
}

func NewTransactionValidator(
Expand All @@ -70,10 +90,35 @@ func NewTransactionValidator(
chain: chain,
options: options,
serviceAccountAddress: chain.ServiceAddress(),
limiter: NewNoopLimiter(),
}
}

func NewTransactionValidatorWithLimiter(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a new method, so that the emulator can still depend on the existing NewTransactionValidator method

blocks Blocks,
chain flow.Chain,
options TransactionValidationOptions,
rateLimiter RateLimiter,
) *TransactionValidator {
return &TransactionValidator{
blocks: blocks,
chain: chain,
options: options,
serviceAccountAddress: chain.ServiceAddress(),
limiter: rateLimiter,
}
}

func (v *TransactionValidator) Validate(tx *flow.TransactionBody) (err error) {
// rate limit transactions for specific payers.
// a short term solution to prevent attacks that send too many failed transactions
// if a transaction is from a payer that should be rate limited, all the following
// checks will be skipped
err = v.checkRateLimitPayer(tx)
if err != nil {
return err
}

err = v.checkTxSizeLimit(tx)
if err != nil {
return err
Expand Down Expand Up @@ -119,6 +164,15 @@ func (v *TransactionValidator) Validate(tx *flow.TransactionBody) (err error) {
return nil
}

func (v *TransactionValidator) checkRateLimitPayer(tx *flow.TransactionBody) error {
if v.limiter.IsRateLimited(tx.Payer) {
return InvalidTxRateLimitedError{
Payer: tx.Payer,
}
}
return nil
}

func (v *TransactionValidator) checkTxSizeLimit(tx *flow.TransactionBody) error {
txSize := uint64(tx.ByteSize())
// first check compatibility to collection byte size
Expand Down
9 changes: 9 additions & 0 deletions admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "stop-at-height", "data": { "height": 1111, "crash": false }}'
```

### Add/Remove/Get address to rate limit a payer from adding transactions to collection nodes' mempool
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "add", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "remove", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get_config" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "set_config", "limit": 1, "burst": 1 }}'
```
122 changes: 122 additions & 0 deletions admin/commands/collection/tx_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package collection

import (
"context"
"fmt"

"github.com/rs/zerolog/log"
"golang.org/x/time/rate"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/engine/collection/ingest"
)

var _ commands.AdminCommand = (*TxRateLimitCommand)(nil)

// TxRateLimitCommand will adjust the transaction ingest rate limiter.
type TxRateLimitCommand struct {
limiter *ingest.AddressRateLimiter
}

type TxRateLimitCommandAddress struct {
Addresses []string
}

func NewTxRateLimitCommand(limiter *ingest.AddressRateLimiter) *TxRateLimitCommand {
return &TxRateLimitCommand{
limiter: limiter,
}
}

func (s *TxRateLimitCommand) Handler(_ context.Context, req *admin.CommandRequest) (interface{}, error) {
jordanschalm marked this conversation as resolved.
Show resolved Hide resolved
input, ok := req.Data.(map[string]interface{})
if !ok {
return admin.NewInvalidAdminReqFormatError("expected { \"command\": \"add|remove|get|get_config|set_config\", \"addresses\": \"addresses\""), nil
}

command, ok := input["command"]
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"command\" field is empty, must be one of add|remove|get|get_config|set_config"), nil
}

cmd, ok := command.(string)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"command\" field is not string, must be one of add|remove|get|get_config|set_config"), nil
}

if cmd == "get" {
list := s.limiter.GetAddresses()
return fmt.Sprintf("rate limited list contains a total of %d addresses: %v", len(list), list), nil
}

if cmd == "add" || cmd == "remove" {
result, ok := input["addresses"]
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is empty, must be hex formated addresses, can be splitted by \",\""), nil
}
addresses, ok := result.(string)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"addresses\" field is not string, must be hex formated addresses, can be splitted by \",\""), nil
}

log.Info().Msgf("admintool %v addresses: %v", cmd, addresses)

resp, err := s.AddOrRemove(cmd, addresses)
if err != nil {
return nil, err
}
return resp, nil
}

if cmd == "get_config" {
limit, burst := s.limiter.GetLimitConfig()
return fmt.Sprintf("limit: %v, burst: %v", limit, burst), nil
}

if cmd == "set_config" {
dataLimit, limit_ok := input["limit"]
dataBurst, burst_ok := input["burst"]
if !burst_ok || !limit_ok {
return admin.NewInvalidAdminReqErrorf("the \"limit\" or \"burst\" field is empty, must be number"), nil
}
limit, ok := dataLimit.(float64)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"limit\" field is not number: %v", dataLimit), nil
}

burst, ok := dataBurst.(float64)
if !ok {
return admin.NewInvalidAdminReqErrorf("the \"burst\" field is not number: %v", dataBurst), nil
}

oldLimit, oldBurst := s.limiter.GetLimitConfig()
log.Info().Msgf("admintool set_config limit: %v, burst: %v, old limit: %v, old burst: %v", limit, burst, oldLimit, oldBurst)
s.limiter.SetLimitConfig(rate.Limit(limit), int(burst))
return fmt.Sprintf("succesfully set limit %v, burst %v", limit, burst), nil
}

return fmt.Sprintf(
"invalid command field (%s), must be either \"add\" or \"remove\" or \"get\" or \"get_config\" or \"set_config\"",
cmd), nil
}

func (s *TxRateLimitCommand) Validator(req *admin.CommandRequest) error {
return nil
}

func (s *TxRateLimitCommand) AddOrRemove(command string, addresses string) (string, error) {
addrList, err := ingest.ParseAddresses(addresses)
if err != nil {
return "", err
}

if command == "add" {
ingest.AddAddresses(s.limiter, addrList)
return fmt.Sprintf("added %d addresses", len(addrList)), nil
}

// command == "remove"
ingest.RemoveAddresses(s.limiter, addrList)
return fmt.Sprintf("removed %d addresses", len(addrList)), nil
}
33 changes: 33 additions & 0 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"time"

"github.com/spf13/pflag"
"golang.org/x/time/rate"

client "github.com/onflow/flow-go-sdk/access/grpc"
sdkcrypto "github.com/onflow/flow-go-sdk/crypto"
"github.com/onflow/flow-go/admin/commands"
collectionCommands "github.com/onflow/flow-go/admin/commands/collection"
storageCommands "github.com/onflow/flow-go/admin/commands/storage"
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/cmd/util/cmd/common"
Expand Down Expand Up @@ -83,6 +85,7 @@ func main() {

pools *epochpool.TransactionPools // epoch-scoped transaction pools
followerDistributor *pubsub.FollowerDistributor
addressRateLimiter *ingest.AddressRateLimiter

push *pusher.Engine
ing *ingest.Engine
Expand All @@ -99,6 +102,9 @@ func main() {
accessNodeIDS []string
apiRatelimits map[string]int
apiBurstlimits map[string]int
txRatelimits float64
txBurstlimits int
txRatelimitPayers string
)
var deprecatedFlagBlockRateDelay time.Duration

Expand Down Expand Up @@ -159,6 +165,17 @@ func main() {
flags.StringToIntVar(&apiRatelimits, "api-rate-limits", map[string]int{}, "per second rate limits for GRPC API methods e.g. Ping=300,SendTransaction=500 etc. note limits apply globally to all clients.")
flags.StringToIntVar(&apiBurstlimits, "api-burst-limits", map[string]int{}, "burst limits for gRPC API methods e.g. Ping=100,SendTransaction=100 etc. note limits apply globally to all clients.")

// rate limiting for accounts, default is 2 transactions every 2.5 seconds
zhangchiqing marked this conversation as resolved.
Show resolved Hide resolved
// Note: The rate limit configured for each node may differ from the effective network-wide rate limit
// for a given payer. In particular, the number of clusters and the message propagation factor will
// influence how the individual rate limit translates to a network-wide rate limit.
// For example, suppose we have 5 collection clusters and configure each Collection Node with a rate
// limit of 1 message per second. Then, the effective network-wide rate limit for a payer address would
// be *at least* 5 messages per second.
flags.Float64Var(&txRatelimits, "ingest-tx-rate-limits", 2.5, "per second rate limits for processing transactions for limited account")
flags.IntVar(&txBurstlimits, "ingest-tx-burst-limits", 2, "burst limits for processing transactions for limited account")
flags.StringVar(&txRatelimitPayers, "ingest-tx-rate-limit-payers", "", "comma separated list of accounts to apply rate limiting to")

// deprecated flags
flags.DurationVar(&deprecatedFlagBlockRateDelay, "block-rate-delay", 0, "the delay to broadcast block proposal in order to control block production rate")
}).ValidateFlags(func() error {
Expand All @@ -181,6 +198,21 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
Module("transaction rate limiter", func(node *cmd.NodeConfig) error {
// To be managed by admin tool, and used by ingestion engine
addressRateLimiter = ingest.NewAddressRateLimiter(rate.Limit(txRatelimits), txBurstlimits)
// read the rate limit addresses from flag and add to the rate limiter
addrs, err := ingest.ParseAddresses(txRatelimitPayers)
if err != nil {
return fmt.Errorf("could not parse rate limit addresses: %w", err)
}
ingest.AddAddresses(addressRateLimiter, addrs)

return nil
}).
AdminCommand("ingest-tx-rate-limit", func(node *cmd.NodeConfig) commands.AdminCommand {
return collectionCommands.NewTxRateLimitCommand(addressRateLimiter)
}).
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand {
clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*badger.Headers)
Expand Down Expand Up @@ -391,6 +423,7 @@ func main() {
node.RootChainID.Chain(),
pools,
ingestConf,
addressRateLimiter,
)
return ing, err
}).
Expand Down
4 changes: 3 additions & 1 deletion engine/collection/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func New(
chain flow.Chain,
pools *epochs.TransactionPools,
config Config,
limiter *AddressRateLimiter,
) (*Engine, error) {

logger := log.With().Str("engine", "ingest").Logger()

transactionValidator := access.NewTransactionValidator(
transactionValidator := access.NewTransactionValidatorWithLimiter(
access.NewProtocolStateBlocks(state),
chain,
access.TransactionValidationOptions{
Expand All @@ -70,6 +71,7 @@ func New(
MaxTransactionByteSize: config.MaxTransactionByteSize,
MaxCollectionByteSize: config.MaxCollectionByteSize,
},
limiter,
)

// FIFO queue for transactions
Expand Down
3 changes: 2 additions & 1 deletion engine/collection/ingest/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (suite *Suite) SetupTest() {

suite.conf = DefaultConfig()
chain := flow.Testnet.Chain()
suite.engine, err = New(log, net, suite.state, metrics, metrics, metrics, suite.me, chain, suite.pools, suite.conf)
suite.engine, err = New(log, net, suite.state, metrics, metrics, metrics, suite.me, chain, suite.pools, suite.conf, NewAddressRateLimiter(rate.Limit(1), 1))
suite.Require().NoError(err)
}

Expand Down
Loading
Loading