Skip to content

Commit

Permalink
RFQ: add prometheus metrics (#2503)
Browse files Browse the repository at this point in the history
* Feat: add GetTokenMetadata() getter

* Feat: add prom metrics for quote generation

* Fix: mocks

* Feat: add registerBalance() call upon refreshing balances

* Refactor: HasPendingRebalance -> GetPendingRebalances

* Feat: add OriginTokenAddr to Rebalance models

* Feat: add registerPendingRebalance() helper

* Cleanup: lint

* [goreleaser]

* Cleanup: lint

* [goreleaser] wrap http

* Feat: re-use meter

* [goreleaser]

* [goreleaser]

* Fix: re-use hists

* [goreleaser]

* Fix: re-use hist for quoter

* [goreleaser]

* Fix: remove lock constraint from registerBalance()

* [goreleaser]

* Feat: use token name instead of addr as label

* [goreleaser]

* Feat: log raw_balance

* [goreleaser]

* Feat: add relayer to metrics

* [goreleaser]

* Fix: balance registration

* [goreleaser]

* Submitter: add prometheus metrics (#2505)

* Feat: add registerNumPendingTxes in submitter

* Feat: add registerBumpTx()

* Feat: add registerCurrentNonce()

* Cleanup: lint

* [goreleaser]

* Feat: use hists instead of callbacks

* [goreleaser]

* Feat: simplify bump metric

* [goreleaser]

* Feat: add wallet attribute

* Feat: remove global vars for metrics

* [goreleaser]

* [goreleaser]

* Cleanup: lint

* [goreleaser]

* Fix: tests

* [goreleaser]

* Cleanup: lint

---------

Co-authored-by: Trajan0x <[email protected]>
  • Loading branch information
dwasse and trajan0x authored Apr 30, 2024
1 parent 42da9ee commit bfd72d6
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 85 deletions.
69 changes: 69 additions & 0 deletions ethergo/submitter/chain_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/synapsecns/sanguine/ethergo/client"
"github.com/synapsecns/sanguine/ethergo/submitter/db"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -65,6 +66,10 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
return fmt.Errorf("could not get nonce: %w", err)
}
span.SetAttributes(attribute.Int("nonce", int(currentNonce)))
registerErr := t.registerCurrentNonce(ctx, currentNonce, int(chainID.Int64()))
if registerErr != nil {
span.AddEvent("could not register nonce", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}

g, gCtx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -110,6 +115,34 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *

cq.storeAndSubmit(ctx, calls, span)

registerErr = cq.registerNumPendingTXes(ctx, len(cq.reprocessQueue), int(chainID.Int64()))
if registerErr != nil {
span.AddEvent("could not register pending txes", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}

return nil
}

var meter metric.Meter

func getMeter(handler metrics.Handler) metric.Meter {
if meter == nil {
meter = handler.Meter(meterName)
}
return meter
}

func (t *txSubmitterImpl) registerCurrentNonce(ctx context.Context, nonce uint64, chainID int) (err error) {
meter := getMeter(t.metrics)
nonceHist, err := meter.Int64Histogram("current_nonce")
if err != nil {
return fmt.Errorf("error creating nonce histogram: %w", err)
}
attributes := attribute.NewSet(
attribute.Int(metrics.ChainID, chainID),
attribute.String("wallet", t.signer.Address().Hex()),
)
nonceHist.Record(ctx, int64(nonce), metric.WithAttributeSet(attributes))
return nil
}

Expand Down Expand Up @@ -148,6 +181,22 @@ func (c *chainQueue) storeAndSubmit(ctx context.Context, calls []w3types.Caller,
wg.Wait()
}

const meterName = "github.com/synapsecns/sanguine/ethergo/submitter"

func (c *chainQueue) registerNumPendingTXes(ctx context.Context, num, chainID int) (err error) {
meter := getMeter(c.metrics)
numPendingHist, err := meter.Int64Histogram("num_pending_txes")
if err != nil {
return fmt.Errorf("error creating num pending txes histogram: %w", err)
}
attributes := attribute.NewSet(
attribute.Int(metrics.ChainID, chainID),
attribute.String("wallet", c.signer.Address().Hex()),
)
numPendingHist.Record(ctx, int64(num), metric.WithAttributeSet(attributes))
return nil
}

// nolint: cyclop
func (c *chainQueue) bumpTX(parentCtx context.Context, ogTx db.TX) {
c.g.Go(func() (err error) {
Expand Down Expand Up @@ -225,6 +274,11 @@ func (c *chainQueue) bumpTX(parentCtx context.Context, ogTx db.TX) {
Status: db.Stored,
})

registerErr := c.registerBumpTx(ctx, tx)
if registerErr != nil {
span.AddEvent("could not register bump tx", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}

return nil
})
}
Expand All @@ -245,6 +299,21 @@ func (c *chainQueue) isBumpIntervalElapsed(tx db.TX) bool {
return elapsedSeconds >= 0
}

func (c *chainQueue) registerBumpTx(ctx context.Context, tx *types.Transaction) (err error) {
meter := getMeter(c.metrics)
bumpCountGauge, err := meter.Int64Counter("bump_count")
if err != nil {
return fmt.Errorf("error creating bump count gauge: %w", err)
}
attributes := attribute.NewSet(
attribute.Int64(metrics.ChainID, tx.ChainId().Int64()),
attribute.Int64(metrics.Nonce, int64(tx.Nonce())),
attribute.String("wallet", c.signer.Address().Hex()),
)
bumpCountGauge.Add(ctx, 1, metric.WithAttributeSet(attributes))
return nil
}

// updateOldTxStatuses updates the status of txes that are before the current nonce
// this will only run if we have txes that have confirmed.
func (c *chainQueue) updateOldTxStatuses(parentCtx context.Context) {
Expand Down
8 changes: 4 additions & 4 deletions services/rfq/e2e/rfq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ func (i *IntegrationSuite) TestUSDCtoUSDC() {

// check to see if there is a pending rebalance from the destination back to origin
// TODO: validate more of the rebalance- expose in db interface just for testing?
destPending, err := i.store.HasPendingRebalance(i.GetTestContext(), uint64(i.destBackend.GetChainID()))
destPendingRebals, err := i.store.GetPendingRebalances(i.GetTestContext(), uint64(i.destBackend.GetChainID()))
i.NoError(err)
if !destPending {
if len(destPendingRebals) == 0 {
return false
}
originPending, err := i.store.HasPendingRebalance(i.GetTestContext(), uint64(i.originBackend.GetChainID()))
originPendingRebals, err := i.store.GetPendingRebalances(i.GetTestContext(), uint64(i.originBackend.GetChainID()))
i.NoError(err)
return originPending
return len(originPendingRebals) > 0
})
}

Expand Down
9 changes: 5 additions & 4 deletions services/rfq/relayer/inventory/circle.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,11 @@ func (c *rebalanceManagerCircleCCTP) handleDepositForBurn(ctx context.Context, l
)
origin := uint64(chainID)
rebalanceModel := reldb.Rebalance{
RebalanceID: &requestID,
Origin: origin,
OriginTxHash: log.TxHash,
Status: reldb.RebalancePending,
RebalanceID: &requestID,
Origin: origin,
OriginTxHash: log.TxHash,
OriginTokenAddr: event.BurnToken,
Status: reldb.RebalancePending,
}
err = c.db.UpdateRebalance(ctx, rebalanceModel, true)
if err != nil {
Expand Down
130 changes: 103 additions & 27 deletions services/rfq/relayer/inventory/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Manager interface {
// Rebalance checks whether a given token should be rebalanced, and
// executes the rebalance if necessary.
Rebalance(ctx context.Context, chainID int, token common.Address) error
// GetTokenMetadata gets the metadata for a token.
GetTokenMetadata(chainID int, token common.Address) (*TokenMetadata, error)
}

type inventoryManagerImpl struct {
Expand All @@ -72,6 +74,12 @@ type inventoryManagerImpl struct {
rebalanceManagers map[relconfig.RebalanceMethod]RebalanceManager
// db is the database
db reldb.Service
// meter is the metrics meter for this package
meter metric.Meter
// balanceHist is the histogram for balance
balanceHist metric.Float64Histogram
// pendingHist is the histogram for pending rebalances
pendingHist metric.Float64Histogram
}

// GetCommittableBalance gets the committable balances.
Expand Down Expand Up @@ -160,6 +168,7 @@ var (

// TODO: replace w/ config.
const defaultPollPeriod = 5
const meterName = "github.com/synapsecns/sanguine/services/rfq/relayer/inventory"

// NewInventoryManager creates a new inventory manager.
// TODO: too many args here.
Expand All @@ -183,6 +192,16 @@ func NewInventoryManager(ctx context.Context, clientFetcher submitter.ClientFetc
}
}

meter := handler.Meter(meterName)
balanceHist, err := meter.Float64Histogram("inventory_balance")
if err != nil {
return nil, fmt.Errorf("could not create balance histogram: %w", err)
}
pendingHist, err := meter.Float64Histogram("pending_rebalance_amount")
if err != nil {
return nil, fmt.Errorf("could not create pending rebalance histogram: %w", err)
}

i := inventoryManagerImpl{
relayerAddress: relayer,
handler: handler,
Expand All @@ -191,6 +210,9 @@ func NewInventoryManager(ctx context.Context, clientFetcher submitter.ClientFetc
txSubmitter: txSubmitter,
rebalanceManagers: rebalanceManagers,
db: db,
meter: meter,
balanceHist: balanceHist,
pendingHist: pendingHist,
}

err = i.initializeTokens(ctx, cfg)
Expand Down Expand Up @@ -401,6 +423,8 @@ func (i *inventoryManagerImpl) HasSufficientGas(parentCtx context.Context, chain
// Rebalance checks whether a given token should be rebalanced, and executes the rebalance if necessary.
// Note that if there are multiple tokens whose balance is below the maintenance balance, only the lowest balance
// will be rebalanced.
//
//nolint:cyclop
func (i *inventoryManagerImpl) Rebalance(parentCtx context.Context, chainID int, token common.Address) error {
// evaluate the rebalance method
method, err := i.cfg.GetRebalanceMethod(chainID, token.Hex())
Expand Down Expand Up @@ -434,14 +458,21 @@ func (i *inventoryManagerImpl) Rebalance(parentCtx context.Context, chainID int,
)

// make sure there are no pending rebalances that touch the given path
pending, err := i.db.HasPendingRebalance(ctx, uint64(rebalance.OriginMetadata.ChainID), uint64(rebalance.DestMetadata.ChainID))
pendingRebalances, err := i.db.GetPendingRebalances(ctx, uint64(rebalance.OriginMetadata.ChainID), uint64(rebalance.DestMetadata.ChainID))
if err != nil {
return fmt.Errorf("could not check pending rebalance: %w", err)
}
pending := len(pendingRebalances) > 0
span.SetAttributes(attribute.Bool("rebalance_pending", pending))
if pending {
return nil
}
for _, pendingReb := range pendingRebalances {
registerErr := i.registerPendingRebalance(ctx, pendingReb)
if registerErr != nil {
span.AddEvent("could not register pending rebalance", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}
}

// execute the rebalance
manager, ok := i.rebalanceManagers[method]
Expand All @@ -455,6 +486,26 @@ func (i *inventoryManagerImpl) Rebalance(parentCtx context.Context, chainID int,
return nil
}

// registerPendingRebalance registers a callback to update the pending rebalance amount gauge.
func (i *inventoryManagerImpl) registerPendingRebalance(ctx context.Context, rebalance *reldb.Rebalance) (err error) {
if rebalance == nil || i.meter == nil || i.pendingHist == nil {
return nil
}

attributes := attribute.NewSet(
attribute.Int(metrics.Origin, int(rebalance.Origin)),
attribute.Int(metrics.Destination, int(rebalance.Destination)),
attribute.String("status", rebalance.Status.String()),
attribute.String("relayer", i.relayerAddress.Hex()),
)
tokenMetadata, err := i.GetTokenMetadata(int(rebalance.Origin), rebalance.OriginTokenAddr)
if err != nil {
return fmt.Errorf("could not get token metadata: %w", err)
}
i.pendingHist.Record(ctx, core.BigToDecimals(rebalance.OriginAmount, tokenMetadata.Decimals), metric.WithAttributeSet(attributes))
return nil
}

//nolint:cyclop,gocognit
func getRebalance(span trace.Span, cfg relconfig.Config, tokens map[int]map[common.Address]*TokenMetadata, chainID int, token common.Address) (rebalance *RebalanceData, err error) {
maintenancePct, err := cfg.GetMaintenanceBalancePct(chainID, token.Hex())
Expand Down Expand Up @@ -555,6 +606,16 @@ func getRebalance(span trace.Span, cfg relconfig.Config, tokens map[int]map[comm
return rebalance, nil
}

func (i *inventoryManagerImpl) GetTokenMetadata(chainID int, token common.Address) (*TokenMetadata, error) {
i.mux.RLock()
defer i.mux.RUnlock()
tokenData, ok := i.tokens[chainID][token]
if !ok {
return nil, fmt.Errorf("token not found")
}
return tokenData, nil
}

// initializeTokens converts the configuration into a data structure we can use to determine inventory
// it gets metadata like name, decimals, etc once and exports these to prometheus for ease of debugging.
func (i *inventoryManagerImpl) initializeTokens(parentCtx context.Context, cfg relconfig.Config) (err error) {
Expand All @@ -569,8 +630,6 @@ func (i *inventoryManagerImpl) initializeTokens(parentCtx context.Context, cfg r
metrics.EndSpanWithErr(span, err)
}(err)

meter := i.handler.Meter("github.com/synapsecns/sanguine/services/rfq/relayer/inventory")

// TODO: this needs to be a struct bound variable otherwise will be stuck.
i.tokens = make(map[int]map[common.Address]*TokenMetadata)
i.gasBalances = make(map[int]*big.Int)
Expand Down Expand Up @@ -650,8 +709,8 @@ func (i *inventoryManagerImpl) initializeTokens(parentCtx context.Context, cfg r

chainID := chainID // capture func literal
deferredRegisters = append(deferredRegisters, func() error {
//nolint:wrapcheck
return i.registerMetric(meter, chainID, token)
//nolint:wrapcheck,scopelint
return i.registerBalance(ctx, chainID, token)
})
}
}
Expand Down Expand Up @@ -705,6 +764,11 @@ func (i *inventoryManagerImpl) refreshBalances(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(len(i.tokens))

type registerCall func() error
// TODO: this can be pre-capped w/ len(cfg.Tokens) for each chain id.
// here we register metrics for exporting through otel. We wait to call these functions until are tokens have been initialized to avoid nil issues.
var deferredRegisters []registerCall

for chainID, tokenMap := range i.tokens {
chainClient, err := i.chainClient.GetClient(ctx, big.NewInt(int64(chainID)))
if err != nil {
Expand All @@ -715,12 +779,20 @@ func (i *inventoryManagerImpl) refreshBalances(ctx context.Context) error {
deferredCalls := []w3types.Caller{
eth.Balance(i.relayerAddress, nil).Returns(i.gasBalances[chainID]),
}
deferredRegisters = append(deferredRegisters, func() error {
//nolint:wrapcheck,scopelint
return i.registerBalance(ctx, chainID, chain.EthAddress)
})

// queue token balance fetches
for tokenAddress, token := range tokenMap {
// TODO: make sure Returns does nothing on error
if !token.IsGasToken {
deferredCalls = append(deferredCalls, eth.CallFunc(funcBalanceOf, tokenAddress, i.relayerAddress).Returns(token.Balance))
deferredRegisters = append(deferredRegisters, func() error {
//nolint:wrapcheck,scopelint
return i.registerBalance(ctx, chainID, tokenAddress)
})
}
}

Expand All @@ -734,35 +806,39 @@ func (i *inventoryManagerImpl) refreshBalances(ctx context.Context) error {
}()
}
wg.Wait()
return nil
}

func (i *inventoryManagerImpl) registerMetric(meter metric.Meter, chainID int, token common.Address) error {
balanceGauge, err := meter.Float64ObservableGauge("inventory_balance")
if err != nil {
return fmt.Errorf("could not create gauge: %w", err)
for _, register := range deferredRegisters {
err := register()
if err != nil {
logger.Warnf("could not register func: %v", err)
}
}

if _, err := meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error {
i.mux.RLock()
defer i.mux.RUnlock()
return nil
}

// TODO: make sure this doesn't get called until we're done
tokenData, ok := i.tokens[chainID][token]
if !ok {
return fmt.Errorf("could not find token in chainTokens for chainID: %d, token: %s", chainID, token)
}
func (i *inventoryManagerImpl) registerBalance(ctx context.Context, chainID int, token common.Address) (err error) {
if i.meter == nil || i.balanceHist == nil {
return nil
}

attributes := attribute.NewSet(attribute.Int(metrics.ChainID, chainID), attribute.String("relayer_address", i.relayerAddress.String()),
attribute.String("token_name", tokenData.Name), attribute.Int("decimals", int(tokenData.Decimals)),
attribute.String("token_address", token.String()))
// TODO: make sure this doesn't get called until we're done
tokenData, ok := i.tokens[chainID][token]
if !ok {
return fmt.Errorf("could not find token in chainTokens for chainID: %d, token: %s", chainID, token)
}

observer.ObserveFloat64(balanceGauge, core.BigToDecimals(tokenData.Balance, tokenData.Decimals), metric.WithAttributeSet(attributes))
attributes := attribute.NewSet(
attribute.Int(metrics.ChainID, chainID),
attribute.String("relayer_address", i.relayerAddress.String()),
attribute.String("token_name", tokenData.Name),
attribute.Int("decimals", int(tokenData.Decimals)),
attribute.String("token_address", token.String()),
attribute.String("raw_balance", tokenData.Balance.String()),
attribute.String("relayer", i.relayerAddress.Hex()),
)

return nil
}, balanceGauge); err != nil {
return fmt.Errorf("could not register callback: %w", err)
}
i.balanceHist.Record(ctx, core.BigToDecimals(tokenData.Balance, tokenData.Decimals), metric.WithAttributeSet(attributes))
return nil
}

Expand Down
Loading

0 comments on commit bfd72d6

Please sign in to comment.