Skip to content

Commit

Permalink
feat(relayer): Add improved logging/prometheus events for testnet (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Mar 20, 2023
1 parent eb2faf7 commit 03bba59
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 11 deletions.
2 changes: 1 addition & 1 deletion packages/relayer/indexer/handle_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (svc *Service) handleEvent(
) error {
raw := event.Raw

log.Infof("event found for msgHash: %v", common.Hash(event.MsgHash).Hex())
log.Infof("event found for msgHash: %v, txHash: %v", common.Hash(event.MsgHash).Hex(), event.Raw.TxHash.Hex())

// handle chain re-org by checking Removed property, no need to
// return error, just continue and do not process.
Expand Down
5 changes: 5 additions & 0 deletions packages/relayer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
log.Info("context finished")
return nil
case err := <-errChan:
relayer.ErrorsEncounteredDuringSubscription.Inc()

return errors.Wrap(err, "errChan")
}
}
Expand All @@ -43,6 +45,8 @@ func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int,
log.Errorf("svc.bridge.WatchMessageSent: %v", err)
}

log.Info("resubscribing to WatchMessageSent events")

return svc.bridge.WatchMessageSent(&bind.WatchOpts{
Context: ctx,
}, sink, nil)
Expand Down Expand Up @@ -96,6 +100,7 @@ func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID *
if err != nil {
log.Errorf("svc.bridge.WatchMessageStatusChanged: %v", err)
}
log.Info("resubscribing to WatchMessageStatusChanged events")

