Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(driver): optimize error handling for CalldataSyncer (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtaikocha committed Jun 6, 2023
1 parent e4b6a94 commit 352d4fd
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 41 deletions.
58 changes: 25 additions & 33 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *Syncer) onBlockProposed(
txListBytes = []byte{}
}

payloadData, rpcError, payloadError := s.insertNewHead(
payloadData, err := s.insertNewHead(
ctx,
event,
parent,
Expand All @@ -227,16 +227,8 @@ func (s *Syncer) onBlockProposed(
l1Origin,
)

// RPC errors are recoverable.
if rpcError != nil {
return fmt.Errorf("failed to insert new head to L2 execution engine: %w", rpcError)
}

if payloadError != nil {
log.Warn(
"Ignore invalid block context", "blockID", event.Id, "payloadError", payloadError, "payloadData", payloadData,
)
return nil
if err != nil {
return fmt.Errorf("failed to insert new head to L2 execution engine: %w", err)
}

log.Debug("Payload data", "hash", payloadData.BlockHash, "txs", len(payloadData.Transactions))
Expand Down Expand Up @@ -272,7 +264,7 @@ func (s *Syncer) insertNewHead(
headBlockID *big.Int,
txListBytes []byte,
l1Origin *rawdb.L1Origin,
) (*engine.ExecutableData, error, error) {
) (*engine.ExecutableData, error) {
log.Debug(
"Try to insert a new L2 head block",
"parentNumber", parent.Number,
Expand All @@ -285,14 +277,14 @@ func (s *Syncer) insertNewHead(
var txList []*types.Transaction
if len(txListBytes) != 0 {
if err := rlp.DecodeBytes(txListBytes, &txList); err != nil {
log.Info("Ignore invalid txList bytes", "blockID", event.Id)
return nil, nil, err
log.Error("Invalid txList bytes", "blockID", event.Id)
return nil, err
}
}

parentTimestamp, err := s.rpc.TaikoL2.ParentTimestamp(&bind.CallOpts{BlockNumber: parent.Number})
if err != nil {
return nil, nil, err
return nil, err
}

// Get L2 baseFee
Expand All @@ -303,7 +295,7 @@ func (s *Syncer) insertNewHead(
parent.GasUsed,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to get L2 baseFee: %w", encoding.TryParsingCustomError(err))
return nil, fmt.Errorf("failed to get L2 baseFee: %w", encoding.TryParsingCustomError(err))
}

log.Debug(
Expand All @@ -330,17 +322,17 @@ func (s *Syncer) insertNewHead(
parent.GasUsed,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create TaikoL2.anchor transaction: %w", err)
return nil, fmt.Errorf("failed to create TaikoL2.anchor transaction: %w", err)
}

txList = append([]*types.Transaction{anchorTx}, txList...)

if txListBytes, err = rlp.EncodeToBytes(txList); err != nil {
log.Warn("Encode txList error", "blockID", event.Id, "error", err)
return nil, nil, err
log.Error("Encode txList error", "blockID", event.Id, "error", err)
return nil, err
}

payload, rpcErr, payloadErr := s.createExecutionPayloads(
payload, err := s.createExecutionPayloads(
ctx,
event,
parent.Hash(),
Expand All @@ -351,8 +343,8 @@ func (s *Syncer) insertNewHead(
withdrawals,
)

if rpcErr != nil || payloadErr != nil {
return nil, rpcErr, payloadErr
if err != nil {
return nil, fmt.Errorf("failed to create execution payloads: %w", err)
}

fc := &engine.ForkchoiceStateV1{HeadBlockHash: parent.Hash()}
Expand All @@ -361,13 +353,13 @@ func (s *Syncer) insertNewHead(
fc.HeadBlockHash = payload.BlockHash
fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, fc, nil)
if err != nil {
return nil, err, nil
return nil, err
}
if fcRes.PayloadStatus.Status != engine.VALID {
return nil, nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
return nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
}

return payload, nil, nil
return payload, nil
}

// createExecutionPayloads creates a new execution payloads through
Expand All @@ -381,7 +373,7 @@ func (s *Syncer) createExecutionPayloads(
txListBytes []byte,
baseFeee *big.Int,
withdrawals types.Withdrawals,
) (payloadData *engine.ExecutableData, rpcError error, payloadError error) {
) (payloadData *engine.ExecutableData, err error) {
fc := &engine.ForkchoiceStateV1{HeadBlockHash: parentHash}
attributes := &engine.PayloadAttributes{
Timestamp: event.Meta.Timestamp,
Expand All @@ -406,31 +398,31 @@ func (s *Syncer) createExecutionPayloads(
// Step 1, prepare a payload
fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, fc, attributes)
if err != nil {
return nil, err, nil
return nil, fmt.Errorf("failed to update fork choice: %w", err)
}
if fcRes.PayloadStatus.Status != engine.VALID {
return nil, nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
return nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
}
if fcRes.PayloadID == nil {
return nil, nil, errors.New("empty payload ID")
return nil, errors.New("empty payload ID")
}

// Step 2, get the payload
payload, err := s.rpc.L2Engine.GetPayload(ctx, fcRes.PayloadID)
if err != nil {
return nil, err, nil
return nil, fmt.Errorf("failed to get payload: %w", err)
}

log.Debug("Payload", "payload", payload)

// Step 3, execute the payload
execStatus, err := s.rpc.L2Engine.NewPayload(ctx, payload)
if err != nil {
return nil, err, nil
return nil, fmt.Errorf("failed to create a new payload: %w", err)
}
if execStatus.Status != engine.VALID {
return nil, nil, fmt.Errorf("unexpected NewPayload response status: %s", execStatus.Status)
return nil, fmt.Errorf("unexpected NewPayload response status: %s", execStatus.Status)
}

return payload, nil, nil
return payload, nil
}
5 changes: 2 additions & 3 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() {
s.Nil(err)
l1Head, err := s.s.rpc.L1.BlockByNumber(context.Background(), nil)
s.Nil(err)
_, rpcErr, payloadErr := s.s.insertNewHead(
_, err = s.s.insertNewHead(
context.Background(),
&bindings.TaikoL1ClientBlockProposed{
Id: common.Big1,
Expand All @@ -99,8 +99,7 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() {
L1BlockHash: testutils.RandomHash(),
},
)
s.Nil(rpcErr)
s.Nil(payloadErr)
s.Nil(err)
}

func (s *CalldataSyncerTestSuite) TestTreasuryIncomeAllAnchors() {
Expand Down
4 changes: 2 additions & 2 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (d *Driver) Close() {
// eventLoop starts the main loop of a L2 execution engine's driver.
func (d *Driver) eventLoop() {
defer d.wg.Done()
exponentialBackoff := backoff.NewExponentialBackOff()
constatnBackoff := backoff.NewConstantBackOff(12 * time.Second)

// reqSync requests performing a synchronising operation, won't block
// if we are already synchronising.
Expand All @@ -129,7 +129,7 @@ func (d *Driver) eventLoop() {

// doSyncWithBackoff performs a synchronising operation with a backoff strategy.
doSyncWithBackoff := func() {
if err := backoff.Retry(d.doSync, exponentialBackoff); err != nil {
if err := backoff.Retry(d.doSync, constatnBackoff); err != nil {
log.Error("Sync L2 execution engine's block chain error", "error", err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/chain_iterator/block_batch_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math/big"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (i *BlockBatchIterator) Iter() error {
return nil
}

if err := backoff.Retry(iterOp, backoff.NewExponentialBackOff()); err != nil {
if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(12*time.Second)); err != nil {
return err
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/rpc/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"net/url"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prysmaticlabs/prysm/v4/network"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
Expand All @@ -19,9 +21,10 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client,
if err := backoff.Retry(
func() (err error) {
client, err = ethclient.DialContext(ctx, url)
log.Error("Dial ethclient error", "url", url, "error", err)
return err
},
backoff.NewExponentialBackOff(),
backoff.NewConstantBackOff(12*time.Second),
); err != nil {
return nil, err
}
Expand All @@ -37,13 +40,14 @@ func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret stri
func() (err error) {
client, err := DialEngineClient(ctx, url, jwtSecret)
if err != nil {
log.Error("Dial engine client error", "url", url, "error", err)
return err
}

engineClient = &EngineClient{client}
return nil
},
backoff.NewExponentialBackOff(),
backoff.NewConstantBackOff(12*time.Second),
); err != nil {
return nil, err
}
Expand Down

0 comments on commit 352d4fd

Please sign in to comment.