Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Isolate transaction broadcasting latency from regas logic #465

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type TimeoutConfig struct {
ChainWriteTimeout time.Duration
ChainStateTimeout time.Duration
FireblocksAPITimeout time.Duration
TxnBroadcastTimeout time.Duration
}

type Config struct {
Expand Down
9 changes: 5 additions & 4 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
FinalizationBlockDelay: finalizationBlockDelay,
}
timeoutConfig := bat.TimeoutConfig{
EncodingTimeout: 10 * time.Second,
AttestationTimeout: 10 * time.Second,
ChainReadTimeout: 10 * time.Second,
ChainWriteTimeout: 10 * time.Second,
EncodingTimeout: 10 * time.Second,
AttestationTimeout: 10 * time.Second,
ChainReadTimeout: 10 * time.Second,
ChainWriteTimeout: 10 * time.Second,
TxnBroadcastTimeout: 10 * time.Second,
}

metrics := bat.NewMetrics("9100", logger)
Expand Down
9 changes: 5 additions & 4 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type EncodingStreamerMetrics struct {
}

type TxnManagerMetrics struct {
Latency prometheus.Summary
Latency *prometheus.SummaryVec
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
Expand Down Expand Up @@ -89,13 +89,14 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
}

txnManagerMetrics := TxnManagerMetrics{
Latency: promauto.With(reg).NewSummary(
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -313,8 +314,8 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
e.EncodedBlobs.WithLabelValues("number").Set(float64(count))
}

func (t *TxnManagerMetrics) ObserveLatency(latencyMs float64) {
t.Latency.Observe(latencyMs)
func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
t.Latency.WithLabelValues(stage).Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
Expand Down
142 changes: 104 additions & 38 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (
gasPricePercentageMultiplier = big.NewInt(10)
hundred = big.NewInt(100)
maxSendTransactionRetry = 3
queryTickerDuration = 3 * time.Second
ErrTransactionNotBroadcasted = errors.New("transaction not broadcasted")
)

// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status.
Expand All @@ -36,7 +38,8 @@ type TxnManager interface {

type transaction struct {
*types.Transaction
TxID walletsdk.TxID
TxID walletsdk.TxID
requestedAt time.Time
}

type TxnRequest struct {
Expand Down Expand Up @@ -70,25 +73,27 @@ type txnManager struct {
requestChan chan *TxnRequest
logger logging.Logger

receiptChan chan *ReceiptOrErr
queueSize int
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
receiptChan chan *ReceiptOrErr
queueSize int
txnBroadcastTimeout time.Duration
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
}

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger.With("component", "TxnManager"),
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger.With("component", "TxnManager"),
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnBroadcastTimeout: txnBroadcastTimeout,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
}
}

Expand Down Expand Up @@ -128,7 +133,7 @@ func (t *txnManager) Start(ctx context.Context) {
t.metrics.UpdateGasUsed(receipt.GasUsed)
}
}
t.metrics.ObserveLatency(float64(time.Since(req.requestedAt).Milliseconds()))
t.metrics.ObserveLatency("total", float64(time.Since(req.requestedAt).Milliseconds()))
}
}
}()
Expand All @@ -141,7 +146,7 @@ func (t *txnManager) Start(ctx context.Context) {
func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("new transaction", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug("new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())

var txn *types.Transaction
var txID walletsdk.TxID
Expand All @@ -164,13 +169,13 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
didTimeout = urlErr.Timeout()
}
if didTimeout || errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", req.Tx.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", txn.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
retryFromFailure++
continue
} else if err != nil {
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err)
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, txn.Hash().Hex(), err)
} else {
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
break
}
}
Expand All @@ -183,6 +188,7 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: txn,
requestedAt: time.Now(),
})

t.requestChan <- req
Expand All @@ -194,8 +200,31 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
return t.receiptChan
}

// ensureAnyTransactionBroadcasted waits until all given transactions are broadcasted to the network.
func (t *txnManager) ensureAnyTransactionBroadcasted(ctx context.Context, txs []*transaction) error {
queryTicker := time.NewTicker(queryTickerDuration)
defer queryTicker.Stop()

for {
for _, tx := range txs {
_, err := t.wallet.GetTransactionReceipt(ctx, tx.TxID)
if err == nil || errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking error types seems like it's a bit brittle. I guess there's no other option here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change the implementation in eigensdk, but I think this signature should be kept.
One thing we can do is make the error type more expressive and return Fireblocks status as part of the error, but not sure if that will be less brittle

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment: "Before the transaction is broadcast, this wallet method will return error of type walletsdk.Whatever. Once the transaction has been broadcast, it will either successfully return the receipt or return an error of type walletsdk.ErrReceiptNotYetAvailable."

t.metrics.ObserveLatency("broadcasted", float64(time.Since(tx.requestedAt).Milliseconds()))
return nil
}
}

// Wait for the next round.
select {
case <-ctx.Done():
return ctx.Err()
case <-queryTicker.C:
}
}
}

