diff --git a/ethergo/listener/listener.go b/ethergo/listener/listener.go index 63cef6e48e..25ed10d1d0 100644 --- a/ethergo/listener/listener.go +++ b/ethergo/listener/listener.go @@ -51,6 +51,8 @@ type chainListener struct { newBlockHandler NewBlockHandler finalityMode rpc.BlockNumber blockWait uint64 + // otelRecorder is the recorder for the otel metrics. + otelRecorder iOtelRecorder } var ( @@ -77,6 +79,12 @@ func NewChainListener(omnirpcClient client.EVM, store listenerDB.ChainListenerDB option(c) } + var err error + c.otelRecorder, err = newOtelRecorder(handler, int(c.chainID)) + if err != nil { + return nil, fmt.Errorf("could not create otel recorder: %w", err) + } + return c, nil } @@ -183,6 +191,7 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er if err != nil { return fmt.Errorf("could not put latest block: %w", err) } + c.otelRecorder.RecordLastBlock(endBlock) c.startBlock = lastUnconfirmedBlock return nil diff --git a/ethergo/listener/otel.go b/ethergo/listener/otel.go new file mode 100644 index 0000000000..d308b466cd --- /dev/null +++ b/ethergo/listener/otel.go @@ -0,0 +1,118 @@ +package listener + +import ( + "context" + "fmt" + "time" + + "github.com/synapsecns/sanguine/core/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const meterName = "github.com/synapsecns/sanguine/ethergo/listener" + +// generate an interface for otelRecorder that exports the public method. +// this allows us to avoid using recordX externally anad makes the package less confusing. +// +// ============================================================================= +// ============================================================================= +// IMPORTANT: DO NOT REMOVE THIS COMMENT. +// NOTICE: PLEASE MAKE SURE YOU UPDATE BOTH THE DOCS AND THE GRAFANA DASHBOARD (IF NEEDED) AFTER UPDATING METRICS. +// ============================================================================= +// ============================================================================= +// +//go:generate go run github.com/vburenin/ifacemaker -f otel.go -s otelRecorder -i iOtelRecorder -p listener -o otel_generated.go -c "autogenerated file" +type otelRecorder struct { + metrics metrics.Handler + // meter is the metrics meter. + meter metric.Meter + // lastBlockGauge is the gauge for the last block. + lastBlockGauge metric.Int64ObservableGauge + // lastFetchedBlockAgeGauge is the gauge for the last block age. + lastFetchedBlockAgeGauge metric.Float64ObservableGauge + // lastBlock is the last block processed by the listener. + lastBlock *uint64 + // lastBlockFetchTime is the time the last block was fetched (used to calculate last block age). + lastBlockFetchTime *time.Time + // chainID is the chain ID for the listener. + chainID int +} + +func newOtelRecorder(meterHandler metrics.Handler, chainID int) (_ iOtelRecorder, err error) { + or := otelRecorder{ + metrics: meterHandler, + meter: meterHandler.Meter(meterName), + lastBlock: nil, + lastBlockFetchTime: nil, + chainID: chainID, + } + + or.lastBlockGauge, err = or.meter.Int64ObservableGauge("last_block") + if err != nil { + return nil, fmt.Errorf("could not create last block gauge") + } + + or.lastFetchedBlockAgeGauge, err = or.meter.Float64ObservableGauge("last_block_age") + if err != nil { + return nil, fmt.Errorf("could not create last block age gauge") + } + + _, err = or.meter.RegisterCallback(or.recordLastBlock, or.lastBlockGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback for last block gauge") + } + + _, err = or.meter.RegisterCallback(or.recordLastFetchedBlockAge, or.lastFetchedBlockAgeGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback for last block age gauge") + } + + return &or, nil +} + +func (o *otelRecorder) recordLastBlock(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.lastBlockGauge == nil || o.lastBlock == nil { + return nil + } + + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, o.chainID), + ) + observer.ObserveInt64(o.lastBlockGauge, int64(*o.lastBlock), opts) + + return nil +} + +func (o *otelRecorder) recordLastFetchedBlockAge(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.lastFetchedBlockAgeGauge == nil || o.lastBlockFetchTime == nil { + return nil + } + + age := time.Since(*o.lastBlockFetchTime).Seconds() + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, o.chainID), + ) + observer.ObserveFloat64(o.lastFetchedBlockAgeGauge, age, opts) + + return nil +} + +// RecordLastBlock records the last block processed by the listener. +func (o *otelRecorder) RecordLastBlock(lastBlock uint64) { + // verify if the last block has changed + var hasChanged bool + if o.lastBlock == nil { + hasChanged = true + } else { + hasChanged = *o.lastBlock != lastBlock + } + if !hasChanged { + return + } + + // record the block + o.lastBlock = &lastBlock + fetchTime := time.Now() + o.lastBlockFetchTime = &fetchTime +} diff --git a/ethergo/listener/otel_generated.go b/ethergo/listener/otel_generated.go new file mode 100644 index 0000000000..2da743e466 --- /dev/null +++ b/ethergo/listener/otel_generated.go @@ -0,0 +1,9 @@ +// autogenerated file + +package listener + +// iOtelRecorder ... +type iOtelRecorder interface { + // RecordLastBlock records the last block processed by the listener. + RecordLastBlock(lastBlock uint64) +} diff --git a/services/rfq/relayer/reldb/base/quote.go b/services/rfq/relayer/reldb/base/quote.go index c01556a939..8594f9320b 100644 --- a/services/rfq/relayer/reldb/base/quote.go +++ b/services/rfq/relayer/reldb/base/quote.go @@ -93,7 +93,7 @@ func (s Store) UpdateQuoteRequestStatus(ctx context.Context, id [32]byte, status if prevStatus == nil { req, err := s.GetQuoteRequestByID(ctx, id) if err != nil { - return fmt.Errorf("could not get quote: %w", err) + return fmt.Errorf("could not get quote: %w", reldb.ErrNoQuoteForID) } prevStatus = &req.Status } diff --git a/services/rfq/relayer/service/chainindexer.go b/services/rfq/relayer/service/chainindexer.go index b0aee18557..ef5c47c31c 100644 --- a/services/rfq/relayer/service/chainindexer.go +++ b/services/rfq/relayer/service/chainindexer.go @@ -2,7 +2,9 @@ package service import ( "context" + "errors" "fmt" + "strings" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -85,7 +87,7 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck - return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil) + return r.setRelayRaceLost(ctx, event.TransactionId) } err = r.handleRelayLog(ctx, event) @@ -99,7 +101,7 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck - return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil) + return r.setRelayRaceLost(ctx, event.TransactionId) } err = r.handleProofProvided(ctx, event) @@ -113,7 +115,7 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck - return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil) + return r.setRelayRaceLost(ctx, event.TransactionId) } err = r.handleDepositClaimed(ctx, event) @@ -212,3 +214,15 @@ func (r *Relayer) handleDepositClaimed(ctx context.Context, event *fastbridge.Fa } return nil } + +func (r *Relayer) setRelayRaceLost(ctx context.Context, transactionID [32]byte) error { + err := r.db.UpdateQuoteRequestStatus(ctx, transactionID, reldb.RelayRaceLost, nil) + // quote does not exist, no need to update status + if err != nil && (errors.Is(err, reldb.ErrNoQuoteForID) || strings.Contains(err.Error(), reldb.ErrNoQuoteForID.Error())) { + return nil + } + if err != nil { + return fmt.Errorf("could not set relay race lost: %w", err) + } + return nil +} diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index 22d0b3a91b..4a7ffa3569 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -387,13 +387,7 @@ func (r *Relayer) processDB(ctx context.Context, serial bool, matchStatuses ...r } else { // process in parallel (new goroutine) request := req // capture func literal - ok := r.semaphore.TryAcquire(1) - if !ok { - span.AddEvent("could not acquire semaphore", trace.WithAttributes( - attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), - )) - continue - } + err = r.semaphore.Acquire(ctx, 1) if err != nil { return fmt.Errorf("could not acquire semaphore: %w", err) }