return svc.bridge.WatchMessageStatusChanged(&bind.WatchOpts{
Context: ctx,
Expand Down
22 changes: 17 additions & 5 deletions packages/relayer/message/process_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func (p *Processor) ProcessMessage(

encodedSignalProof, err := p.prover.EncodedSignalProof(ctx, p.rpc, p.srcSignalServiceAddress, key, latestSyncedHeader)
if err != nil {
log.Errorf("srcChainID: %v, destChainID: %v, txHash: %v: msgHash: %v, from: %v",
log.Errorf("srcChainID: %v, destChainID: %v, txHash: %v: msgHash: %v, from: %v encountered signalProofError %v",
event.Message.SrcChainId,
event.Message.DestChainId,
event.Raw.TxHash.Hex(),
common.Hash(event.MsgHash).Hex(),
event.Message.Owner.Hex(),
err,
)

return errors.Wrap(err, "p.prover.GetEncodedSignalProof")
Expand All @@ -78,7 +79,15 @@ func (p *Processor) ProcessMessage(

// message will fail when we try to process it
if !received {
log.Warnf("msgHash %v not received on dest chain", common.Hash(event.MsgHash).Hex())
log.Warnf(
"msgHash: %v, srcChainId: %v, encodedSignalProof: %v not received on dest chain",
common.Hash(event.MsgHash).Hex(),
event.Message.SrcChainId,
hex.EncodeToString(encodedSignalProof),
)

relayer.MessagesNotReceivedOnDestChain.Inc()

return errors.New("message not received")
}

Expand All @@ -89,8 +98,6 @@ func (p *Processor) ProcessMessage(

relayer.EventsProcessed.Inc()

log.Infof("waiting for tx hash %v", hex.EncodeToString(tx.Hash().Bytes()))

receipt, err := relayer.WaitReceipt(ctx, p.destEthClient, tx.Hash())
if err != nil {
return errors.Wrap(err, "relayer.WaitReceipt")
Expand All @@ -107,7 +114,12 @@ func (p *Processor) ProcessMessage(
return errors.Wrap(err, "p.destBridge.GetMessageStatus")
}

log.Infof("updating message status to: %v", relayer.EventStatus(messageStatus).String())
log.Infof(
"updating message status to: %v for txHash: %v, processed in txHash: %v",
relayer.EventStatus(messageStatus).String(),
event.Raw.TxHash.Hex(),
hex.EncodeToString(tx.Hash().Bytes()),
)

if messageStatus == uint8(relayer.EventStatusRetriable) {
relayer.RetriableEvents.Inc()
Expand Down
1 change: 0 additions & 1 deletion packages/relayer/message/wait_for_confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error {
// TODO: make timeout a config var
ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute)

defer cancelFunc()
Expand Down
9 changes: 6 additions & 3 deletions packages/relayer/message/wait_header_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ func (p *Processor) waitHeaderSynced(ctx context.Context, event *bridge.BridgeMe
return ctx.Err()
case <-ticker.C:
log.Infof(
"msgHash: %v waiting to be processable. occured in block %v",
"msgHash: %v, txHash: %v is waiting to be processable. occured in block %v",
common.Hash(event.MsgHash).Hex(),
event.Raw.TxHash.Hex(),
event.Raw.BlockNumber,
)
// get latest synced header since not every header is synced from L1 => L2,
Expand All @@ -40,8 +41,9 @@ func (p *Processor) waitHeaderSynced(ctx context.Context, event *bridge.BridgeMe
// header is caught up and processible
if header.Number.Uint64() >= event.Raw.BlockNumber {
log.Infof(
"msgHash: %v is processable. occured in block %v, latestSynced is block %v",
"msgHash: %v, txHash: %v is processable. occured in block %v, latestSynced is block %v",
common.Hash(event.MsgHash).Hex(),
event.Raw.TxHash.Hex(),
event.Raw.BlockNumber,
header.Number.Uint64(),
)
Expand All @@ -50,8 +52,9 @@ func (p *Processor) waitHeaderSynced(ctx context.Context, event *bridge.BridgeMe
}

log.Infof(
"msgHash: %v waiting to be processable. occured in block %v, latestSynced is block %v",
"msgHash: %v, txHash: %v is waiting to be processable. occured in block %v, latestSynced is block %v",
common.Hash(event.MsgHash).Hex(),
event.Raw.TxHash.Hex(),
event.Raw.BlockNumber,
header.Number.Uint64(),
)
Expand Down
8 changes: 8 additions & 0 deletions packages/relayer/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ var (
Name: "events_processed_error_ops_total",
Help: "The total number of processed events that failed due to an error",
})
MessagesNotReceivedOnDestChain = promauto.NewCounter(prometheus.CounterOpts{
Name: "messages_not_received_on_dest_chain_opts_total",
Help: "The total number of messages that were not received on the destination chain",
})
ErrorsEncounteredDuringSubscription = promauto.NewCounter(prometheus.CounterOpts{
Name: "errors_encountered_during_subscription_opts_total",
Help: "The total number of errors that occured during active subscription",
})
)
23 changes: 22 additions & 1 deletion packages/relayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
log "github.com/sirupsen/logrus"
)

var (
Expand Down Expand Up @@ -37,6 +38,8 @@ func WaitReceipt(ctx context.Context, confirmer confirmer, txHash common.Hash) (
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

log.Infof("waiting for transaction receipt for txHash %v", txHash.Hex())

for {
select {
case <-ctx.Done():
Expand All @@ -51,6 +54,8 @@ func WaitReceipt(ctx context.Context, confirmer confirmer, txHash common.Hash) (
return nil, fmt.Errorf("transaction reverted, hash: %s", txHash)
}

log.Infof("transaction receipt found for txHash %v", txHash.Hex())

return receipt, nil
}
}
Expand All @@ -59,7 +64,10 @@ func WaitReceipt(ctx context.Context, confirmer confirmer, txHash common.Hash) (
// WaitConfirmations won't return before N blocks confirmations have been seen
// on destination chain.
func WaitConfirmations(ctx context.Context, confirmer confirmer, confirmations uint64, txHash common.Hash) error {
ticker := time.NewTicker(time.Second)
log.Infof("txHash %v beginning waiting for confirmations", txHash.Hex())

ticker := time.NewTicker(10 * time.Second)

defer ticker.Stop()

for {
Expand All @@ -73,6 +81,8 @@ func WaitConfirmations(ctx context.Context, confirmer confirmer, confirmations u
continue
}

log.Errorf("txHash: %v encountered error getting receipt: %v", txHash.Hex(), err)

return err
}

Expand All @@ -81,10 +91,21 @@ func WaitConfirmations(ctx context.Context, confirmer confirmer, confirmations u
return err
}

want := receipt.BlockNumber.Uint64() + confirmations
log.Infof(
"txHash: %v waiting for %v confirmations which will happen in block number: %v, latestBlockNumber: %v",
txHash.Hex(),
confirmations,
want,
latest,
)

if latest < receipt.BlockNumber.Uint64()+confirmations {
continue
}

log.Infof("txHash %v received %v confirmations, done", txHash.Hex(), confirmations)

return nil
}
}
Expand Down

0 comments on commit 03bba59

Please sign in to comment.