func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
queryTicker := time.NewTicker(queryTickerDuration)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
Expand All @@ -212,25 +241,32 @@ func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*tran
chainTip, err := t.ethClient.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip {
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) || errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) {
t.logger.Debug("Transaction not yet mined", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
t.logger.Debug("Transaction not yet mined", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
} else if errors.Is(err, walletsdk.ErrTransactionFailed) {
t.logger.Debug("Transaction failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
t.logger.Debug("Transaction failed", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
delete(txnsToQuery, txID)
} else if err != nil {
t.logger.Debug("Transaction receipt retrieval failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
} else if errors.Is(err, walletsdk.ErrNotYetBroadcasted) {
t.logger.Error("Transaction has not been broadcasted to network but attempted to retrieve receipt", "err", err)
} else {
t.logger.Debug("Transaction receipt retrieval failed", "err", err)
}
}

if len(txnsToQuery) == 0 {
return nil, fmt.Errorf("all transactions failed")
}

// Wait for the next round.
select {
case <-ctx.Done():
Expand All @@ -251,10 +287,40 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
var err error

rpcCallAttempt := func() error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()
t.logger.Debug("monitoring transaction", "component", "TxnManager", "method", "monitorTransaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
t.logger.Debug("monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())

ctxWithTimeout, cancelBroadcastTimeout := context.WithTimeout(ctx, t.txnBroadcastTimeout)
defer cancelBroadcastTimeout()

// Ensure transactions are broadcasted to the network before querying the receipt.
// This is to avoid querying the receipt of a transaction that hasn't been broadcasted yet.
// For example, when Fireblocks wallet is used, there may be delays in broadcasting the transaction due to latency from cosigning and MPC operations.
err = t.ensureAnyTransactionBroadcasted(ctxWithTimeout, req.txAttempts)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't retry here because there wouldn't be any advantage over just waiting longer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah as explained in this comment

if err != nil && errors.Is(err, context.DeadlineExceeded) {
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
t.logger.Warn("transaction not broadcasted within timeout", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
fireblocksWallet, ok := t.wallet.(interface {
CancelTransactionBroadcast(ctx context.Context, txID walletsdk.TxID) (bool, error)
})
if ok {
// Consider these transactions failed as they haven't been broadcasted within timeout.
// Cancel these transactions to avoid blocking the next transactions.
for _, tx := range req.txAttempts {
cancelled, err := fireblocksWallet.CancelTransactionBroadcast(ctx, tx.TxID)
if err != nil {
t.logger.Warn("failed to cancel Fireblocks transaction broadcast", "txID", tx.TxID, "err", err)
} else if cancelled {
t.logger.Info("cancelled Fireblocks transaction broadcast because it didn't get broadcasted within timeout", "txID", tx.TxID, "timeout", t.txnBroadcastTimeout.String())
}
}
}
return ErrTransactionNotBroadcasted
} else if err != nil {
t.logger.Error("unexpected error while waiting for Fireblocks transaction to broadcast", "txHash", req.Tx.Hash().Hex(), "err", err)
return err
}

ctxWithTimeout, cancelEvaluationTimeout := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancelEvaluationTimeout()
receipt, err = t.ensureAnyTransactionEvaled(
ctxWithTimeout,
req.txAttempts,
Expand All @@ -272,38 +338,38 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("failed to speed up transaction", "component", "TxnManager", "method", "monitorTransaction", "err", err)
t.logger.Error("failed to speed up transaction", "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
txID, err := t.wallet.SendTransaction(ctx, newTx)
if err != nil {
if retryFromFailure >= maxSendTransactionRetry {
t.logger.Warn("failed to send txn - retries exhausted", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn - retries exhausted", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
t.logger.Warn("failed to send txn - retrying", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn - retrying", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
}
retryFromFailure++
continue
}

t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: newTx,
})
numSpeedUps++
} else {
t.logger.Error("transaction failed", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
Expand Down Expand Up @@ -335,7 +401,7 @@ func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Info("increasing gas price", "component", "TxnManager", "method", "speedUpTxn", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info("increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

Expand Down
Loading
Loading