Skip to content

Commit

Permalink
improve: improving
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Nedkov <[email protected]>
  • Loading branch information
Psykepro committed Mar 23, 2023
1 parent 07f8a63 commit 1c12b20
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 42 deletions.
18 changes: 10 additions & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func start(cliCtx *cli.Context) error {
}

etm := ethtxmanager.New(c.EthTxManager, etherman, ethTxManagerStorage, st)
defaultGasPriceWei := c.L2GasPriceSuggester.DefaultGasPriceWei

for _, component := range components {
switch component {
Expand All @@ -122,21 +121,24 @@ func start(cliCtx *cli.Context) error {
go runAggregator(ctx, c.Aggregator, etherman, etm, st)
case SEQUENCER:
log.Info("Running sequencer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, defaultGasPriceWei)
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st)
go seq.Start(ctx)
case RPC:
log.Info("Running JSON-RPC server")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, defaultGasPriceWei)
poolInstance.StartPollingMinSuggestedGasPrice(ctx)
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
if c.RPC.EnableL2SuggestedGasPricePolling {
// Needed for rejecting transactions with too low gas price
poolInstance.StartPollingMinSuggestedGasPrice(ctx)
}
apis := map[string]bool{}
for _, a := range cliCtx.StringSlice(config.FlagHTTPAPI) {
apis[a] = true
}
go runJSONRPCServer(*c, poolInstance, st, apis)
case SYNCHRONIZER:
log.Info("Running synchronizer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, defaultGasPriceWei)
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
go runSynchronizer(*c, etherman, etm, st, poolInstance)
case BROADCAST:
log.Info("Running broadcast service")
Expand All @@ -147,7 +149,7 @@ func start(cliCtx *cli.Context) error {
go etm.Start()
case L2GASPRICER:
log.Info("Running L2 gasPricer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, defaultGasPriceWei)
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
go runL2GasPriceSuggester(c.L2GasPriceSuggester, st, poolInstance, etherman)
}
}
Expand Down Expand Up @@ -312,13 +314,13 @@ func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDInt
return st
}

func createPool(cfgPool pool.Config, l2BridgeAddr common.Address, l2ChainID uint64, st *state.State, minGasPriceWei uint64) *pool.Pool {
func createPool(cfgPool pool.Config, l2BridgeAddr common.Address, l2ChainID uint64, st *state.State) *pool.Pool {
runPoolMigrations(cfgPool.DB)
poolStorage, err := pgpoolstorage.NewPostgresPoolStorage(cfgPool.DB)
if err != nil {
log.Fatal(err)
}
poolInstance := pool.NewPool(cfgPool, poolStorage, st, l2BridgeAddr, l2ChainID, minGasPriceWei)
poolInstance := pool.NewPool(cfgPool, poolStorage, st, l2BridgeAddr, l2ChainID)
return poolInstance
}

Expand Down
5 changes: 5 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ func Test_Defaults(t *testing.T) {
path: "RPC.WebSockets.Enabled",
expectedValue: false,
},

{
path: "RPC.EnableL2SuggestedGasPricePolling",
expectedValue: true,
},
{
path: "RPC.WebSockets.Port",
expectedValue: 8133,
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ MaxRequestsPerIPAndSecond = 50
SequencerNodeURI = ""
BroadcastURI = "127.0.0.1:61090"
DefaultSenderAddress = "0x1111111111111111111111111111111111111111"
EnableL2SuggestedGasPricePolling = true
[RPC.WebSockets]
Enabled = false
Port = 8133
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ MaxRequestsPerIPAndSecond = 5000
SequencerNodeURI = "https://internal.zkevm-test.net:2083/"
BroadcastURI = "internal.zkevm-test.net:61090"
DefaultSenderAddress = "0x1111111111111111111111111111111111111111"
EnableL2SuggestedGasPricePolling = true
[RPC.WebSockets]
Enabled = true
Port = 8546
Expand Down
1 change: 1 addition & 0 deletions config/environments/public/public.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ MaxRequestsPerIPAndSecond = 5000
SequencerNodeURI = "https://rpc.public.zkevm-test.net/"
BroadcastURI = "public-grpc.zkevm-test.net:61090"
DefaultSenderAddress = "0x1111111111111111111111111111111111111111"
EnableL2SuggestedGasPricePolling = false
[RPC.WebSockets]
Enabled = true
Port = 8546
Expand Down
3 changes: 3 additions & 0 deletions jsonrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Config struct {

// Websockets
WebSockets WebSocketsConfig `mapstructure:"WebSockets"`

// EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price.
EnableL2SuggestedGasPricePolling bool `mapstructure:"EnableL2SuggestedGasPricePolling"`
}

// WebSocketsConfig has parameters to config the rpc websocket support
Expand Down
4 changes: 2 additions & 2 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,10 @@ func (p *PostgresPoolStorage) GetGasPrice(ctx context.Context) (uint64, error) {

// MinGasPriceSince returns the min gas price after given timestamp
func (p *PostgresPoolStorage) MinGasPriceSince(ctx context.Context, timestamp time.Time) (uint64, error) {
sql := "SELECT MIN(price) FROM pool.gas_price WHERE \"timestamp\" >= $1 LIMIT 1"
sql := "SELECT COALESCE(MIN(price), 0) FROM pool.gas_price WHERE \"timestamp\" >= $1 LIMIT 1"
var gasPrice uint64
err := p.db.QueryRow(ctx, sql, timestamp).Scan(&gasPrice)
if errors.Is(err, pgx.ErrNoRows) {
if gasPrice == 0 || errors.Is(err, pgx.ErrNoRows) {
return 0, state.ErrNotFound
} else if err != nil {
return 0, err
Expand Down
39 changes: 26 additions & 13 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
const (
// bridgeClaimMethodSignature for tracking bridgeClaimMethodSignature method
bridgeClaimMethodSignature = "0x2cffd02e"
retryInterval = 2 * time.Second
)

var (
Expand All @@ -38,34 +39,45 @@ type Pool struct {
l2BridgeAddr common.Address
chainID uint64
cfg Config
minGasPriceWei *big.Int
minSuggestedGasPrice *big.Int
minSuggestedGasPriceMux *sync.RWMutex
}

// NewPool creates and initializes an instance of Pool
func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64, minGasPriceWei uint64) *Pool {
func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64) *Pool {
return &Pool{
cfg: cfg,
storage: s,
state: st,
l2BridgeAddr: l2BridgeAddr,
chainID: chainID,
minGasPriceWei: big.NewInt(0).SetUint64(minGasPriceWei),
minSuggestedGasPriceMux: new(sync.RWMutex),
}
}

// StartPollingMinSuggestedGasPrice starts polling the minimum suggested gas price
func (p *Pool) StartPollingMinSuggestedGasPrice(ctx context.Context) {
p.pollMinSuggestedGasPrice(ctx)
err := p.pollMinSuggestedGasPrice(ctx)
if err != nil && err != state.ErrNotFound {
log.Fatalf("Error polling min suggested gas price: %v", err)
}
// Retrying until we have a valid minSuggestedGasPrice
for err == state.ErrNotFound {
err = p.pollMinSuggestedGasPrice(ctx)
log.Infof("Retrying to poll min suggested gas price ...")
time.Sleep(retryInterval)
}

go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(p.cfg.PollMinSuggestedGasPriceInterval.Duration):
p.pollMinSuggestedGasPrice(ctx)
err = p.pollMinSuggestedGasPrice(ctx)
if err != nil {
log.Errorf("Error polling min suggested gas price: %v", err)
}
}
}
}()
Expand Down Expand Up @@ -193,15 +205,11 @@ func (p *Pool) validateTx(ctx context.Context, tx types.Transaction) error {
return ErrOversizedData
}
// Reject transactions with a gas price lower than the minimum gas price
if tx.GasPrice().Cmp(p.minGasPriceWei) == -1 {
p.minSuggestedGasPriceMux.RLock()
if tx.GasPrice().Cmp(p.minSuggestedGasPrice) == -1 {
return ErrGasPrice
} else {
p.minSuggestedGasPriceMux.RLock()
if p.minSuggestedGasPrice != nil && tx.GasPrice().Cmp(p.minSuggestedGasPrice) == -1 {
return ErrGasPrice
}
p.minSuggestedGasPriceMux.RUnlock()
}
p.minSuggestedGasPriceMux.RUnlock()
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
Expand Down Expand Up @@ -287,18 +295,23 @@ func (p *Pool) validateTx(ctx context.Context, tx types.Transaction) error {
return nil
}

func (p *Pool) pollMinSuggestedGasPrice(ctx context.Context) {
func (p *Pool) pollMinSuggestedGasPrice(ctx context.Context) error {
fromTimestamp := time.Now().UTC().Add(-p.cfg.MinSuggestedGasPriceInterval.Duration)
gasPrice, err := p.storage.MinGasPriceSince(ctx, fromTimestamp)
if err == state.ErrNotFound {
log.Warnf("No suggested min gas price since: %v", fromTimestamp)
return err
} else if err != nil {
log.Errorf("Error getting min gas price since: %v", fromTimestamp)
return err
} else {
p.minSuggestedGasPriceMux.Lock()
p.minSuggestedGasPrice = big.NewInt(0).SetUint64(gasPrice)
p.minSuggestedGasPriceMux.Unlock()
log.Infof("Min suggested gas price updated to: %d", gasPrice)
}

return nil
}

// checkTxFieldCompatibilityWithExecutor checks the field sizes of the transaction to make sure
Expand Down
30 changes: 15 additions & 15 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func Test_AddTx(t *testing.T) {
}

const chainID = 2576980377
p := pool.NewPool(cfg, s, st, common.Address{}, chainID, gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID)
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func Test_AddTx_OversizedData(t *testing.T) {
}

const chainID = 2576980377
p := pool.NewPool(cfg, s, st, common.Address{}, chainID, gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID)

b := make([]byte, cfg.MaxTxBytesSize+1)
to := common.HexToAddress(operations.DefaultSequencerAddress)
Expand Down Expand Up @@ -266,7 +266,7 @@ func Test_AddPreEIP155Tx(t *testing.T) {
}

const chainID = 2576980377
p := pool.NewPool(cfg, s, st, common.Address{}, chainID, gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID)
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -338,7 +338,7 @@ func Test_GetPendingTxs(t *testing.T) {
t.Error(err)
}

p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -402,7 +402,7 @@ func Test_GetPendingTxsZeroPassed(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -467,7 +467,7 @@ func Test_GetTopPendingTxByProfitabilityAndZkCounters(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -532,7 +532,7 @@ func Test_UpdateTxsStatus(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -606,7 +606,7 @@ func Test_UpdateTxStatus(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -651,7 +651,7 @@ func Test_SetAndGetGasPrice(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, nil, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, nil, common.Address{}, chainID.Uint64())

nBig, err := rand.Int(rand.Reader, big.NewInt(0).SetUint64(math.MaxUint64))
if err != nil {
Expand Down Expand Up @@ -702,7 +702,7 @@ func TestGetPendingTxSince(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -808,7 +808,7 @@ func Test_DeleteTransactionsByHashes(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -968,7 +968,7 @@ func Test_TryAddIncompatibleTxs(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
incompatibleTx := testCase.createIncompatibleTx()
p := pool.NewPool(cfg, s, st, common.Address{}, incompatibleTx.ChainId().Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, incompatibleTx.ChainId().Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1035,7 +1035,7 @@ func Test_AddTxWithIntrinsicGasTooLow(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1184,7 +1184,7 @@ func Test_AddTxWithGasPriceTooLowErr(t *testing.T) {
}

const chainID = 2576980377
p := pool.NewPool(cfg, s, st, common.Address{}, chainID, gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID)
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1238,7 +1238,7 @@ func Test_AddRevertedTx(t *testing.T) {
if err != nil {
t.Error(err)
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64(), gasPrice.Uint64())
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())
err = p.SetGasPrice(ctx, gasPrice.Uint64())
if err != nil {
t.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ run: ## Runs a full node
sleep 2
$(RUNETHTXMANAGER)
$(RUNSEQUENCER)
$(RUNL2GASPRICER)
#$(RUNL2GASPRICER)
$(RUNAGGREGATOR)
$(RUNJSONRPC)

Expand Down
4 changes: 1 addition & 3 deletions test/benchmarks/sequencer/common/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

const (
sleepDuration = 5 * time.Second
minGasPriceInWei = 1000000000
minSuggestedGasPriceIntervalMinutes = 5
pollMinSuggestedGasPriceIntervalSeconds = 15
)
Expand Down Expand Up @@ -53,15 +52,14 @@ func Environment(ctx context.Context, b *testing.B) (*operations.Manager, *ethcl
st := opsman.State()
s, err := pgpoolstorage.NewPostgresPoolStorage(params.PoolDbConfig)
require.NoError(b, err)
minGasPriceWei := big.NewInt(minGasPriceInWei)
config := pool.Config{
FreeClaimGasLimit: 1000000, //nolint:gomnd
DB: params.PoolDbConfig,
MinSuggestedGasPriceInterval: types.NewDuration(minSuggestedGasPriceIntervalMinutes * time.Minute),
PollMinSuggestedGasPriceInterval: types.NewDuration(pollMinSuggestedGasPriceIntervalSeconds * time.Second),
}

pl := pool.NewPool(config, s, st, common.Address{}, params.ChainID, minGasPriceWei.Uint64())
pl := pool.NewPool(config, s, st, common.Address{}, params.ChainID)

// Print Info before send
senderBalance, err := client.BalanceAt(ctx, auth.From, nil)
Expand Down
Loading

0 comments on commit 1c12b20

Please sign in to comment.