Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TxnManager] Check if replaced txns are confirmed #228

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ type EthClient interface {
EstimateGasPriceAndLimitAndSendTx(ctx context.Context, tx *types.Transaction, tag string, value *big.Int) (*types.Receipt, error)
UpdateGas(ctx context.Context, tx *types.Transaction, value, gasTipCap, gasFeeCap *big.Int) (*types.Transaction, error)
EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error)
EnsureAnyTransactionEvaled(ctx context.Context, txs []*types.Transaction, tag string) (*types.Receipt, error)
}
51 changes: 34 additions & 17 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (c *EthClient) EstimateGasPriceAndLimitAndSendTx(
// EnsureTransactionEvaled waits for tx to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := c.waitMined(ctx, tx)
receipt, err := c.waitMined(ctx, []*types.Transaction{tx})
if err != nil {
return receipt, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
}
Expand All @@ -214,36 +214,53 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans
return receipt, nil
}

// waitMined waits for tx to be mined on the blockchain and returns the receipt.
// EnsureAnyTransactionEvaled takes multiple transactions and waits for any of them to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
func (c *EthClient) EnsureAnyTransactionEvaled(ctx context.Context, txs []*types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := c.waitMined(ctx, txs)
if err != nil {
return receipt, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
}
if receipt.Status != 1 {
c.Logger.Error("Transaction Failed", "tag", tag, "txHash", receipt.TxHash.Hex(), "status", receipt.Status, "GasUsed", receipt.GasUsed)
return nil, ErrTransactionFailed
}
c.Logger.Trace("transaction confirmed", "txHash", receipt.TxHash.Hex(), "tag", tag, "gasUsed", receipt.GasUsed)
return receipt, nil
}

