Skip to content

Commit

Permalink
feat(taiko-client): introduce TxMgrSelector for proposer / prover (#…
Browse files Browse the repository at this point in the history
…17986)

Co-authored-by: maskpp <[email protected]>
  • Loading branch information
davidtaikocha and mask-pp authored Aug 27, 2024
1 parent 28d9072 commit 6eb298f
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 83 deletions.
16 changes: 0 additions & 16 deletions packages/taiko-client/internal/utils/test_utils.go

This file was deleted.

62 changes: 62 additions & 0 deletions packages/taiko-client/internal/utils/txmgr_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package utils

import (
"time"

"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

var (
defaultPrivateTxMgrRetryInterval = 5 * time.Minute
)

// TxMgrSelector is responsible for selecting the correct transaction manager,
// it will choose the transaction manager for a private mempool if it is available and works well,
// otherwise it will choose the normal transaction manager.
type TxMgrSelector struct {
txMgr *txmgr.SimpleTxManager
privateTxMgr *txmgr.SimpleTxManager
privateTxMgrFailedAt *time.Time
privateTxMgrRetryInterval time.Duration
}

// NewTxMgrSelector creates a new TxMgrSelector instance.
func NewTxMgrSelector(
txMgr *txmgr.SimpleTxManager,
privateTxMgr *txmgr.SimpleTxManager,
privateTxMgrRetryInterval *time.Duration,
) *TxMgrSelector {
retryInterval := defaultPrivateTxMgrRetryInterval
if privateTxMgrRetryInterval != nil {
retryInterval = *privateTxMgrRetryInterval
}

return &TxMgrSelector{
txMgr: txMgr,
privateTxMgr: privateTxMgr,
privateTxMgrFailedAt: nil,
privateTxMgrRetryInterval: retryInterval,
}
}

// Select selects a transaction manager based on the current state.
func (s *TxMgrSelector) Select() (*txmgr.SimpleTxManager, bool) {
// If there is no private transaction manager, return the normal transaction manager.
if s.privateTxMgr == nil {
return s.txMgr, false
}

// If the private transaction manager has failed, check if it is time to retry.
if s.privateTxMgrFailedAt == nil || time.Now().After(s.privateTxMgrFailedAt.Add(s.privateTxMgrRetryInterval)) {
return s.privateTxMgr, true
}

// Otherwise, return the normal transaction manager.
return s.txMgr, false
}

// RecordPrivateTxMgrFailed records the time when the private transaction manager has failed.
func (s *TxMgrSelector) RecordPrivateTxMgrFailed() {
now := time.Now()
s.privateTxMgrFailedAt = &now
}
29 changes: 29 additions & 0 deletions packages/taiko-client/internal/utils/txmgr_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package utils

import (
"testing"

"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/stretchr/testify/require"
)

var (
testTxMgr = &txmgr.SimpleTxManager{}
testSelector = NewTxMgrSelector(testTxMgr, nil, nil)
)

func TestNewTxMgrSelector(t *testing.T) {
require.Equal(t, defaultPrivateTxMgrRetryInterval, testSelector.privateTxMgrRetryInterval)
}

func TestSelect(t *testing.T) {
txMgr, isPrivate := testSelector.Select()
require.NotNil(t, txMgr)
require.False(t, isPrivate)
}

func TestRecordPrivateTxMgrFailed(t *testing.T) {
require.Nil(t, testSelector.privateTxMgrFailedAt)
testSelector.RecordPrivateTxMgrFailed()
require.NotNil(t, testSelector.privateTxMgrFailedAt)
}
50 changes: 16 additions & 34 deletions packages/taiko-client/proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type Proposer struct {
lastProposedAt time.Time
totalEpochs uint64

txmgr *txmgr.SimpleTxManager
privateTxmgr *txmgr.SimpleTxManager
txmgrSelector *utils.TxMgrSelector

ctx context.Context
wg sync.WaitGroup
Expand Down Expand Up @@ -94,10 +93,8 @@ func (p *Proposer) InitFromConfig(
return err
}

if txMgr != nil {
p.txmgr = txMgr
} else {
if p.txmgr, err = txmgr.NewSimpleTxManager(
if txMgr == nil {
if txMgr, err = txmgr.NewSimpleTxManager(
"proposer",
log.Root(),
&metrics.TxMgrMetrics,
Expand All @@ -106,22 +103,8 @@ func (p *Proposer) InitFromConfig(
return err
}
}
if privateTxMgr != nil {
p.privateTxmgr = privateTxMgr
} else {
if cfg.PrivateTxmgrConfigs != nil && len(cfg.PrivateTxmgrConfigs.L1RPCURL) > 0 {
if p.privateTxmgr, err = txmgr.NewSimpleTxManager(
"privateTxProposer",
log.Root(),
&metrics.TxMgrMetrics,
*cfg.PrivateTxmgrConfigs,
); err != nil {
return err
}
} else {
p.privateTxmgr = nil
}
}

p.txmgrSelector = utils.NewTxMgrSelector(txMgr, privateTxMgr, nil)

chainConfig := config.NewChainConfig(p.protocolConfigs)
p.chainConfig = chainConfig
Expand Down Expand Up @@ -410,20 +393,19 @@ func (p *Proposer) updateProposingTicker() {
p.proposingTimer = time.NewTimer(duration)
}

// sendTx is the internal function to send a transaction with a selected tx manager.
func (p *Proposer) sendTx(ctx context.Context, txCandidate *txmgr.TxCandidate) error {
if p.privateTxmgr != nil {
receipt, err := p.privateTxmgr.Send(ctx, *txCandidate)
if err != nil || receipt.Status != types.ReceiptStatusSuccessful {
log.Warn("Failed to send TaikoL1.proposeBlock transaction by private tx manager",
"error", encoding.TryParsingCustomError(err),
)
} else {
return nil
}
}
receipt, err := p.txmgr.Send(ctx, *txCandidate)
txMgr, isPrivate := p.txmgrSelector.Select()
receipt, err := txMgr.Send(ctx, *txCandidate)
if err != nil {
log.Warn("Failed to send TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err))
log.Warn(
"Failed to send TaikoL1.proposeBlock / TaikoL1.proposeBlockV2 transaction by tx manager",
"isPrivateMempool", isPrivate,
"error", encoding.TryParsingCustomError(err),
)
if isPrivate {
p.txmgrSelector.RecordPrivateTxMgrFailed()
}
return err
}

Expand Down
3 changes: 2 additions & 1 deletion packages/taiko-client/proposer/proposer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (s *ProposerTestSuite) TestProposeTxLists() {
}

for _, txCandidate := range txCandidates {
receipt, err := p.txmgr.Send(ctx, txCandidate)
txMgr, _ := p.txmgrSelector.Select()
receipt, err := txMgr.Send(ctx, txCandidate)
s.Nil(err)
s.Nil(encoding.TryParsingCustomErrorFromReceipt(ctx, p.rpc.L1, p.proposerAddress, receipt))
}
Expand Down
53 changes: 21 additions & 32 deletions packages/taiko-client/prover/proof_submitter/transaction/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (

"github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings/encoding"
"github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics"
"github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/utils"
"github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc"
producer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer"
)

// Sender is responsible for sending proof submission transactions with a backoff policy.
type Sender struct {
rpc *rpc.Client
txmgr *txmgr.SimpleTxManager
privateTxmgr *txmgr.SimpleTxManager
txmgrSelector *utils.TxMgrSelector
proverSetAddress common.Address
gasLimit uint64
}
Expand All @@ -37,8 +37,7 @@ func NewSender(
) *Sender {
return &Sender{
rpc: cli,
txmgr: txmgr,
privateTxmgr: privateTxmgr,
txmgrSelector: utils.NewTxMgrSelector(txmgr, privateTxmgr, nil),
proverSetAddress: proverSetAddress,
gasLimit: gasLimit,
}
Expand Down Expand Up @@ -78,38 +77,28 @@ func (s *Sender) Send(
}

// Send the transaction.
var (
next = true
receipt *types.Receipt
)
if s.privateTxmgr != nil {
receipt, err = s.privateTxmgr.Send(ctx, *txCandidate)
if err != nil || receipt.Status != types.ReceiptStatusSuccessful {
log.Warn("Failed to send transaction by private tx manager in sender",
"error", encoding.TryParsingCustomError(err),
)
} else {
next = false
txMgr, isPrivate := s.txmgrSelector.Select()
receipt, err := txMgr.Send(ctx, *txCandidate)
if err != nil {
if isPrivate {
s.txmgrSelector.RecordPrivateTxMgrFailed()
}
return encoding.TryParsingCustomError(err)
}
if next {
receipt, err = s.txmgr.Send(ctx, *txCandidate)
if err != nil {
return encoding.TryParsingCustomError(err)
}

if receipt.Status != types.ReceiptStatusSuccessful {
log.Error(
"Failed to submit proof",
"blockID", proofWithHeader.BlockID,
"tier", proofWithHeader.Tier,
"txHash", receipt.TxHash,
"error", encoding.TryParsingCustomErrorFromReceipt(ctx, s.rpc.L1, s.txmgr.From(), receipt),
)
metrics.ProverSubmissionRevertedCounter.Add(1)
return ErrUnretryableSubmission
}
if receipt.Status != types.ReceiptStatusSuccessful {
log.Error(
"Failed to submit proof",
"blockID", proofWithHeader.BlockID,
"tier", proofWithHeader.Tier,
"txHash", receipt.TxHash,
"isPrivateMempool", isPrivate,
"error", encoding.TryParsingCustomErrorFromReceipt(ctx, s.rpc.L1, txMgr.From(), receipt),
)
metrics.ProverSubmissionRevertedCounter.Add(1)
return ErrUnretryableSubmission
}

log.Info(
"💰 Your block proof was accepted",
"blockID", proofWithHeader.BlockID,
Expand Down

0 comments on commit 6eb298f

Please sign in to comment.