Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

Commit

Permalink
logs
Browse files Browse the repository at this point in the history
  • Loading branch information
calbera committed Jan 30, 2024
1 parent ec7dffc commit 73754bb
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
27 changes: 20 additions & 7 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,56 @@ func (m *Mempool) AnteHandle(
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate)

// TODO: Record the time it takes to build a payload.

// We only want to eject transactions from comet on recheck.
if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
ctx.Logger().Info("AnteHandle in Check/Recheck tx")
if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok {
ethTx := wet.Unwrap()
ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash())
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
ctx, ethTx,
); shouldEject {
ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash())
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash())
}
}
return next(ctx, tx, simulate)
}

// shouldEject returns true if the transaction should be ejected from the CometBFT mempool.
func (m *Mempool) shouldEjectFromCometMempool(
currentTime int64, tx *ethtypes.Transaction,
ctx sdk.Context, tx *ethtypes.Transaction,
) bool {
defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject)
if tx == nil {
ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil")
return false
}

// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
if m.validateStateless(ctx, tx) {
ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash())
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
return m.validateStateful(ctx, tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool {
txHash := tx.Hash()
currentTime := ctx.BlockTime().Unix()
ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime)

// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
Expand All @@ -95,20 +106,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64)
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit)

return expired || priceLeLimit
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
ctx.Logger().Info("validateStateful", "included", included)
return included
}
3 changes: 2 additions & 1 deletion cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
return ok
}

// Record the time the tx was inserted from Comet successfully.
// Record the first time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
// TODO: only insert a new timestamp if not already seen.
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
}
Expand Down
10 changes: 7 additions & 3 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (h *handler) failedLoop() {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries)
telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry)
h.broadcastTransaction(failed.tx, failed.retries-1)
}
Expand Down Expand Up @@ -225,11 +226,12 @@ func (h *handler) broadcastTransactions(txs ethtypes.Transactions) {
numBroadcasted := 0
for _, signedEthTx := range txs {
if !h.crc.IsRemoteTx(signedEthTx.Hash()) {
h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex())
h.broadcastTransaction(signedEthTx, maxRetries)
numBroadcasted++
}
}
h.logger.Debug(
h.logger.Info(
"broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted,
)
}
Expand All @@ -254,21 +256,23 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) {
// If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually
// the desired behaviour.
if rsp == nil || rsp.Code == 0 || rsp.Code == 1 {
h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code)
return
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyMempoolFull)
case
sdkerrors.ErrTxInMempoolCache.ABCICode():
case sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure)
}

h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries)

h.failedTxs <- &failedTx{tx: tx, retries: retries}
}
9 changes: 9 additions & 0 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"math/big"
"sync"
"time"

"cosmossdk.io/log"

Expand Down Expand Up @@ -111,12 +112,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
sCtx := sdk.UnwrapSDKContext(ctx)
msgs := sdkTx.GetMsgs()
if len(msgs) != 1 {
sCtx.Logger().Error("mempool insert: only one message is supported")
return errors.New("only one message is supported")
}

wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0])
if !ok {
// We have to return nil for non-ethereum transactions as to not fail check-tx.
sCtx.Logger().Info("mempool insert: not an ethereum transaction")
return nil
}

Expand All @@ -130,14 +133,20 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
if len(errs) > 0 {
// Handle case where a node broadcasts to itself, we don't want it to fail CheckTx.
if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) &&
// TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())
return nil
}
return errs[0]
}

// Add the eth tx to the remote cache.
sCtx.Logger().Info(
"mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(),
"is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()),
)
m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
Expand Down

0 comments on commit 73754bb

Please sign in to comment.