// waitMined takes multiple transactions and waits for any of them to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
// Taken from https://github.com/ethereum/go-ethereum/blob/master/accounts/abi/bind/util.go#L32,
// but added a check for number of confirmations.
func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
func (c *EthClient) waitMined(ctx context.Context, txs []*types.Transaction) (*types.Receipt, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks there is no constraints on what those txns should be based on impl below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this method doesn't put any constraints on which txns it's waiting on. It just returns anything that is confirmed first.

queryTicker := time.NewTicker(3 * time.Second)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
for {
receipt, err = c.TransactionReceipt(ctx, tx.Hash())
if err == nil {
chainTip, err := c.BlockNumber(ctx)
for _, tx := range txs {
receipt, err = c.TransactionReceipt(ctx, tx.Hash())
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip {
c.Logger.Trace("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
chainTip, err := c.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip {
c.Logger.Trace("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
return receipt, nil
c.Logger.Trace("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err)
}
} else {
c.Logger.Trace("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined", "txHash", tx.Hash().Hex())
} else if err != nil {
c.Logger.Trace("Receipt retrieval failed", "err", err)
if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined", "txHash", tx.Hash().Hex())
} else if err != nil {
c.Logger.Trace("Transaction receipt retrieval failed", "err", err)
}
}

// Wait for the next round.
select {
case <-ctx.Done():
Expand Down
10 changes: 10 additions & 0 deletions common/mock/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,13 @@ func (mock *MockEthClient) EnsureTransactionEvaled(ctx context.Context, tx *type

return result, args.Error(1)
}

func (mock *MockEthClient) EnsureAnyTransactionEvaled(ctx context.Context, txs []*types.Transaction, tag string) (*types.Receipt, error) {
args := mock.Called()
var result *types.Receipt
if args.Get(0) != nil {
result = args.Get(0).(*types.Receipt)
}

return result, args.Error(1)
}
33 changes: 19 additions & 14 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common"
gcore "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -18,6 +17,7 @@ import (
var (
gasPricePercentageMultiplier = big.NewInt(10)
hundred = big.NewInt(100)
maxSpeedUpRetry = 3
)

// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status.
Expand All @@ -37,6 +37,10 @@ type TxnRequest struct {
Metadata interface{}

requestedAt time.Time
// txAttempts are the transactions that have been attempted to be mined for this request.
// If a transaction hasn't been confirmed within the timeout and a replacement transaction is sent,
// the original transaction hash will be kept in this slice
txAttempts []*types.Transaction
siddimore marked this conversation as resolved.
Show resolved Hide resolved
}

// ReceiptOrErr is a wrapper for a transaction receipt or an error.
Expand Down Expand Up @@ -83,6 +87,7 @@ func NewTxnRequest(tx *types.Transaction, tag string, value *big.Int, metadata i
Metadata: metadata,

requestedAt: time.Now(),
txAttempts: make([]*types.Transaction, 0),
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -140,6 +145,7 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", txn.Hash().Hex())
}
req.Tx = txn
req.txAttempts = append(req.txAttempts, txn)

t.requestChan <- req
t.metrics.UpdateTxQueue(len(t.requestChan))
Expand All @@ -155,14 +161,15 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
numSpeedUps := 0
retryFromFailure := 0
for {
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()

t.logger.Debug("[TxnManager] monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ethClient.EnsureTransactionEvaled(
receipt, err := t.ethClient.EnsureAnyTransactionEvaled(
ctxWithTimeout,
req.Tx,
req.txAttempts,
req.Tag,
)
if err == nil {
Expand All @@ -185,20 +192,18 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
}
err = t.ethClient.SendTransaction(ctx, newTx)
if err != nil {
if errors.Is(err, gcore.ErrNonceTooLow) {
// try to get the receipt again to see if the transaction has been mined
_, receiptErr := t.ethClient.TransactionReceipt(ctx, req.Tx.Hash())
if receiptErr == nil {
continue
}
}
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSpeedUpRetry, "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
if retryFromFailure >= maxSpeedUpRetry {
return nil, err
}
retryFromFailure++
continue
}

t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, newTx)
numSpeedUps++
} else {
t.logger.Error("[TxnManager] transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
Expand Down
90 changes: 81 additions & 9 deletions disperser/batcher/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestProcessTransaction(t *testing.T) {
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil)
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil).Once()

Expand All @@ -44,11 +44,11 @@ func TestProcessTransaction(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 1)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 1)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 1)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 1)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 1)

// now test the case where the transaction fails
randomErr := fmt.Errorf("random error")
ethClient.On("EnsureTransactionEvaled").Return(nil, randomErr)
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, randomErr)
err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Expand All @@ -62,7 +62,7 @@ func TestProcessTransaction(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 2)
}

func TestReplaceGasFee(t *testing.T) {
Expand All @@ -79,8 +79,8 @@ func TestReplaceGasFee(t *testing.T) {
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

Expand All @@ -94,7 +94,7 @@ func TestReplaceGasFee(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 2)
}

func TestTransactionFailure(t *testing.T) {
Expand All @@ -114,8 +114,8 @@ func TestTransactionFailure(t *testing.T) {
ethClient.On("UpdateGas").Return(nil, speedUpFailure).Once()
ethClient.On("SendTransaction").Return(nil)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

Expand All @@ -129,3 +129,75 @@ func TestTransactionFailure(t *testing.T) {
res := <-txnManager.ReceiptChan()
assert.Error(t, res.Err, speedUpFailure)
}

func TestSendTransactionRetry(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
txn := types.NewTransaction(0, common.HexToAddress("0x1"), big.NewInt(1e18), 100000, big.NewInt(1e9), []byte{})
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil).Once()
// assume that it fails to send the replacement transaction once
ethClient.On("SendTransaction").Return(fmt.Errorf("send txn failure")).Once()
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureAnyTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
res := <-txnManager.ReceiptChan()
assert.NoError(t, res.Err)
assert.Equal(t, uint64(1), res.Receipt.BlockNumber.Uint64())
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 2)
}

func TestSendTransactionRetryFailure(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
txn := types.NewTransaction(0, common.HexToAddress("0x1"), big.NewInt(1e18), 100000, big.NewInt(1e9), []byte{})
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil)
ethClient.On("SendTransaction").Return(nil).Once()
// assume that it keeps failing to send the replacement transaction
sendErr := fmt.Errorf("send txn failure")
ethClient.On("SendTransaction").Return(sendErr)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureAnyTransactionEvaled").Return(nil, context.DeadlineExceeded)

err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
res := <-txnManager.ReceiptChan()
assert.Error(t, res.Err, sendErr)
assert.Nil(t, res.Receipt)
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 5)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 5)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 5)
ethClient.AssertNumberOfCalls(t, "EnsureAnyTransactionEvaled", 4)
}
Loading