Skip to content

Commit

Permalink
broadcasting timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Apr 10, 2024
1 parent 45982f4 commit d5cf05c
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 72 deletions.
9 changes: 5 additions & 4 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type QuorumInfo struct {
}

type TimeoutConfig struct {
EncodingTimeout time.Duration
AttestationTimeout time.Duration
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
EncodingTimeout time.Duration
AttestationTimeout time.Duration
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
TxnBroadcastTimeout time.Duration
}

type Config struct {
Expand Down
9 changes: 5 additions & 4 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
FinalizationBlockDelay: finalizationBlockDelay,
}
timeoutConfig := bat.TimeoutConfig{
EncodingTimeout: 10 * time.Second,
AttestationTimeout: 10 * time.Second,
ChainReadTimeout: 10 * time.Second,
ChainWriteTimeout: 10 * time.Second,
EncodingTimeout: 10 * time.Second,
AttestationTimeout: 10 * time.Second,
ChainReadTimeout: 10 * time.Second,
ChainWriteTimeout: 10 * time.Second,
TxnBroadcastTimeout: 10 * time.Second,
}

metrics := bat.NewMetrics("9100", logger)
Expand Down
9 changes: 5 additions & 4 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type EncodingStreamerMetrics struct {
}

type TxnManagerMetrics struct {
Latency prometheus.Summary
Latency *prometheus.SummaryVec
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
Expand Down Expand Up @@ -89,13 +89,14 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
}

txnManagerMetrics := TxnManagerMetrics{
Latency: promauto.With(reg).NewSummary(
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -313,8 +314,8 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
e.EncodedBlobs.WithLabelValues("number").Set(float64(count))
}

func (t *TxnManagerMetrics) ObserveLatency(latencyMs float64) {
t.Latency.Observe(latencyMs)
func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
t.Latency.WithLabelValues(stage).Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
Expand Down
137 changes: 100 additions & 37 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
gasPricePercentageMultiplier = big.NewInt(10)
hundred = big.NewInt(100)
maxSendTransactionRetry = 3
queryTickerDuration = 3 * time.Second
ErrTransactionNotBroadcasted = errors.New("transaction not broadcasted")
)

// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status.
Expand All @@ -35,7 +37,8 @@ type TxnManager interface {

type transaction struct {
*types.Transaction
TxID walletsdk.TxID
TxID walletsdk.TxID
requestedAt time.Time
}

type TxnRequest struct {
Expand Down Expand Up @@ -69,25 +72,27 @@ type txnManager struct {
requestChan chan *TxnRequest
logger logging.Logger

receiptChan chan *ReceiptOrErr
queueSize int
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
receiptChan chan *ReceiptOrErr
queueSize int
txnBroadcastTimeout time.Duration
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
}

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger.With("component", "TxnManager"),
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger.With("component", "TxnManager"),
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnBroadcastTimeout: txnBroadcastTimeout,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
}
}

Expand Down Expand Up @@ -127,7 +132,7 @@ func (t *txnManager) Start(ctx context.Context) {
t.metrics.UpdateGasUsed(receipt.GasUsed)
}
}
t.metrics.ObserveLatency(float64(time.Since(req.requestedAt).Milliseconds()))
t.metrics.ObserveLatency("total", float64(time.Since(req.requestedAt).Milliseconds()))
}
}
}()
Expand All @@ -140,7 +145,7 @@ func (t *txnManager) Start(ctx context.Context) {
func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("new transaction", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug("new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())

var txn *types.Transaction
var txID walletsdk.TxID
Expand All @@ -161,14 +166,14 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
Timeout() bool
})
didTimeout := ok && terr.Timeout()
if didTimeout {
if didTimeout || errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", req.Tx.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
retryFromFailure++
continue
} else if err != nil {
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err)
} else {
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
break
}
}
Expand All @@ -181,6 +186,7 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: txn,
requestedAt: time.Now(),
})

t.requestChan <- req
Expand All @@ -192,8 +198,31 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
return t.receiptChan
}

