diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 18ea5905c..856da887c 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -44,19 +44,25 @@ func (m *Mempool) AnteHandle( telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs) msgs := tx.GetMsgs() + ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate) + // TODO: Record the time it takes to build a payload. // We only want to eject transactions from comet on recheck. if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck { + ctx.Logger().Info("AnteHandle in Check/Recheck tx") if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() + ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash()) if shouldEject := m.shouldEjectFromCometMempool( - ctx.BlockTime().Unix(), ethTx, + ctx, ethTx, ); shouldEject { + ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash()) m.crc.DropRemoteTx(ethTx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") } + ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash()) } } return next(ctx, tx, simulate) @@ -64,25 +70,30 @@ func (m *Mempool) AnteHandle( // shouldEject returns true if the transaction should be ejected from the CometBFT mempool. func (m *Mempool) shouldEjectFromCometMempool( - currentTime int64, tx *ethtypes.Transaction, + ctx sdk.Context, tx *ethtypes.Transaction, ) bool { defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject) if tx == nil { + ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil") return false } // First check things that are stateless. - if m.validateStateless(tx, currentTime) { + if m.validateStateless(ctx, tx) { + ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash()) return true } // Then check for things that are stateful. - return m.validateStateful(tx) + return m.validateStateful(ctx, tx) } // validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool { +func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool { txHash := tx.Hash() + currentTime := ctx.BlockTime().Unix() + ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime) + // 1. If the transaction has been in the mempool for longer than the configured timeout. // 2. If the transaction's gas params are less than or equal to the configured limit. expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime @@ -95,20 +106,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) } + ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit) + return expired || priceLeLimit } // includedCanonicalChain returns whether the tx of the given hash is included in the canonical // Eth chain. -func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool { +func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool { // // 1. If the transaction has been included in a block. // signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID) // if _, err := ethtypes.Sender(signer, tx); err != nil { // return true // } - // tx.Nonce() < included := m.chain.GetTransactionLookup(tx.Hash()) != nil telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + ctx.Logger().Info("validateStateful", "included", included) return included } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index c40809db9..9589f1eb5 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { return ok } -// Record the time the tx was inserted from Comet successfully. +// Record the first time the tx was inserted from Comet successfully. func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) { crc.timeInsertedMu.Lock() + // TODO: only insert a new timestamp if not already seen. crc.timeInserted[txHash] = time.Now().Unix() crc.timeInsertedMu.Unlock() } diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 069003b68..2842dc0d8 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -172,6 +172,7 @@ func (h *handler) failedLoop() { h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries) continue } + h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries) telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry) h.broadcastTransaction(failed.tx, failed.retries-1) } @@ -225,11 +226,12 @@ func (h *handler) broadcastTransactions(txs ethtypes.Transactions) { numBroadcasted := 0 for _, signedEthTx := range txs { if !h.crc.IsRemoteTx(signedEthTx.Hash()) { + h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex()) h.broadcastTransaction(signedEthTx, maxRetries) numBroadcasted++ } } - h.logger.Debug( + h.logger.Info( "broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted, ) } @@ -254,6 +256,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { // If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually // the desired behaviour. if rsp == nil || rsp.Code == 0 || rsp.Code == 1 { + h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code) return } @@ -261,8 +264,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { case sdkerrors.ErrMempoolIsFull.ABCICode(): h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyMempoolFull) - case - sdkerrors.ErrTxInMempoolCache.ABCICode(): + case sdkerrors.ErrTxInMempoolCache.ABCICode(): return default: h.logger.Error("failed to broadcast transaction", @@ -270,5 +272,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure) } + h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries) + h.failedTxs <- &failedTx{tx: tx, retries: retries} } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index ecf582eb1..3aca3868d 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -25,6 +25,7 @@ import ( "errors" "math/big" "sync" + "time" "cosmossdk.io/log" @@ -111,12 +112,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { sCtx := sdk.UnwrapSDKContext(ctx) msgs := sdkTx.GetMsgs() if len(msgs) != 1 { + sCtx.Logger().Error("mempool insert: only one message is supported") return errors.New("only one message is supported") } wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]) if !ok { // We have to return nil for non-ethereum transactions as to not fail check-tx. + sCtx.Logger().Info("mempool insert: not an ethereum transaction") return nil } @@ -130,14 +133,20 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { if len(errs) > 0 { // Handle case where a node broadcasts to itself, we don't want it to fail CheckTx. if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) && + // TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx) (sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) { telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs) + sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode()) return nil } return errs[0] } // Add the eth tx to the remote cache. + sCtx.Logger().Info( + "mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(), + "is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()), + ) m.crc.MarkRemoteSeen(ethTx.Hash()) return nil