Skip to content

Commit

Permalink
rebase fix
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Oct 2, 2024
1 parent 5077963 commit db40fc9
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 526 deletions.
11 changes: 2 additions & 9 deletions .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@ filename: "{{.InterfaceName | lower }}.generated.go"
mockname: "{{.InterfaceName}}"
outpkg: "mocks"
packages:
github.com/0xPolygon/zkevm-ethtx-manager/ethtxmanager:
github.com/0xPolygon/zkevm-ethtx-manager/types:
interfaces:
EthermanInterface:
config:
StorageInterface:
config:
dir: "{{.InterfaceDir}}"
filename: "{{.InterfaceName | lower }}.generated.go"
mockname: "{{.InterfaceName}}Mock"
outpkg: "{{.PackageName}}"
inpackage: True
config:
35 changes: 18 additions & 17 deletions ethtxmanager/ethtxmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type Client struct {
cancel context.CancelFunc

cfg Config
etherman EthermanInterface
storage StorageInterface
etherman types.EthermanInterface
storage types.StorageInterface
from common.Address
}

Expand Down Expand Up @@ -173,13 +173,13 @@ func pendingL1Txs(URL string, from common.Address, httpHeaders map[string]string

// Add a transaction to be sent and monitored
func (c *Client) Add(ctx context.Context, to *common.Address, value *big.Int,
data []byte, gasOffset uint64, sidecar *types.BlobTxSidecar) (common.Hash, error) {
data []byte, gasOffset uint64, sidecar *ethTypes.BlobTxSidecar) (common.Hash, error) {
return c.add(ctx, to, value, data, gasOffset, sidecar, 0)
}

// AddWithGas adds a transaction to be sent and monitored with a defined gas to be used so it's not estimated
func (c *Client) AddWithGas(ctx context.Context, to *common.Address,
value *big.Int, data []byte, gasOffset uint64, sidecar *types.BlobTxSidecar, gas uint64) (common.Hash, error) {
value *big.Int, data []byte, gasOffset uint64, sidecar *ethTypes.BlobTxSidecar, gas uint64) (common.Hash, error) {
return c.add(ctx, to, value, data, gasOffset, sidecar, gas)
}

Expand Down Expand Up @@ -486,7 +486,7 @@ func (c *Client) monitorTxs(ctx context.Context) error {
for _, mTx := range iterations {
mTx := mTx // force variable shadowing to avoid pointer conflicts
go func(c *Client, mTx *monitoredTxnIteration) {
mTxLogger := createMonitoredTxLogger(*mTx.monitoredTx)
mTxLogger := createMonitoredTxLogger(*mTx.MonitoredTx)
defer func(mTxLogger *log.Logger) {
if err := recover(); err != nil {
mTxLogger.Errorf("monitoring recovered from this err: %v", err)
Expand Down Expand Up @@ -591,10 +591,10 @@ func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logg
var err error
logger.Info("processing")

var signedTx *types.Transaction
var signedTx *ethTypes.Transaction
if !mTx.confirmed {
// review tx and increase gas and gas price if needed
if mTx.Status == MonitoredTxStatusSent {
if mTx.Status == types.MonitoredTxStatusSent {
err := c.reviewMonitoredTxGas(ctx, mTx, logger)
if err != nil {
logger.Errorf("failed to review monitored tx: %v", err)
Expand Down Expand Up @@ -623,7 +623,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logg
return
} else {
// update monitored tx changes into storage
err = c.storage.Update(ctx, *mTx.monitoredTx)
err = c.storage.Update(ctx, *mTx.MonitoredTx)
if err != nil {
logger.Errorf("failed to update monitored tx: %v", err)
return
Expand All @@ -647,7 +647,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logg
mTx.Status = types.MonitoredTxStatusSent
logger.Debugf("status changed to %v", string(mTx.Status))
// update monitored tx changes into storage
err = c.storage.Update(ctx, *mTx.monitoredTx)
err = c.storage.Update(ctx, *mTx.MonitoredTx)
if err != nil {
logger.Errorf("failed to update monitored tx changes: %v", err)
return
Expand Down Expand Up @@ -697,8 +697,8 @@ func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logg
}

// if mined, check receipt and mark as Failed or Confirmed
if mTx.lastReceipt.Status == types.ReceiptStatusSuccessful {
mTx.Status = MonitoredTxStatusMined
if mTx.lastReceipt.Status == ethTypes.ReceiptStatusSuccessful {
mTx.Status = types.MonitoredTxStatusMined
mTx.BlockNumber = mTx.lastReceipt.BlockNumber
logger.Info("mined")
} else {
Expand All @@ -708,13 +708,13 @@ func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logg
return
}
// otherwise we understand this monitored tx has failed
mTx.Status = MonitoredTxStatusFailed
mTx.Status = types.MonitoredTxStatusFailed
mTx.BlockNumber = mTx.lastReceipt.BlockNumber
logger.Info("failed")
}

// update monitored tx changes into storage
err = c.storage.Update(ctx, *mTx.monitoredTx)
err = c.storage.Update(ctx, *mTx.MonitoredTx)
if err != nil {
logger.Errorf("failed to update monitored tx: %v", err)
return
Expand All @@ -723,7 +723,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logg

// shouldContinueToMonitorThisTx checks the the tx receipt and decides if it should
// continue or not to monitor the monitored tx related to the tx from this receipt
func (c *Client) shouldContinueToMonitorThisTx(ctx context.Context, receipt *types.Receipt) bool {
func (c *Client) shouldContinueToMonitorThisTx(ctx context.Context, receipt *ethTypes.Receipt) bool {
// if the receipt has a is successful result, stop monitoring
if receipt.Status == ethTypes.ReceiptStatusSuccessful {
return false
Expand Down Expand Up @@ -845,7 +845,7 @@ func (c *Client) reviewMonitoredTxGas(ctx context.Context, mTx *monitoredTxnIter
mTx.Gas = gas
}

err = c.storage.Update(ctx, *mTx.monitoredTx)
err = c.storage.Update(ctx, *mTx.MonitoredTx)
if err != nil {
return fmt.Errorf("failed to update monitored tx changes: %w", err)
}
Expand All @@ -855,7 +855,8 @@ func (c *Client) reviewMonitoredTxGas(ctx context.Context, mTx *monitoredTxnIter

// getMonitoredTxnIteration gets all monitored txs that need to be sent or resent in current monitor iteration
func (c *Client) getMonitoredTxnIteration(ctx context.Context) ([]*monitoredTxnIteration, error) {
txsToUpdate, err := c.storage.GetByStatus(ctx, []MonitoredTxStatus{MonitoredTxStatusCreated, MonitoredTxStatusSent})
txsToUpdate, err := c.storage.GetByStatus(ctx,
[]types.MonitoredTxStatus{types.MonitoredTxStatusCreated, types.MonitoredTxStatusSent})
if err != nil {
return nil, fmt.Errorf("failed to get txs to update nonces: %w", err)
}
Expand All @@ -866,7 +867,7 @@ func (c *Client) getMonitoredTxnIteration(ctx context.Context) ([]*monitoredTxnI
for _, tx := range txsToUpdate {
tx := tx

iteration := &monitoredTxnIteration{monitoredTx: &tx}
iteration := &monitoredTxnIteration{MonitoredTx: &tx}
iterations = append(iterations, iteration)

updateNonce := iteration.shouldUpdateNonce(ctx, c.etherman)
Expand Down
88 changes: 58 additions & 30 deletions ethtxmanager/ethtxmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@ package ethtxmanager
import (
context "context"
"errors"
"math/big"
"testing"
"time"

localCommon "github.com/0xPolygon/zkevm-ethtx-manager/common"
"github.com/0xPolygon/zkevm-ethtx-manager/ethtxmanager/sqlstorage"
"github.com/0xPolygon/zkevm-ethtx-manager/mocks"
"github.com/0xPolygon/zkevm-ethtx-manager/types"
common "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGetMonitoredTxnIteration(t *testing.T) {
ctx := context.Background()
etherman := mocks.NewEthermanInterface(t)
storage := NewMemStorage("")
storage, err := sqlstorage.NewStorage(localCommon.SQLLiteDriverName, ":memory:")
require.NoError(t, err)

client := &Client{
etherman: etherman,
Expand All @@ -24,7 +30,7 @@ func TestGetMonitoredTxnIteration(t *testing.T) {

tests := []struct {
name string
storageTxn *monitoredTx
storageTxn *types.MonitoredTx
ethermanNonce uint64
shouldUpdate bool
expectedResult []*monitoredTxnIteration
Expand All @@ -37,58 +43,63 @@ func TestGetMonitoredTxnIteration(t *testing.T) {
},
{
name: "Transaction should not update nonce",
storageTxn: &monitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: MonitoredTxStatusSent,
storageTxn: &types.MonitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
BlockNumber: big.NewInt(10),
Status: types.MonitoredTxStatusSent,
History: map[common.Hash]bool{
common.HexToHash("0x1"): true,
},
},
shouldUpdate: false,
expectedResult: []*monitoredTxnIteration{
{
monitoredTx: &monitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: MonitoredTxStatusSent,
MonitoredTx: &types.MonitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
BlockNumber: big.NewInt(10),
Status: types.MonitoredTxStatusSent,
History: map[common.Hash]bool{
common.HexToHash("0x1"): true,
},
},
confirmed: true,
lastReceipt: &types.Receipt{Status: types.ReceiptStatusSuccessful},
lastReceipt: &ethtypes.Receipt{Status: ethtypes.ReceiptStatusSuccessful},
},
},
expectedError: nil,
},
{
name: "Transaction should update nonce",
storageTxn: &monitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: MonitoredTxStatusCreated,
storageTxn: &types.MonitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: types.MonitoredTxStatusCreated,
BlockNumber: big.NewInt(10),
},
shouldUpdate: true,
ethermanNonce: 1,
expectedResult: []*monitoredTxnIteration{
{
monitoredTx: &monitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: MonitoredTxStatusCreated,
Nonce: 1,
MonitoredTx: &types.MonitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: types.MonitoredTxStatusCreated,
Nonce: 1,
BlockNumber: big.NewInt(10),
},
},
},
expectedError: nil,
},
{
name: "Error getting pending nonce",
storageTxn: &monitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: MonitoredTxStatusCreated,
storageTxn: &types.MonitoredTx{
ID: common.HexToHash("0x1"),
From: common.HexToAddress("0x1"),
Status: types.MonitoredTxStatusCreated,
BlockNumber: big.NewInt(10),
},
shouldUpdate: true,
expectedError: errors.New("failed to get pending nonce for sender: 0x1. Error: some error"),
Expand All @@ -97,9 +108,9 @@ func TestGetMonitoredTxnIteration(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
storage.Transactions = make(map[common.Hash]monitoredTx, 1)
require.NoError(t, storage.Empty(ctx))
if tt.storageTxn != nil {
storage.Transactions[tt.storageTxn.ID] = *tt.storageTxn
require.NoError(t, storage.Add(ctx, *tt.storageTxn))
}

etherman.ExpectedCalls = nil
Expand All @@ -116,18 +127,35 @@ func TestGetMonitoredTxnIteration(t *testing.T) {
require.ErrorContains(t, err, tt.expectedError.Error())
} else {
require.NoError(t, err)
require.Equal(t, tt.expectedResult, result)
if len(tt.expectedResult) > 0 {
require.Len(t, result, len(tt.expectedResult))
compareTxsWithoutDates(t, *tt.expectedResult[0].MonitoredTx, *result[0].MonitoredTx)
} else {
require.Empty(t, result)
}

// now check from storage
if len(tt.expectedResult) > 0 {
dbTxns, err := storage.GetByStatus(ctx, []MonitoredTxStatus{tt.storageTxn.Status})
dbTxns, err := storage.GetByStatus(ctx, []types.MonitoredTxStatus{tt.storageTxn.Status})
require.NoError(t, err)
require.Len(t, dbTxns, 1)
require.Equal(t, tt.expectedResult[0].monitoredTx.Nonce, dbTxns[0].Nonce)
require.Equal(t, tt.expectedResult[0].MonitoredTx.Nonce, dbTxns[0].Nonce)
}
}

etherman.AssertExpectations(t)
})
}
}

// compareTxsWithout dates compares the two MonitoredTx instances, but without dates, since some functions are altering it
func compareTxsWithoutDates(t *testing.T, expected, actual types.MonitoredTx) {
t.Helper()

expected.CreatedAt = time.Time{}
expected.UpdatedAt = time.Time{}
actual.CreatedAt = time.Time{}
actual.UpdatedAt = time.Time{}

require.Equal(t, expected, actual)
}
75 changes: 75 additions & 0 deletions ethtxmanager/monitored_tx_iteration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ethtxmanager

import (
"context"

"github.com/0xPolygon/zkevm-ethtx-manager/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
)

type monitoredTxnIteration struct {
*types.MonitoredTx
confirmed bool
lastReceipt *ethtypes.Receipt
}

func (m *monitoredTxnIteration) shouldUpdateNonce(ctx context.Context, etherman types.EthermanInterface) bool {
if m.Status == types.MonitoredTxStatusCreated {
// transaction was not sent, so no need to check if it was mined
// we need to update the nonce in this case
return true
}

// check if any of the txs in the history was confirmed
var lastReceiptChecked *ethtypes.Receipt
// monitored tx is confirmed until we find a successful receipt
confirmed := false
// monitored tx doesn't have a failed receipt until we find a failed receipt for any
// tx in the monitored tx history
hasFailedReceipts := false
// all history txs are considered mined until we can't find a receipt for any
// tx in the monitored tx history
allHistoryTxsWereMined := true
for txHash := range m.History {
mined, receipt, err := etherman.CheckTxWasMined(ctx, txHash)
if err != nil {
continue
}

// if the tx is not mined yet, check that not all the tx were mined and go to the next
if !mined {
allHistoryTxsWereMined = false
continue
}

lastReceiptChecked = receipt

// if the tx was mined successfully we can set it as confirmed and break the loop
if lastReceiptChecked.Status == ethtypes.ReceiptStatusSuccessful {
confirmed = true
break
}

// if the tx was mined but failed, we continue to consider it was not confirmed
// and set that we have found a failed receipt. This info will be used later
// to check if nonce needs to be reviewed
confirmed = false
hasFailedReceipts = true
}

m.confirmed = confirmed
m.lastReceipt = lastReceiptChecked

// we need to check if we need to review the nonce carefully, to avoid sending
// duplicated data to the roll-up and causing an unnecessary trusted state reorg.
//
// if we have failed receipts, this means at least one of the generated txs was mined,
// in this case maybe the current nonce was already consumed(if this is the first iteration
// of this cycle, next iteration might have the nonce already updated by the preivous one),
// then we need to check if there are tx that were not mined yet, if so, we just need to wait
// because maybe one of them will get mined successfully
//
// in case of the monitored tx is not confirmed yet, all tx were mined and none of them were
// mined successfully, we need to review the nonce
return !confirmed && hasFailedReceipts && allHistoryTxsWereMined
}
Loading

0 comments on commit db40fc9

Please sign in to comment.