// ensureAnyTransactionBroadcasted blocks until at least one of the given transactions has been
// broadcasted to the network, or until ctx is done.
//
// Every queryTickerDuration it asks the wallet for the receipt of each transaction in txs. A
// transaction is treated as broadcasted when the receipt lookup succeeds or fails with
// walletsdk.ErrReceiptNotYetAvailable. Any other error is treated as "not broadcasted yet" and
// the transaction is queried again in the next round.
//
// It returns nil as soon as one transaction qualifies, recording its "broadcasted" latency, and
// returns ctx.Err() if ctx is cancelled or its deadline passes first.
func (t *txnManager) ensureAnyTransactionBroadcasted(ctx context.Context, txs []*transaction) error {
	queryTicker := time.NewTicker(queryTickerDuration)
	defer queryTicker.Stop()

	for {
		for _, tx := range txs {
			_, err := t.wallet.GetTransactionReceipt(ctx, tx.TxID)
			if err != nil && !errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) {
				continue
			}
			// An attempt created without a requestedAt timestamp carries the zero time.
			// Measuring from it would record a meaningless, enormous latency, so the
			// observation is skipped in that case.
			if !tx.requestedAt.IsZero() {
				t.metrics.ObserveLatency("broadcasted", float64(time.Since(tx.requestedAt).Milliseconds()))
			}
			return nil
		}

		// Wait for the next round.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-queryTicker.C:
		}
	}
}

func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
queryTicker := time.NewTicker(queryTickerDuration)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
Expand All @@ -210,25 +239,32 @@ func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*tran
chainTip, err := t.ethClient.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip {
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) || errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) {
t.logger.Debug("Transaction not yet mined", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
t.logger.Debug("Transaction not yet mined", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
} else if errors.Is(err, walletsdk.ErrTransactionFailed) {
t.logger.Debug("Transaction failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
t.logger.Debug("Transaction failed", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
delete(txnsToQuery, txID)
} else if err != nil {
t.logger.Debug("Transaction receipt retrieval failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
} else if errors.Is(err, walletsdk.ErrNotYetBroadcasted) {
t.logger.Error("Transaction has not been broadcasted to network but attempted to retrieve receipt", "err", err)
} else {
t.logger.Debug("Transaction receipt retrieval failed", "err", err)
}
}

if len(txnsToQuery) == 0 {
return nil, fmt.Errorf("all transactions failed")
}

// Wait for the next round.
select {
case <-ctx.Done():
Expand All @@ -249,10 +285,37 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
var err error

rpcCallAttempt := func() error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()
t.logger.Debug("monitoring transaction", "component", "TxnManager", "method", "monitorTransaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
t.logger.Debug("monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())

ctxWithTimeout, cancelBroadcastTimeout := context.WithTimeout(ctx, t.txnBroadcastTimeout)
defer cancelBroadcastTimeout()

// Ensure transactions are broadcasted to the network before querying the receipt.
// This is to avoid querying the receipt of a transaction that hasn't been broadcasted yet.
// For example, when Fireblocks wallet is used, there may be delays in broadcasting the transaction due to latency from cosigning and MPC operations.
err = t.ensureAnyTransactionBroadcasted(ctxWithTimeout, req.txAttempts)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("transaction not broadcasted within timeout", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
fireblocksWallet, ok := t.wallet.(interface {
CancelTransactionBroadcast(ctx context.Context, txID walletsdk.TxID) (bool, error)
})
if ok {
// Consider these transactions failed as they haven't been broadcasted within timeout.
// Cancel these transactions to avoid blocking the next transactions.
for _, tx := range req.txAttempts {
cancelled, err := fireblocksWallet.CancelTransactionBroadcast(ctx, tx.TxID)
if err != nil {
t.logger.Warn("failed to cancel Fireblocks transaction broadcast", "txID", tx.TxID, "err", err)
} else if cancelled {
t.logger.Info("cancelled Fireblocks transaction broadcast because it didn't get broadcasted within timeout", "txID", tx.TxID, "timeout", t.txnBroadcastTimeout.String())
}
}
}
return ErrTransactionNotBroadcasted
}

ctxWithTimeout, cancelEvaluationTimeout := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancelEvaluationTimeout()
receipt, err = t.ensureAnyTransactionEvaled(
ctxWithTimeout,
req.txAttempts,
Expand All @@ -270,38 +333,38 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("failed to speed up transaction", "component", "TxnManager", "method", "monitorTransaction", "err", err)
t.logger.Error("failed to speed up transaction", "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
txID, err := t.wallet.SendTransaction(ctx, newTx)
if err != nil {
if retryFromFailure >= maxSendTransactionRetry {
t.logger.Warn("failed to send txn - retries exhausted", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn - retries exhausted", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
t.logger.Warn("failed to send txn - retrying", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn - retrying", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
}
retryFromFailure++
continue
}

t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: newTx,
})
numSpeedUps++
} else {
t.logger.Error("transaction failed", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
Expand Down Expand Up @@ -333,7 +396,7 @@ func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Info("increasing gas price", "component", "TxnManager", "method", "speedUpTxn", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info("increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

Expand Down
Loading

0 comments on commit d5cf05c

Please sign in to comment.