Skip to content

Commit

Permalink
[TxnManager] Check if replaced txns are confirmed (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Feb 9, 2024
1 parent ff2f2bd commit 9aaa171
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 40 deletions.
1 change: 1 addition & 0 deletions common/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ type EthClient interface {
EstimateGasPriceAndLimitAndSendTx(ctx context.Context, tx *types.Transaction, tag string, value *big.Int) (*types.Receipt, error)
UpdateGas(ctx context.Context, tx *types.Transaction, value, gasTipCap, gasFeeCap *big.Int) (*types.Transaction, error)
EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error)
EnsureAnyTransactionEvaled(ctx context.Context, txs []*types.Transaction, tag string) (*types.Receipt, error)
}
51 changes: 34 additions & 17 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (c *EthClient) EstimateGasPriceAndLimitAndSendTx(
// EnsureTransactionEvaled waits for tx to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := c.waitMined(ctx, tx)
receipt, err := c.waitMined(ctx, []*types.Transaction{tx})
if err != nil {
return receipt, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
}
Expand All @@ -214,36 +214,53 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans
return receipt, nil
}

// waitMined waits for tx to be mined on the blockchain and returns the receipt.
// EnsureAnyTransactionEvaled takes multiple transactions and waits for any of them to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
func (c *EthClient) EnsureAnyTransactionEvaled(ctx context.Context, txs []*types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := c.waitMined(ctx, txs)
if err != nil {
return receipt, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
}
if receipt.Status != 1 {
c.Logger.Error("Transaction Failed", "tag", tag, "txHash", receipt.TxHash.Hex(), "status", receipt.Status, "GasUsed", receipt.GasUsed)
return nil, ErrTransactionFailed
}
c.Logger.Trace("transaction confirmed", "txHash", receipt.TxHash.Hex(), "tag", tag, "gasUsed", receipt.GasUsed)
return receipt, nil
}

// waitMined takes multiple transactions and waits for any of them to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
// Taken from https://github.com/ethereum/go-ethereum/blob/master/accounts/abi/bind/util.go#L32,
// but added a check for number of confirmations.
func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
func (c *EthClient) waitMined(ctx context.Context, txs []*types.Transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
for {
receipt, err = c.TransactionReceipt(ctx, tx.Hash())
if err == nil {
chainTip, err := c.BlockNumber(ctx)
for _, tx := range txs {
receipt, err = c.TransactionReceipt(ctx, tx.Hash())
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip {
c.Logger.Trace("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
chainTip, err := c.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip {
c.Logger.Trace("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
return receipt, nil
c.Logger.Trace("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err)
}
} else {
c.Logger.Trace("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined", "txHash", tx.Hash().Hex())
} else if err != nil {
c.Logger.Trace("Receipt retrieval failed", "err", err)
if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined", "txHash", tx.Hash().Hex())
} else if err != nil {
c.Logger.Trace("Transaction receipt retrieval failed", "err", err)
}
}

// Wait for the next round.
select {
case <-ctx.Done():
Expand Down
10 changes: 10 additions & 0 deletions common/mock/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,13 @@ func (mock *MockEthClient) EnsureTransactionEvaled(ctx context.Context, tx *type

return result, args.Error(1)
}

func (mock *MockEthClient) EnsureAnyTransactionEvaled(ctx context.Context, txs []*types.Transaction, tag string) (*types.Receipt, error) {
args := mock.Called()
var result *types.Receipt
if args.Get(0) != nil {
result = args.Get(0).(*types.Receipt)
}

return result, args.Error(1)
}
33 changes: 19 additions & 14 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common"
gcore "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -18,6 +17,7 @@ import (
var (
gasPricePercentageMultiplier = big.NewInt(10)
hundred = big.NewInt(100)
maxSpeedUpRetry = 3
)

// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status.
Expand All @@ -37,6 +37,10 @@ type TxnRequest struct {
Metadata interface{}

requestedAt time.Time
// txAttempts are the transactions that have been attempted to be mined for this request.
// If a transaction hasn't been confirmed within the timeout and a replacement transaction is sent,
// the original transaction hash will be kept in this slice
txAttempts []*types.Transaction
}

// ReceiptOrErr is a wrapper for a transaction receipt or an error.
Expand Down Expand Up @@ -83,6 +87,7 @@ func NewTxnRequest(tx *types.Transaction, tag string, value *big.Int, metadata i
Metadata: metadata,

requestedAt: time.Now(),
txAttempts: make([]*types.Transaction, 0),
}
}

Expand Down Expand Up @@ -140,6 +145,7 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", txn.Hash().Hex())
}
req.Tx = txn
req.txAttempts = append(req.txAttempts, txn)

t.requestChan <- req
t.metrics.UpdateTxQueue(len(t.requestChan))
Expand All @@ -155,14 +161,15 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
numSpeedUps := 0
retryFromFailure := 0
for {
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()

t.logger.Debug("[TxnManager] monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ethClient.EnsureTransactionEvaled(
receipt, err := t.ethClient.EnsureAnyTransactionEvaled(
ctxWithTimeout,
req.Tx,
req.txAttempts,
req.Tag,
)
if err == nil {
Expand All @@ -185,20 +192,18 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
}
err = t.ethClient.SendTransaction(ctx, newTx)
if err != nil {
if errors.Is(err, gcore.ErrNonceTooLow) {
// try to get the receipt again to see if the transaction has been mined
_, receiptErr := t.ethClient.TransactionReceipt(ctx, req.Tx.Hash())
if receiptErr == nil {
continue
}
}
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSpeedUpRetry, "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
if retryFromFailure >= maxSpeedUpRetry {
return nil, err
}
retryFromFailure++
continue
}

t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, newTx)
numSpeedUps++
} else {
t.logger.Error("[TxnManager] transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
Expand Down
90 changes: 81 additions & 9 deletions disperser/batcher/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestProcessTransaction(t *testing.T) {
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil)
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil).Once()

Expand All @@ -44,11 +44,11 @@ func TestProcessTransaction(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 1)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 1)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 1)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 1)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 1)

// now test the case where the transaction fails
randomErr := fmt.Errorf("random error")
ethClient.On("EnsureTransactionEvaled").Return(nil, randomErr)
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, randomErr)
err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Expand All @@ -62,7 +62,7 @@ func TestProcessTransaction(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 2)
}

