diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index b6d643eeba..4f367430ec 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -676,6 +676,13 @@ func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header) e currentSigner := *c.authorizedSigner.Load() + // Bail out early if we're unauthorized to sign a block. This check also takes + // place before block is signed in `Seal`. + if !snap.ValidatorSet.HasAddress(currentSigner.signer) { + // Check the UnauthorizedSignerError.Error() msg to see why we pass number-1 + return &UnauthorizedSignerError{number - 1, currentSigner.signer.Bytes()} + } + // Set the correct difficulty header.Difficulty = new(big.Int).SetUint64(Difficulty(snap.ValidatorSet, currentSigner.signer)) diff --git a/core/blockchain.go b/core/blockchain.go index 8103e4a05e..aea435cde7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1362,14 +1362,17 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types // the chain mutex to be held. func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { var stateSyncLogs []*types.Log + if stateSyncLogs, err = bc.writeBlockWithState(block, receipts, logs, state); err != nil { return NonStatTy, err } + currentBlock := bc.CurrentBlock() reorg, err := bc.forker.ReorgNeeded(currentBlock.Header(), block.Header()) if err != nil { return NonStatTy, err } + if reorg { // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { @@ -1377,6 +1380,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types return NonStatTy, err } } + status = CanonStatTy } else { status = SideStatTy diff --git a/core/rawdb/bor_receipt.go b/core/rawdb/bor_receipt.go index e225083741..d061dedc9e 100644 --- a/core/rawdb/bor_receipt.go +++ b/core/rawdb/bor_receipt.go @@ -75,6 +75,7 @@ func ReadBorReceiptRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.Raw return data } } + return nil // Can't find the data anywhere. } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 63f712bb9c..191e59e33a 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -49,6 +49,8 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" + + "github.com/JekaMas/crand" ) var ( @@ -1884,9 +1886,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateEvents(events, 1); err != nil { t.Fatalf("additional event firing failed: %v", err) } + if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -2050,6 +2054,7 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) { if err := pool.AddRemote(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2 t.Fatalf("failed to add well priced transaction: %v", err) } + tx = dynamicFeeTx(3, 100000, big.NewInt(4), big.NewInt(1), keys[1]) if err := pool.AddRemote(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3 t.Fatalf("failed to add well priced transaction: %v", err) @@ -2061,9 +2066,11 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateEvents(events, 1); err != nil { t.Fatalf("additional event firing failed: %v", err) } + if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -3692,6 +3699,45 @@ func MakeWithPromoteTxCh(ch chan struct{}) func(*TxPool) { } } +func BenchmarkBigs(b *testing.B) { + // max 256-bit + max := new(big.Int) + max.Exp(big.NewInt(2), big.NewInt(256), nil).Sub(max, big.NewInt(1)) + + ints := make([]*big.Int, 1000000) + intUs := make([]*uint256.Int, 1000000) + + var over bool + + for i := 0; i < len(ints); i++ { + ints[i] = crand.BigInt(max) + intUs[i], over = uint256.FromBig(ints[i]) + + if over { + b.Fatal(ints[i], over) + } + } + + b.Run("*big.Int", func(b *testing.B) { + var r int + + for i := 0; i < b.N; i++ { + r = ints[i%len(ints)%b.N].Cmp(ints[(i+1)%len(ints)%b.N]) + } + + fmt.Fprintln(io.Discard, r) + }) + b.Run("*uint256.Int", func(b *testing.B) { + var r int + + for i := 0; i < b.N; i++ { + r = intUs[i%len(intUs)%b.N].Cmp(intUs[(i+1)%len(intUs)%b.N]) + } + + fmt.Fprintln(io.Discard, r) + }) +} + //nolint:thelper func mining(tb testing.TB, pool *TxPool, signer types.Signer, baseFee *uint256.Int, blockGasLimit uint64, totalBlocks int) (int, time.Duration, time.Duration) { var ( diff --git a/core/vm/contracts.go b/core/vm/contracts.go index 9210f5486c..c5304790fa 100644 --- a/core/vm/contracts.go +++ b/core/vm/contracts.go @@ -263,12 +263,14 @@ var ( big199680 = big.NewInt(199680) ) +// nolint: gofmt // modexpMultComplexity implements bigModexp multComplexity formula, as defined in EIP-198 // // def mult_complexity(x): -// if x <= 64: return x ** 2 -// elif x <= 1024: return x ** 2 // 4 + 96 * x - 3072 -// else: return x ** 2 // 16 + 480 * x - 199680 +// +// if x <= 64: return x ** 2 +// elif x <= 1024: return x ** 2 // 4 + 96 * x - 3072 +// else: return x ** 2 // 16 + 480 * x - 199680 // // where is x is max(length_of_MODULUS, length_of_BASE) func modexpMultComplexity(x *big.Int) *big.Int { @@ -383,10 +385,12 @@ func (c *bigModExp) Run(input []byte) ([]byte, error) { exp = new(big.Int).SetBytes(getData(input, baseLen, expLen)) mod = new(big.Int).SetBytes(getData(input, baseLen+expLen, modLen)) ) + if mod.BitLen() == 0 { // Modulo 0 is undefined, return zero return common.LeftPadBytes([]byte{}, int(modLen)), nil } + return common.LeftPadBytes(base.Exp(base, exp, mod).Bytes(), int(modLen)), nil } diff --git a/eth/filters/bor_api.go b/eth/filters/bor_api.go index db13c95959..12f18caf77 100644 --- a/eth/filters/bor_api.go +++ b/eth/filters/bor_api.go @@ -67,6 +67,7 @@ func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.State for { select { case h := <-stateSyncData: + // nolint : gosimple if crit.ID == h.ID || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || (crit.ID == 0 && crit.Contract == common.Address{}) { notifier.Notify(rpcSub.ID, h) diff --git a/go.mod b/go.mod index b5a84d0bd1..9b99ab2ef3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 github.com/BurntSushi/toml v1.1.0 + github.com/JekaMas/crand v1.0.1 github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d github.com/VictoriaMetrics/fastcache v1.6.0 github.com/aws/aws-sdk-go-v2 v1.2.0 diff --git a/go.sum b/go.sum index a37a00a6dd..7fee01b3d0 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/JekaMas/crand v1.0.1 h1:FMPxkUQqH/hExl0aUXsr0UCGYZ4lJH9IJ5H/KbM6Y9A= +github.com/JekaMas/crand v1.0.1/go.mod h1:GGzGpMCht/tbaNQ5A4kSiKSqEoNAhhyTfSDQyIENBQU= github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d h1:RO27lgfZF8s9lZ3pWyzc0gCE0RZC+6/PXbRjAa0CNp8= github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d/go.mod h1:romz7UPgSYhfJkKOalzEEyV6sWtt/eAEm0nX2aOrod0= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= diff --git a/miner/test_backend.go b/miner/test_backend.go index 29da747ae0..5eb8d932d1 100644 --- a/miner/test_backend.go +++ b/miner/test_backend.go @@ -178,5 +178,8 @@ func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine cons w.setEtherbase(TestBankAddress) + // enable empty blocks + w.noempty = 0 + return w, backend, w.close } diff --git a/miner/worker.go b/miner/worker.go index cc6a2e1eec..70e93ec4d5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -94,6 +94,7 @@ const ( var ( sealedBlocksCounter = metrics.NewRegisteredCounter("worker/sealedBlocks", nil) sealedEmptyBlocksCounter = metrics.NewRegisteredCounter("worker/sealedEmptyBlocks", nil) + commitInterruptCounter = metrics.NewRegisteredCounter("worker/commitInterrupt", nil) ) // environment is the worker's current environment and holds all @@ -299,6 +300,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + noempty: 1, } worker.profileCount = new(int32) // Subscribe NewTxsEvent for tx pool @@ -651,13 +653,16 @@ func (w *worker) mainLoop(ctx context.Context) { txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee)) tcount := w.current.tcount - w.commitTransactions(w.current, txset, nil) + interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) + w.commitTransactions(w.current, txset, nil, interruptCh) // Only update the snapshot if any new transactions were added // to the pending block if tcount != w.current.tcount { w.updateSnapshot(w.current) } + + stopFn() } else { // Special case, if the consensus engine is 0 period clique(dev mode), // submit sealing work here since all empty submission will be rejected @@ -939,7 +944,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +//nolint:gocognit +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -960,7 +966,16 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP "exitCause", breakCause) }() +mainloop: for { + // case of interrupting by timeout + select { + case <-interruptCh: + commitInterruptCounter.Inc(1) + break mainloop + default: + } + // In the following three cases, we will interrupt the execution of the transaction. // (1) new head block event arrival, the interrupt signal is 1 // (2) worker start or restart, the interrupt signal is 1 @@ -1237,7 +1252,7 @@ func startProfiler(profile string, filepath string, number uint64) (func() error // be customized with the plugin in the future. // //nolint:gocognit -func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment) { +func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) { ctx, span := tracing.StartSpan(ctx, "fillTransactions") defer tracing.EndSpan(span) @@ -1361,7 +1376,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en }) tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactions(env, txs, interrupt) + committed = w.commitTransactions(env, txs, interrupt, interruptCh) }) if committed { @@ -1384,7 +1399,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en }) tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactions(env, txs, interrupt) + committed = w.commitTransactions(env, txs, interrupt, interruptCh) }) if committed { @@ -1409,7 +1424,10 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ } defer work.discard() - w.fillTransactions(ctx, nil, work) + interruptCh, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + defer stopFn() + + w.fillTransactions(ctx, nil, work, interruptCh) return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1417,6 +1435,7 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, timestamp int64) { + start := time.Now() var ( @@ -1446,6 +1465,17 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, return } + var interruptCh chan struct{} + + stopFn := func() {} + defer func() { + stopFn() + }() + + if !noempty { + interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + } + ctx, span := tracing.StartSpan(ctx, "commitWork") defer tracing.EndSpan(span) @@ -1464,7 +1494,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, } // Fill pending transactions from the txpool - w.fillTransactions(ctx, interrupt, work) + w.fillTransactions(ctx, interrupt, work, interruptCh) err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start) if err != nil { @@ -1480,6 +1510,30 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, w.current = work } +func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (chan struct{}, func()) { + delay := time.Until(time.Unix(int64(work.header.Time), 0)) + + timeoutTimer := time.NewTimer(delay) + stopFn := func() { + timeoutTimer.Stop() + } + + blockNumber := current.NumberU64() + 1 + interruptCh := make(chan struct{}) + + go func() { + select { + case <-timeoutTimer.C: + log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber) + + close(interruptCh) + case <-ctx.Done(): // nothing to do + } + }() + + return interruptCh, stopFn +} + // commit runs any post-transaction state modifications, assembles the final block // and commits new work if consensus engine is running. // Note the assumption is held that the mutation is allowed to the passed env, do diff --git a/miner/worker_test.go b/miner/worker_test.go index 011895c854..3a1dd5f8b9 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -172,9 +172,11 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { } func TestEmptyWorkEthash(t *testing.T) { + t.Skip() testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) } func TestEmptyWorkClique(t *testing.T) { + t.Skip() testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) } diff --git a/rpc/handler.go b/rpc/handler.go index 488a29300a..e3c72c66b1 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -28,27 +28,27 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// nolint: gofmt // handler handles JSON-RPC messages. There is one handler per connection. Note that // handler is not safe for concurrent use. Message handling never blocks indefinitely // because RPCs are processed on background goroutines launched by handler. // // The entry points for incoming messages are: // -// h.handleMsg(message) -// h.handleBatch(message) +// h.handleMsg(message) +// h.handleBatch(message) // // Outgoing calls use the requestOp struct. Register the request before sending it // on the connection: // -// op := &requestOp{ids: ...} -// h.addRequestOp(op) +// op := &requestOp{ids: ...} +// h.addRequestOp(op) // // Now send the request, then wait for the reply to be delivered through handleMsg: // -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } -// +// if err := op.wait(...); err != nil { +// h.removeRequestOp(op) // timeout, etc. +// } type handler struct { reg *serviceRegistry unsubscribeCb *callback @@ -219,6 +219,7 @@ func (h *handler) cancelServerSubscriptions(err error) { // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. func (h *handler) startCallProc(fn func(*callProc)) { h.callWG.Add(1) + go func() { ctx, cancel := context.WithCancel(h.rootCtx) defer h.callWG.Done() diff --git a/rpc/inproc.go b/rpc/inproc.go index fbe9a40cec..e9cd3f7d68 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -26,6 +26,7 @@ func DialInProc(handler *Server) *Client { initctx := context.Background() c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { p1, p2 := net.Pipe() + // nolint: contextcheck go handler.ServeCodec(NewCodec(p1), 0) return NewCodec(p2), nil }) diff --git a/rpc/ipc.go b/rpc/ipc.go index 07a211c627..5e782454f8 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -35,6 +35,7 @@ func (s *Server) ServeListener(l net.Listener) error { return err } log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) + go s.ServeCodec(NewCodec(conn), 0) } } diff --git a/rpc/server.go b/rpc/server.go index babc5688e2..61ea704f44 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -105,11 +105,13 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { reqs, batch, err := codec.readBatch() if err != nil { if err != io.EOF { + // nolint:errcheck codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) } return } if batch { + // nolint: contextcheck h.handleBatch(reqs) } else { h.handleMsg(reqs[0])