diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index e2b20efd332..8095d68f8e3 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -22,7 +22,7 @@ func (svc *Service) handleEvent( ) error { raw := event.Raw - log.Infof("event found for msgHash: %v", common.Hash(event.MsgHash).Hex()) + log.Infof("event found for msgHash: %v, txHash: %v", common.Hash(event.MsgHash).Hex(), event.Raw.TxHash.Hex()) // handle chain re-org by checking Removed property, no need to // return error, just continue and do not process. diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 46b8f82002c..1a87f5e2605 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -30,6 +30,8 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { log.Info("context finished") return nil case err := <-errChan: + relayer.ErrorsEncounteredDuringSubscription.Inc() + return errors.Wrap(err, "errChan") } } @@ -43,6 +45,8 @@ func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, log.Errorf("svc.bridge.WatchMessageSent: %v", err) } + log.Info("resubscribing to WatchMessageSent events") + return svc.bridge.WatchMessageSent(&bind.WatchOpts{ Context: ctx, }, sink, nil) @@ -96,6 +100,7 @@ func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID * if err != nil { log.Errorf("svc.bridge.WatchMessageStatusChanged: %v", err) } + log.Info("resubscribing to WatchMessageStatusChanged events") return svc.bridge.WatchMessageStatusChanged(&bind.WatchOpts{ Context: ctx, diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 2cc5ecbbf47..c75fc518c16 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -55,12 +55,13 @@ func (p *Processor) ProcessMessage( encodedSignalProof, err := p.prover.EncodedSignalProof(ctx, p.rpc, p.srcSignalServiceAddress, key, latestSyncedHeader) if err != nil { - log.Errorf("srcChainID: %v, destChainID: %v, txHash: %v: msgHash: %v, from: %v", + log.Errorf("srcChainID: %v, destChainID: %v, txHash: %v: msgHash: %v, from: %v encountered signalProofError %v", event.Message.SrcChainId, event.Message.DestChainId, event.Raw.TxHash.Hex(), common.Hash(event.MsgHash).Hex(), event.Message.Owner.Hex(), + err, ) return errors.Wrap(err, "p.prover.GetEncodedSignalProof") @@ -78,7 +79,15 @@ func (p *Processor) ProcessMessage( // message will fail when we try to process it if !received { - log.Warnf("msgHash %v not received on dest chain", common.Hash(event.MsgHash).Hex()) + log.Warnf( + "msgHash: %v, srcChainId: %v, encodedSignalProof: %v not received on dest chain", + common.Hash(event.MsgHash).Hex(), + event.Message.SrcChainId, + hex.EncodeToString(encodedSignalProof), + ) + + relayer.MessagesNotReceivedOnDestChain.Inc() + return errors.New("message not received") } @@ -89,8 +98,6 @@ func (p *Processor) ProcessMessage( relayer.EventsProcessed.Inc() - log.Infof("waiting for tx hash %v", hex.EncodeToString(tx.Hash().Bytes())) - receipt, err := relayer.WaitReceipt(ctx, p.destEthClient, tx.Hash()) if err != nil { return errors.Wrap(err, "relayer.WaitReceipt") @@ -107,7 +114,12 @@ func (p *Processor) ProcessMessage( return errors.Wrap(err, "p.destBridge.GetMessageStatus") } - log.Infof("updating message status to: %v", relayer.EventStatus(messageStatus).String()) + log.Infof( + "updating message status to: %v for txHash: %v, processed in txHash: %v", + relayer.EventStatus(messageStatus).String(), + event.Raw.TxHash.Hex(), + hex.EncodeToString(tx.Hash().Bytes()), + ) if messageStatus == uint8(relayer.EventStatusRetriable) { relayer.RetriableEvents.Inc() diff --git a/packages/relayer/message/wait_for_confirmations.go b/packages/relayer/message/wait_for_confirmations.go index 600ac574a02..d71c34dddbb 100644 --- a/packages/relayer/message/wait_for_confirmations.go +++ b/packages/relayer/message/wait_for_confirmations.go @@ -10,7 +10,6 @@ import ( ) func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error { - // TODO: make timeout a config var ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute) defer cancelFunc() diff --git a/packages/relayer/message/wait_header_synced.go b/packages/relayer/message/wait_header_synced.go index f839bb4872e..22c67e0e287 100644 --- a/packages/relayer/message/wait_header_synced.go +++ b/packages/relayer/message/wait_header_synced.go @@ -21,8 +21,9 @@ func (p *Processor) waitHeaderSynced(ctx context.Context, event *bridge.BridgeMe return ctx.Err() case <-ticker.C: log.Infof( - "msgHash: %v waiting to be processable. occured in block %v", + "msgHash: %v, txHash: %v is waiting to be processable. occured in block %v", common.Hash(event.MsgHash).Hex(), + event.Raw.TxHash.Hex(), event.Raw.BlockNumber, ) // get latest synced header since not every header is synced from L1 => L2, @@ -40,8 +41,9 @@ func (p *Processor) waitHeaderSynced(ctx context.Context, event *bridge.BridgeMe // header is caught up and processible if header.Number.Uint64() >= event.Raw.BlockNumber { log.Infof( - "msgHash: %v is processable. occured in block %v, latestSynced is block %v", + "msgHash: %v, txHash: %v is processable. occured in block %v, latestSynced is block %v", common.Hash(event.MsgHash).Hex(), + event.Raw.TxHash.Hex(), event.Raw.BlockNumber, header.Number.Uint64(), ) @@ -50,8 +52,9 @@ func (p *Processor) waitHeaderSynced(ctx context.Context, event *bridge.BridgeMe } log.Infof( - "msgHash: %v waiting to be processable. occured in block %v, latestSynced is block %v", + "msgHash: %v, txHash: %v is waiting to be processable. occured in block %v, latestSynced is block %v", common.Hash(event.MsgHash).Hex(), + event.Raw.TxHash.Hex(), event.Raw.BlockNumber, header.Number.Uint64(), ) diff --git a/packages/relayer/prometheus.go b/packages/relayer/prometheus.go index 8d3dfefb11f..b2df6d3c558 100644 --- a/packages/relayer/prometheus.go +++ b/packages/relayer/prometheus.go @@ -26,4 +26,12 @@ var ( Name: "events_processed_error_ops_total", Help: "The total number of processed events that failed due to an error", }) + MessagesNotReceivedOnDestChain = promauto.NewCounter(prometheus.CounterOpts{ + Name: "messages_not_received_on_dest_chain_opts_total", + Help: "The total number of messages that were not received on the destination chain", + }) + ErrorsEncounteredDuringSubscription = promauto.NewCounter(prometheus.CounterOpts{ + Name: "errors_encountered_during_subscription_opts_total", + Help: "The total number of errors that occured during active subscription", + }) ) diff --git a/packages/relayer/types.go b/packages/relayer/types.go index f9d32388ff8..81ccf346669 100644 --- a/packages/relayer/types.go +++ b/packages/relayer/types.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + log "github.com/sirupsen/logrus" ) var ( @@ -37,6 +38,8 @@ func WaitReceipt(ctx context.Context, confirmer confirmer, txHash common.Hash) ( ticker := time.NewTicker(time.Second) defer ticker.Stop() + log.Infof("waiting for transaction receipt for txHash %v", txHash.Hex()) + for { select { case <-ctx.Done(): @@ -51,6 +54,8 @@ func WaitReceipt(ctx context.Context, confirmer confirmer, txHash common.Hash) ( return nil, fmt.Errorf("transaction reverted, hash: %s", txHash) } + log.Infof("transaction receipt found for txHash %v", txHash.Hex()) + return receipt, nil } } @@ -59,7 +64,10 @@ func WaitReceipt(ctx context.Context, confirmer confirmer, txHash common.Hash) ( // WaitConfirmations won't return before N blocks confirmations have been seen // on destination chain. func WaitConfirmations(ctx context.Context, confirmer confirmer, confirmations uint64, txHash common.Hash) error { - ticker := time.NewTicker(time.Second) + log.Infof("txHash %v beginning waiting for confirmations", txHash.Hex()) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() for { @@ -73,6 +81,8 @@ func WaitConfirmations(ctx context.Context, confirmer confirmer, confirmations u continue } + log.Errorf("txHash: %v encountered error getting receipt: %v", txHash.Hex(), err) + return err } @@ -81,10 +91,21 @@ func WaitConfirmations(ctx context.Context, confirmer confirmer, confirmations u return err } + want := receipt.BlockNumber.Uint64() + confirmations + log.Infof( + "txHash: %v waiting for %v confirmations which will happen in block number: %v, latestBlockNumber: %v", + txHash.Hex(), + confirmations, + want, + latest, + ) + if latest < receipt.BlockNumber.Uint64()+confirmations { continue } + log.Infof("txHash %v received %v confirmations, done", txHash.Hex(), confirmations) + return nil } }