func TestReplaceGasFee(t *testing.T) {
Expand All @@ -79,8 +79,8 @@ func TestReplaceGasFee(t *testing.T) {
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

Expand All @@ -94,7 +94,7 @@ func TestReplaceGasFee(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 2)
}

func TestTransactionFailure(t *testing.T) {
Expand All @@ -114,8 +114,8 @@ func TestTransactionFailure(t *testing.T) {
ethClient.On("UpdateGas").Return(nil, speedUpFailure).Once()
ethClient.On("SendTransaction").Return(nil)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

Expand All @@ -129,3 +129,75 @@ func TestTransactionFailure(t *testing.T) {
res := <-txnManager.ReceiptChan()
assert.Error(t, res.Err, speedUpFailure)
}

func TestSendTransactionRetry(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
txn := types.NewTransaction(0, common.HexToAddress("0x1"), big.NewInt(1e18), 100000, big.NewInt(1e9), []byte{})
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil).Once()
// assume that it fails to send the replacement transaction once
ethClient.On("SendTransaction").Return(fmt.Errorf("send txn failure")).Once()
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
res := <-txnManager.ReceiptChan()
assert.NoError(t, res.Err)
assert.Equal(t, uint64(1), res.Receipt.BlockNumber.Uint64())
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 2)
}

func TestSendTransactionRetryFailure(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
txn := types.NewTransaction(0, common.HexToAddress("0x1"), big.NewInt(1e18), 100000, big.NewInt(1e9), []byte{})
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil).Once()
// assume that it keeps failing to send the replacement transaction
sendErr := fmt.Errorf("send txn failure")
ethClient.On("SendTransaction").Return(sendErr)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded)

err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
res := <-txnManager.ReceiptChan()
assert.Error(t, res.Err, sendErr)
assert.Nil(t, res.Receipt)
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 5)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 5)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 5)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 4)
}

0 comments on commit 9aaa171

Please sign in to comment.