From e5cd90821a3f7e17eeb60f115128ade9d5165aa0 Mon Sep 17 00:00:00 2001 From: vro <168573323+golangisfun123@users.noreply.github.com> Date: Sun, 28 Jul 2024 11:25:17 -0500 Subject: [PATCH] refactor(promexporter): use `otelRecorder` observer pattern (#2871) --- contrib/promexporter/config.yml | 0 contrib/promexporter/exporters/bridge.go | 269 +++------------ contrib/promexporter/exporters/dfk.go | 38 +++ contrib/promexporter/exporters/exporter.go | 189 +++-------- contrib/promexporter/exporters/otel.go | 312 ++++++++++++++++++ .../promexporter/exporters/otel_generated.go | 16 + contrib/promexporter/exporters/submitter.go | 58 ++++ contrib/promexporter/exporters/util.go | 135 ++++++++ contrib/promexporter/go.mod | 3 +- .../internal/decoders/gettokenbyid.go | 5 +- 10 files changed, 655 insertions(+), 370 deletions(-) create mode 100644 contrib/promexporter/config.yml create mode 100644 contrib/promexporter/exporters/dfk.go create mode 100644 contrib/promexporter/exporters/otel.go create mode 100644 contrib/promexporter/exporters/otel_generated.go create mode 100644 contrib/promexporter/exporters/submitter.go create mode 100644 contrib/promexporter/exporters/util.go diff --git a/contrib/promexporter/config.yml b/contrib/promexporter/config.yml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contrib/promexporter/exporters/bridge.go b/contrib/promexporter/exporters/bridge.go index 36ad5572ee..3fd4e0cc61 100644 --- a/contrib/promexporter/exporters/bridge.go +++ b/contrib/promexporter/exporters/bridge.go @@ -4,26 +4,22 @@ import ( "context" "errors" "fmt" + "math/big" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" "github.com/lmittmann/w3/module/eth" "github.com/lmittmann/w3/w3types" "github.com/synapsecns/sanguine/contrib/promexporter/internal/decoders" "github.com/synapsecns/sanguine/core" - "github.com/synapsecns/sanguine/core/metrics" - ethergoClient "github.com/synapsecns/sanguine/ethergo/client" "github.com/synapsecns/sanguine/services/explorer/contracts/bridge" "github.com/synapsecns/sanguine/services/explorer/contracts/bridgeconfig" "github.com/synapsecns/sanguine/services/explorer/contracts/swap" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - "golang.org/x/sync/errgroup" - "math/big" - "time" ) func (e *exporter) getBridgeConfig(ctx context.Context) (*bridgeconfig.BridgeConfigRef, error) { + // client, err := e.omnirpcClient.GetClient(ctx, big.NewInt(int64(e.cfg.BridgeConfig.ChainID))) client, err := e.omnirpcClient.GetConfirmationsClient(ctx, e.cfg.BridgeConfig.ChainID, 1) if err != nil { return nil, fmt.Errorf("could not get confirmations client: %w", err) @@ -40,14 +36,6 @@ func (e *exporter) getBridgeConfig(ctx context.Context) (*bridgeconfig.BridgeCon // Will be a lot faster w/: https://github.com/open-telemetry/opentelemetry-go/issues/3034 // nolint: cyclop func (e *exporter) vpriceStats(ctx context.Context, chainID int, tokenID string) error { - meter := e.metrics.Meter(meterName) - vpriceMetric, err := meter.Float64ObservableGauge("vpriceMetric") - if err != nil { - return fmt.Errorf("could not create gauge: %w", err) - } - - attributes := attribute.NewSet(attribute.Int(metrics.ChainID, chainID), attribute.String("tokenID", tokenID)) - client, err := e.omnirpcClient.GetConfirmationsClient(ctx, chainID, 1) if err != nil { return fmt.Errorf("could not get confirmations client: %w", err) @@ -63,7 +51,11 @@ func (e *exporter) vpriceStats(ctx context.Context, chainID int, tokenID string) return fmt.Errorf("could not get token: %w", err) } - poolConfig, err := bridgeConfig.GetPoolConfig(&bind.CallOpts{Context: ctx}, common.HexToAddress(token.TokenAddress), big.NewInt(int64(chainID))) + poolConfig, err := bridgeConfig.GetPoolConfig( + &bind.CallOpts{Context: ctx}, + common.HexToAddress(token.TokenAddress), + big.NewInt(int64(chainID)), + ) if err != nil { return errPoolNotExist } @@ -77,7 +69,6 @@ func (e *exporter) vpriceStats(ctx context.Context, chainID int, tokenID string) if err != nil { return fmt.Errorf("could not get tokenID contract: %w", err) } - decimals, err := tokenContract.Decimals(&bind.CallOpts{Context: ctx}) if err != nil { return fmt.Errorf("could not get decimals: %w", err) @@ -88,248 +79,64 @@ func (e *exporter) vpriceStats(ctx context.Context, chainID int, tokenID string) return fmt.Errorf("could not get iswap contract: %w", err) } - if _, err := meter.RegisterCallback(func(parentCtx context.Context, o metric.Observer) (err error) { - ctx, span := e.metrics.Tracer().Start(parentCtx, "vprice_stats", trace.WithAttributes( - attribute.Int(metrics.ChainID, chainID), attribute.String("tokenID", tokenID), - )) - - defer func() { - metrics.EndSpanWithErr(span, err) - }() - - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - - realvPrice, err := iswap.GetVirtualPrice(&bind.CallOpts{Context: ctx}) - if err != nil { - return fmt.Errorf("could not get virtual price: %w", err) - } - - // Use floatVPrice as required - o.ObserveFloat64(vpriceMetric, core.BigToDecimals(realvPrice, decimals), metric.WithAttributeSet(attributes)) - - return nil - }, vpriceMetric); err != nil { - return fmt.Errorf("registering callback on instruments: %w", err) + realvPrice, err := iswap.GetVirtualPrice(&bind.CallOpts{Context: ctx}) + if err != nil { + return fmt.Errorf("could not get virtual price: %w", err) } + e.otelRecorder.RecordVPrice(chainID, core.BigToDecimals(realvPrice, decimals)) + return nil } var errPoolNotExist = errors.New("pool does not exist") // nolint: cyclop -func (e *exporter) getTokenBalances(ctx context.Context) error { +func (e *exporter) getTokenBalancesStats(ctx context.Context) error { allTokens, err := e.getAllTokens(ctx) + if err != nil { return fmt.Errorf("could not get all tokens: %w", err) } for chainID, bridgeContract := range e.cfg.BridgeChecks { - chainID := chainID - bridgeContract := bridgeContract // capture func literals - meter := e.metrics.Meter(meterName) - - bridgeBalanceMetric, err := meter.Float64ObservableGauge("bridgeBalanceMetric") - if err != nil { - return fmt.Errorf("could not create gauge: %w", err) - } - - feeBalanceMetric, err := meter.Float64ObservableCounter("feeBalance") - if err != nil { - return fmt.Errorf("could not create counter: %w", err) - } - - totalSupplyMetric, err := meter.Float64ObservableGauge("totalSupply") + client, err := e.omnirpcClient.GetConfirmationsClient(ctx, chainID, 1) if err != nil { - return fmt.Errorf("could not create gauge: %w", err) + return fmt.Errorf("could not get confirmations client: %w", err) } - gasBalanceMetric, err := meter.Float64ObservableGauge("gasBalance") - if err != nil { - return fmt.Errorf("could not create gauge: %w", err) + var realGasBalance big.Int + calls := []w3types.Caller{ + eth.Balance(common.HexToAddress(bridgeContract), nil).Returns(&realGasBalance), } - if _, err := meter.RegisterCallback(func(parentCtx context.Context, o metric.Observer) (err error) { - ctx, span := e.metrics.Tracer().Start(parentCtx, "tokenbalances", trace.WithAttributes( - attribute.Int(metrics.ChainID, chainID), - )) - - defer func() { - metrics.EndSpanWithErr(span, err) - }() + allTokenData := make([]tokenData, len(allTokens.GetForChainID(chainID))) - client, err := e.omnirpcClient.GetConfirmationsClient(ctx, chainID, 1) - if err != nil { - return fmt.Errorf("could not get confirmations client: %w", err) + for i, tokenConfig := range allTokens.GetForChainID(chainID) { + // initialize empty struct + allTokenData[i] = tokenData{ + metadata: tokenConfig, + contractBalance: new(big.Int), + totalSuppply: new(big.Int), + feeBalance: new(big.Int), } - var realGasBalance big.Int - calls := []w3types.Caller{ - eth.Balance(common.HexToAddress(bridgeContract), nil).Returns(&realGasBalance), - } - - type tokenData struct { - metadata TokenConfig - contractBalance *big.Int - totalSuppply *big.Int - feeBalance *big.Int - } - - allTokenData := make([]tokenData, len(allTokens.GetForChainID(chainID))) - - for i, tokenConfig := range allTokens.GetForChainID(chainID) { - // initialize empty struct - allTokenData[i] = tokenData{ - metadata: tokenConfig, - contractBalance: new(big.Int), - totalSuppply: new(big.Int), - feeBalance: new(big.Int), - } + calls = append(calls, + eth.CallFunc(decoders.FuncBalanceOf(), tokenConfig.TokenAddress, common.HexToAddress(bridgeContract)).Returns(allTokenData[i].contractBalance), + eth.CallFunc(decoders.FuncTotalSupply(), tokenConfig.TokenAddress).Returns(allTokenData[i].totalSuppply), + eth.CallFunc(decoders.FuncFeeBalance(), common.HexToAddress(bridgeContract), tokenConfig.TokenAddress).Returns(allTokenData[i].feeBalance), + ) - calls = append(calls, - eth.CallFunc(decoders.FuncBalanceOf(), tokenConfig.TokenAddress, common.HexToAddress(bridgeContract)).Returns(allTokenData[i].contractBalance), - eth.CallFunc(decoders.FuncTotalSupply(), tokenConfig.TokenAddress).Returns(allTokenData[i].totalSuppply), - eth.CallFunc(decoders.FuncFeeBalance(), common.HexToAddress(bridgeContract), tokenConfig.TokenAddress).Returns(allTokenData[i].feeBalance), - ) - } - - err = e.batchCalls(ctx, client, calls) - if err != nil { - return fmt.Errorf("could not get token balances: %w", err) - } - - // eth is always 18 decimals - o.ObserveFloat64(gasBalanceMetric, core.BigToDecimals(&realGasBalance, 18), metric.WithAttributes(attribute.Int(metrics.ChainID, chainID))) - - for _, td := range allTokenData { - tokenAttributes := attribute.NewSet(attribute.String("tokenID", td.metadata.TokenID), attribute.Int(metrics.ChainID, td.metadata.ChainID)) - o.ObserveFloat64(bridgeBalanceMetric, core.BigToDecimals(td.contractBalance, td.metadata.TokenDecimals), metric.WithAttributeSet(tokenAttributes)) - o.ObserveFloat64(feeBalanceMetric, core.BigToDecimals(td.feeBalance, td.metadata.TokenDecimals), metric.WithAttributeSet(tokenAttributes)) - o.ObserveFloat64(totalSupplyMetric, core.BigToDecimals(td.totalSuppply, td.metadata.TokenDecimals), metric.WithAttributeSet(tokenAttributes)) - } - - return nil - }, bridgeBalanceMetric, feeBalanceMetric, totalSupplyMetric, gasBalanceMetric); err != nil { - return fmt.Errorf("could not register") } - } + _ = e.batchCalls(ctx, client, calls) - return nil -} + e.otelRecorder.RecordBridgeGasBalance(chainID, core.BigToDecimals(&realGasBalance, 18)*params.Ether) -func (e *exporter) getAllTokens(parentCtx context.Context) (allTokens Tokens, err error) { - allTokens = []TokenConfig{} - - ctx, span := e.metrics.Tracer().Start(parentCtx, "get_all_tokens") - - defer func() { - metrics.EndSpanWithErr(span, err) - }() - - bridgeConfig, err := e.getBridgeConfig(ctx) - if err != nil { - return nil, fmt.Errorf("could not get bridge config: %w", err) - } - - // TODO: multicall is preferable here, but I ain't got time for that - tokenIDs, err := bridgeConfig.GetAllTokenIDs(&bind.CallOpts{Context: ctx}) - if err != nil { - return nil, fmt.Errorf("could not get all token ids: %w", err) - } - - bridgeConfigClient, err := e.omnirpcClient.GetConfirmationsClient(ctx, e.cfg.BridgeConfig.ChainID, 1) - if err != nil { - return nil, fmt.Errorf("could not get confirmations client: %w", err) - } - - bridgeTokens := make([]*bridgeconfig.BridgeConfigV3Token, len(tokenIDs)*len(e.cfg.BridgeChecks)) - tokenIDS := make([]string, len(tokenIDs)*len(e.cfg.BridgeChecks)) - - var calls []w3types.Caller - - i := 0 - for _, tokenID := range tokenIDs { - for chainID := range e.cfg.BridgeChecks { - token := &bridgeconfig.BridgeConfigV3Token{} - calls = append(calls, eth.CallFunc(decoders.TokenConfigGetToken(), bridgeConfig.Address(), tokenID, big.NewInt(int64(chainID))).Returns(token)) - bridgeTokens[i] = token - tokenIDS[i] = tokenID - i++ + for _, td := range allTokenData { + e.otelRecorder.RecordTokenBalance(chainID, td) } - } - // TODO: once go 1.21 is introduced do min(cfg.BatchCallLimit, 2) - err = e.batchCalls(ctx, bridgeConfigClient, calls) - if err != nil { - return nil, fmt.Errorf("could not get token balances: %w", err) - } - - for i, token := range bridgeTokens { - tokenID := tokenIDS[i] - - if token.TokenAddress == "" { - continue - } - - allTokens = append(allTokens, TokenConfig{ - TokenID: tokenID, - ChainID: int(token.ChainId.Int64()), - TokenAddress: common.HexToAddress(token.TokenAddress), - TokenDecimals: token.TokenDecimals, - HasUnderlying: token.HasUnderlying, - IsUnderlying: token.IsUnderlying, - }) - } - - return allTokens, nil -} - -func (e *exporter) batchCalls(ctx context.Context, evmClient ethergoClient.EVM, calls []w3types.Caller) (err error) { - tasks := core.ChunkSlice(calls, e.cfg.BatchCallLimit) - - g, ctx := errgroup.WithContext(ctx) - for _, task := range tasks { - task := task // capture func literal - g.Go(func() error { - err = evmClient.BatchWithContext(ctx, task...) - if err != nil { - return fmt.Errorf("could not batch calls: %w", err) - } - - return nil - }) - } - - err = g.Wait() - if err != nil { - return fmt.Errorf("could not get token balances: %w", err) } return nil } - -// Tokens is a list of token configs. -type Tokens []TokenConfig - -// GetForChainID returns all tokens for a given chainID. -func (t Tokens) GetForChainID(chainID int) Tokens { - var chainTokens []TokenConfig - for _, token := range t { - if token.ChainID == chainID { - chainTokens = append(chainTokens, token) - } - } - - return chainTokens -} - -// TokenConfig is a cleaned up token config. -type TokenConfig struct { - TokenID string - ChainID int - TokenAddress common.Address - TokenDecimals uint8 - HasUnderlying bool - IsUnderlying bool -} diff --git a/contrib/promexporter/exporters/dfk.go b/contrib/promexporter/exporters/dfk.go new file mode 100644 index 0000000000..8345e96a01 --- /dev/null +++ b/contrib/promexporter/exporters/dfk.go @@ -0,0 +1,38 @@ +package exporters + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/synapsecns/sanguine/contrib/promexporter/internal/gql/dfk" + "github.com/synapsecns/sanguine/core" + "github.com/synapsecns/sanguine/core/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (e *exporter) stuckHeroCountStats(parentCtx context.Context, owner common.Address, chainName string) (err error) { + ctx, span := e.metrics.Tracer().Start(parentCtx, "dfk_stats", trace.WithAttributes( + attribute.String("chain_name", chainName), + )) + + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + dfkClient := dfk.NewClient(e.client, e.cfg.DFKUrl) + + stuckHeroes, err := dfkClient.StuckHeroes(ctx, core.PtrTo[int64](0), core.PtrTo(owner.String())) + if err != nil { + return fmt.Errorf("could not get stuck hero count: %w", err) + } + + e.otelRecorder.RecordStuckHeroCount(int64(len(stuckHeroes.Heroes)), chainName) + + return nil +} diff --git a/contrib/promexporter/exporters/exporter.go b/contrib/promexporter/exporters/exporter.go index 5255d3da27..01010bc52e 100644 --- a/contrib/promexporter/exporters/exporter.go +++ b/contrib/promexporter/exporters/exporter.go @@ -5,28 +5,21 @@ import ( "context" "errors" "fmt" + "net" + "net/http" + "os" + "time" + "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/params" "github.com/gin-gonic/gin" "github.com/ipfs/go-log" - "github.com/lmittmann/w3/module/eth" "github.com/synapsecns/sanguine/contrib/promexporter/config" - "github.com/synapsecns/sanguine/contrib/promexporter/internal/gql/dfk" - "github.com/synapsecns/sanguine/core" "github.com/synapsecns/sanguine/core/ginhelper" "github.com/synapsecns/sanguine/core/metrics" "github.com/synapsecns/sanguine/core/metrics/instrumentation/httpcapture" "github.com/synapsecns/sanguine/core/retry" omnirpcClient "github.com/synapsecns/sanguine/services/omnirpc/client" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" - "math/big" - "net" - "net/http" - "os" - "time" ) var logger = log.Logger("proxy-logger") @@ -51,6 +44,8 @@ type exporter struct { metrics metrics.Handler cfg config.Config omnirpcClient omnirpcClient.RPCClient + + otelRecorder iOtelRecorder } // StartExporterServer starts the exporter server. @@ -88,52 +83,16 @@ func StartExporterServer(ctx context.Context, handler metrics.Handler, cfg confi metrics: handler, cfg: cfg, omnirpcClient: omnirpcClient.NewOmnirpcClient(cfg.OmnirpcURL, handler, omnirpcClient.WithCaptureReqRes()), + otelRecorder: newOtelRecorder(handler), } - // register dfk metrics - for _, pending := range cfg.DFKPending { - // heroes on both chains - err = exp.stuckHeroCount(common.HexToAddress(pending.Owner), pending.ChainName) + g.Go(func() error { + err := exp.recordMetrics(ctx) if err != nil { - return fmt.Errorf("could setup metric: %w", err) - } - } - - // register gas check metrics - for _, gasCheck := range cfg.SubmitterChecks { - for _, chainID := range gasCheck.ChainIDs { - err := exp.submitterStats(common.HexToAddress(gasCheck.Address), chainID, gasCheck.Name) - if err != nil { - return fmt.Errorf("could setup metric: %w", err) - } - } - } - - for chainID := range cfg.BridgeChecks { - for _, token := range cfg.VpriceCheckTokens { - chainID := chainID - token := token // capture func literals - - g.Go(func() error { - //nolint: wrapcheck - return retry.WithBackoff(ctx, func(ctx context.Context) error { - err = exp.vpriceStats(ctx, chainID, token) - if errors.Is(err, errPoolNotExist) { - return nil - } - - if err != nil { - return fmt.Errorf("error starting vprice:%w", err) - } - - return nil - }, retry.WithMaxAttempts(-1), retry.WithMaxAttemptTime(time.Second*10), retry.WithMaxTotalTime(-1)) - }) + return fmt.Errorf("could not record metrics: %w", err) } - } - g.Go(func() error { - return exp.getTokenBalances(ctx) + return nil }) if err := g.Wait(); err != nil { @@ -143,101 +102,61 @@ func StartExporterServer(ctx context.Context, handler metrics.Handler, cfg confi return nil } -func (e *exporter) stuckHeroCount(owner common.Address, chainName string) error { - meter := e.metrics.Meter(meterName) - attributes := attribute.NewSet(attribute.String("chain_name", chainName)) - - stuckCount, err := meter.Int64ObservableGauge(stuckHeroMetric) - if err != nil { - return fmt.Errorf("could not create gauge: %w", err) - } - - if _, err := meter.RegisterCallback(func(parentCtx context.Context, o metric.Observer) (err error) { - ctx, span := e.metrics.Tracer().Start(parentCtx, "dfk_stats", trace.WithAttributes( - attribute.String("chain_name", chainName), - )) - - defer func() { - metrics.EndSpanWithErr(span, err) - }() - - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - - dfkClient := dfk.NewClient(e.client, e.cfg.DFKUrl) +const defaultMetricsInterval = 10 - stuckHeroes, err := dfkClient.StuckHeroes(ctx, core.PtrTo[int64](0), core.PtrTo(owner.String())) - if err != nil { - return fmt.Errorf("could not get stuck hero count: %w", err) +func (e *exporter) recordMetrics(ctx context.Context) (err error) { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("could not record metrics: %w", ctx.Err()) + case <-time.After(defaultMetricsInterval * time.Second): + err = e.collectMetrics(ctx) + if err != nil { + logger.Errorf("could not collect metrics: %v", err) + } } - - // TODO: this maxes out at 100 now. Need binary search or something. - o.ObserveInt64(stuckCount, int64(len(stuckHeroes.Heroes)), metric.WithAttributeSet(attributes)) - - return nil - }, stuckCount); err != nil { - return fmt.Errorf("registering callback on instruments: %w", err) } - - return nil } -const stuckHeroMetric = "dfk_pending_heroes" -const gasBalance = "gas_balance" -const nonce = "nonce" - -// note: this kind of check should be deprecated in favor of submitter metrics once everything has been moved over. -func (e *exporter) submitterStats(address common.Address, chainID int, name string) error { - meter := e.metrics.Meter(fmt.Sprintf("%s_%d", meterName, chainID)) - - balanceGauge, err := meter.Float64ObservableGauge(gasBalance) - if err != nil { - return fmt.Errorf("could not create gauge: %w", err) - } - - nonceGauge, err := meter.Int64ObservableGauge(nonce) - if err != nil { - return fmt.Errorf("could not create gauge: %w", err) +// nolint: cyclop +func (e *exporter) collectMetrics(ctx context.Context) error { + var errs []error + if err := e.getTokenBalancesStats(ctx); err != nil { + errs = append(errs, fmt.Errorf("could not get token balances: %w", err)) } - attributes := attribute.NewSet(attribute.Int(metrics.ChainID, chainID), attribute.String(metrics.EOAAddress, address.String()), attribute.String("name", name)) + // TODO: parallelize - if _, err := meter.RegisterCallback(func(parentCtx context.Context, o metric.Observer) (err error) { - ctx, span := e.metrics.Tracer().Start(parentCtx, "relayer_stats", trace.WithAttributes( - attribute.Int(metrics.ChainID, chainID), - attribute.String(metrics.EOAAddress, address.String()), - )) - - defer func() { - metrics.EndSpanWithErr(span, err) - }() - - client, err := e.omnirpcClient.GetConfirmationsClient(ctx, chainID, 1) - if err != nil { - return fmt.Errorf("could not get confirmations client: %w", err) + for _, pending := range e.cfg.DFKPending { + if err := e.stuckHeroCountStats(ctx, common.HexToAddress(pending.Owner), pending.ChainName); err != nil { + errs = append(errs, fmt.Errorf("could not get stuck hero count: %w", err)) } + } - var nonce uint64 - var balance big.Int - - err = client.BatchWithContext(ctx, - eth.Nonce(address, nil).Returns(&nonce), - eth.Balance(address, nil).Returns(&balance), - ) - - if err != nil { - return fmt.Errorf("could not get balance: %w", err) + for _, gasCheck := range e.cfg.SubmitterChecks { + for _, chainID := range gasCheck.ChainIDs { + if err := e.submitterStats(common.HexToAddress(gasCheck.Address), chainID, gasCheck.Name); err != nil { + errs = append(errs, fmt.Errorf("could setup metric: %w", err)) + } } + } - ethBalance := new(big.Float).Quo(new(big.Float).SetInt(&balance), new(big.Float).SetInt64(params.Ether)) - truncEthBalance, _ := ethBalance.Float64() - - o.ObserveFloat64(balanceGauge, truncEthBalance, metric.WithAttributeSet(attributes)) - o.ObserveInt64(nonceGauge, int64(nonce), metric.WithAttributeSet(attributes)) + for chainID := range e.cfg.BridgeChecks { + for _, token := range e.cfg.VpriceCheckTokens { + //nolint: wrapcheck + return retry.WithBackoff(ctx, func(ctx context.Context) error { + err := e.vpriceStats(ctx, chainID, token) + if err != nil && !errors.Is(err, errPoolNotExist) { + errs = append(errs, fmt.Errorf("stuck hero stats: %w", err)) + } + + return nil + }, retry.WithMaxAttempts(-1), retry.WithMaxAttemptTime(time.Second*10), retry.WithMaxTotalTime(-1)) + } + } - return nil - }, balanceGauge, nonceGauge); err != nil { - return fmt.Errorf("registering callback on instruments: %w", err) + if len(errs) > 0 { + return fmt.Errorf("could not collect metrics: %v", errs) } return nil diff --git a/contrib/promexporter/exporters/otel.go b/contrib/promexporter/exporters/otel.go new file mode 100644 index 0000000000..30df77544e --- /dev/null +++ b/contrib/promexporter/exporters/otel.go @@ -0,0 +1,312 @@ +package exporters + +import ( + "context" + "math/big" + + "github.com/cornelk/hashmap" + "github.com/ethereum/go-ethereum/common" + "github.com/hedzr/log" + "github.com/synapsecns/sanguine/core" + "github.com/synapsecns/sanguine/core/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type submitterMetadata struct { + address common.Address + name string + nonce int64 + balance float64 +} + +//go:generate go run github.com/vburenin/ifacemaker -f otel.go -s otelRecorder -i iOtelRecorder -p exporters -o otel_generated.go -c "autogenerated file" +type otelRecorder struct { + metrics metrics.Handler + meter metric.Meter + + // VPRICE + vPrice *hashmap.Map[int, float64] + vpriceGauge metric.Float64ObservableGauge + + // BRIDGE + // chainID -> []tokenData + td *hashmap.Map[int, []tokenData] + // How much gas is left on the bridge. + gasBalance *hashmap.Map[int, float64] + // tokenData field gauges + gasBalanceGauge metric.Float64ObservableGauge + bridgeBalanceGauge metric.Float64ObservableGauge + feeBalanceGauge metric.Float64ObservableGauge + totalSupplyGauge metric.Float64ObservableGauge + + // dfk stats + stuckHeroes *hashmap.Map[string, int64] + stuckHeroesGauge metric.Int64ObservableGauge + + // submitter stats + submitters *hashmap.Map[int, []submitterMetadata] + balanceGauge metric.Float64ObservableGauge + nonceGauge metric.Int64ObservableGauge +} + +// TODO: unexport all methods. +// nolint: cyclop +func newOtelRecorder(meterHandler metrics.Handler) iOtelRecorder { + otr := otelRecorder{ + metrics: meterHandler, + meter: meterHandler.Meter(meterName), + stuckHeroes: hashmap.New[string, int64](), + vPrice: hashmap.New[int, float64](), + gasBalance: hashmap.New[int, float64](), + td: hashmap.New[int, []tokenData](), + submitters: hashmap.New[int, []submitterMetadata](), + } + + var err error + if otr.vpriceGauge, err = otr.meter.Float64ObservableGauge("vpriceMetric"); err != nil { + log.Warnf("failed to create vprice gauge: %v", err) + } + + if otr.bridgeBalanceGauge, err = otr.meter.Float64ObservableGauge("bridgeBalanceMetric"); err != nil { + log.Warnf("failed to create bridgeBalance gauge: %v", err) + } + + if otr.feeBalanceGauge, err = otr.meter.Float64ObservableGauge("feeBalance_total"); err != nil { + log.Warnf("failed to create feeBalance gauge: %v", err) + } + + if otr.totalSupplyGauge, err = otr.meter.Float64ObservableGauge("totalSupply"); err != nil { + log.Warnf("failed to create totalSupply gauge: %v", err) + } + + if otr.gasBalanceGauge, err = otr.meter.Float64ObservableGauge("gasBalance"); err != nil { + log.Warnf("failed to create gasBalance gauge: %v", err) + } + + if otr.balanceGauge, err = otr.meter.Float64ObservableGauge("gas_balance"); err != nil { + log.Warnf("failed to create balance gauge: %v", err) + } + + if otr.nonceGauge, err = otr.meter.Int64ObservableGauge("nonce"); err != nil { + log.Warnf("failed to create nonce gauge: %v", err) + } + + if otr.stuckHeroesGauge, err = otr.meter.Int64ObservableGauge("dfk_pending_heroes"); err != nil { + log.Warnf("failed to create stuckHeroes gauge: %v", err) + } + + // Register VPrice callback + if _, err = otr.meter.RegisterCallback(otr.recordVpriceGauge, otr.vpriceGauge); err != nil { + log.Warnf("failed to register callback for vprice metrics: %v", err) + } + + // Register DFK Stuck Heroes Callback + if _, err = otr.meter.RegisterCallback(otr.recordStuckHeroCount, otr.stuckHeroesGauge); err != nil { + log.Warnf("failed to register callback for dfk stuck heroes metrics: %v", err) + } + + // Register Token Balance Callback + if _, err = otr.meter.RegisterCallback( + otr.recordTokenBalance, + otr.bridgeBalanceGauge, + otr.feeBalanceGauge, + otr.totalSupplyGauge, + ); err != nil { + log.Warnf("failed to register callback for bridge metrics : %v", err) + } + + // Register Submitter Stats Callback + if _, err = otr.meter.RegisterCallback(otr.recordSubmitterStats, otr.balanceGauge, otr.nonceGauge); err != nil { + log.Warnf("failed to register callback for submitter metrics: %v", err) + } + + if _, err = otr.meter.RegisterCallback(otr.recordBridgeGasBalance, otr.gasBalanceGauge); err != nil { + log.Warnf("failed to register callback for bridge gas balance metrics: %v", err) + } + + return &otr +} + +// Virtual Price Metrics. +func (o *otelRecorder) RecordVPrice(chainid int, vPrice float64) { + o.vPrice.Set(chainid, vPrice) +} + +func (o *otelRecorder) recordVpriceGauge( + _ context.Context, + observer metric.Observer, +) (err error) { + if o.metrics == nil || o.vpriceGauge == nil { + return nil + } + + o.vPrice.Range( + func(chainid int, vprice float64) bool { + observer.ObserveFloat64( + o.vpriceGauge, + vprice, + metric.WithAttributes(attribute.Int(metrics.ChainID, chainid)), + ) + + return true + }, + ) + + return nil +} + +// Token Balance Metrics. +func (o *otelRecorder) RecordBridgeGasBalance(chainid int, gasBalance float64) { + o.gasBalance.Set(chainid, gasBalance) +} + +func (o *otelRecorder) recordBridgeGasBalance(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.bridgeBalanceGauge == nil { + return nil + } + + o.gasBalance.Range(func(chainID int, gasBalance float64) bool { + observer.ObserveFloat64( + o.gasBalanceGauge, + gasBalance, + metric.WithAttributes(attribute.Int(metrics.ChainID, chainID)), + ) + + return true + }) + + return nil +} + +type tokenData struct { + metadata TokenConfig + contractBalance *big.Int + totalSuppply *big.Int + feeBalance *big.Int +} + +func (o *otelRecorder) RecordTokenBalance( + chainID int, + tokenData tokenData, +) { + td, _ := o.td.Get(chainID) + td = append(td, tokenData) + + o.td.Set(chainID, td) +} +func (o *otelRecorder) recordTokenBalance( + _ context.Context, + observer metric.Observer, +) (err error) { + if o.metrics == nil || o.bridgeBalanceGauge == nil { + return nil + } + + o.td.Range(func(_ int, td []tokenData) bool { + for _, token := range td { + tokenAttributes := attribute.NewSet( + attribute.String("tokenID", token.metadata.TokenID), + attribute.Int(metrics.ChainID, token.metadata.ChainID), + ) + + bridgeBalance := core.BigToDecimals(token.contractBalance, token.metadata.TokenDecimals) + + observer.ObserveFloat64( + o.bridgeBalanceGauge, + bridgeBalance, + metric.WithAttributeSet(tokenAttributes), + ) + + feeBalance := core.BigToDecimals(token.feeBalance, token.metadata.TokenDecimals) + + observer.ObserveFloat64( + o.feeBalanceGauge, + feeBalance, + metric.WithAttributeSet(tokenAttributes), + ) + + totalSupply := core.BigToDecimals(token.totalSuppply, token.metadata.TokenDecimals) + + observer.ObserveFloat64( + o.totalSupplyGauge, + totalSupply, + metric.WithAttributeSet(tokenAttributes), + ) + } + + return true + }) + + return nil +} + +// DFK Metrics. +func (o *otelRecorder) RecordStuckHeroCount(stuckHeroes int64, chainname string) { + o.stuckHeroes.Set(chainname, stuckHeroes) +} + +func (o *otelRecorder) recordStuckHeroCount( + _ context.Context, + observer metric.Observer, +) (err error) { + if o.metrics == nil || o.stuckHeroesGauge == nil { + return nil + } + + o.stuckHeroes.Range( + func(chainName string, stuckHeroes int64) bool { + observer.ObserveInt64( + o.stuckHeroesGauge, + stuckHeroes, + metric.WithAttributes( + attribute.String("chain_name", chainName), + ), + ) + + return true + }) + + return nil +} + +// Submitter stats. +func (o *otelRecorder) RecordSubmitterStats(chainid int, metadata submitterMetadata) { + submitters, _ := o.submitters.Get(chainid) + submitters = append(submitters, metadata) + o.submitters.Set(chainid, submitters) +} + +func (o *otelRecorder) recordSubmitterStats( + _ context.Context, + observer metric.Observer, +) (err error) { + if o.metrics == nil || o.nonceGauge == nil { + return nil + } + + o.submitters.Range(func(chainID int, submitters []submitterMetadata) bool { + for _, submitter := range submitters { + observer.ObserveInt64( + o.nonceGauge, + submitter.nonce, + metric.WithAttributes(attribute.Int(metrics.ChainID, chainID), + attribute.String(metrics.EOAAddress, submitter.address.String()), + attribute.String("name", submitter.name), + ), + ) + + observer.ObserveFloat64( + o.balanceGauge, + submitter.balance, + metric.WithAttributes( + attribute.Int(metrics.ChainID, chainID), + attribute.String(metrics.EOAAddress, submitter.address.String()), + attribute.String("name", submitter.name), + ), + ) + } + return true + }) + return nil +} diff --git a/contrib/promexporter/exporters/otel_generated.go b/contrib/promexporter/exporters/otel_generated.go new file mode 100644 index 0000000000..1c42e04db0 --- /dev/null +++ b/contrib/promexporter/exporters/otel_generated.go @@ -0,0 +1,16 @@ +// autogenerated file + +package exporters + +// iOtelRecorder ... +type iOtelRecorder interface { + // Virtual Price Metrics. + RecordVPrice(chainid int, vPrice float64) + // Token Balance Metrics. + RecordBridgeGasBalance(chainid int, gasBalance float64) + RecordTokenBalance(chainID int, tokenData tokenData) + // DFK Metrics. + RecordStuckHeroCount(stuckHeroes int64, chainname string) + // Submitter stats. + RecordSubmitterStats(chainid int, metadata submitterMetadata) +} diff --git a/contrib/promexporter/exporters/submitter.go b/contrib/promexporter/exporters/submitter.go new file mode 100644 index 0000000000..b394ed5440 --- /dev/null +++ b/contrib/promexporter/exporters/submitter.go @@ -0,0 +1,58 @@ +package exporters + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" + "github.com/lmittmann/w3/module/eth" + "github.com/synapsecns/sanguine/core/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// note: this kind of check should be deprecated in favor of submitter metrics once everything has been moved over. +func (e *exporter) submitterStats(address common.Address, chainID int, name string) (err error) { + ctx, span := e.metrics.Tracer().Start( + context.Background(), + "submitter_stats", + trace.WithAttributes( + attribute.Int(metrics.ChainID, chainID), + attribute.String(metrics.EOAAddress, address.String()), + )) + + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + client, err := e.omnirpcClient.GetConfirmationsClient(ctx, chainID, 1) + if err != nil { + return fmt.Errorf("could not get confirmations client: %w", err) + } + + var nonce uint64 + var balance big.Int + + if err = client.BatchWithContext(ctx, + eth.Nonce(address, nil).Returns(&nonce), + eth.Balance(address, nil).Returns(&balance), + ); err != nil { + return fmt.Errorf("could not get balance: %w", err) + } + + ethBalance := new(big.Float).Quo(new(big.Float).SetInt(&balance), new(big.Float).SetInt64(params.Ether)) + truncEthBalance, _ := ethBalance.Float64() + + submitterMetadata := submitterMetadata{ + address: address, + name: name, + nonce: int64(nonce), + balance: truncEthBalance, + } + + e.otelRecorder.RecordSubmitterStats(chainID, submitterMetadata) + + return nil +} diff --git a/contrib/promexporter/exporters/util.go b/contrib/promexporter/exporters/util.go new file mode 100644 index 0000000000..afcbf27f0a --- /dev/null +++ b/contrib/promexporter/exporters/util.go @@ -0,0 +1,135 @@ +package exporters + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/lmittmann/w3/module/eth" + "github.com/lmittmann/w3/w3types" + "github.com/synapsecns/sanguine/contrib/promexporter/internal/decoders" + "github.com/synapsecns/sanguine/core" + "github.com/synapsecns/sanguine/core/metrics" + ethergoClient "github.com/synapsecns/sanguine/ethergo/client" + "github.com/synapsecns/sanguine/services/explorer/contracts/bridgeconfig" + "golang.org/x/sync/errgroup" +) + +func (e *exporter) getAllTokens(parentCtx context.Context) (allTokens Tokens, err error) { + allTokens = []TokenConfig{} + + ctx, span := e.metrics.Tracer().Start(parentCtx, "get_all_tokens") + + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + bridgeConfig, err := e.getBridgeConfig(ctx) + if err != nil { + return nil, fmt.Errorf("could not get bridge config: %w", err) + } + + // TODO: multicall is preferable here, but I ain't got time for that + tokenIDs, err := bridgeConfig.GetAllTokenIDs(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, fmt.Errorf("could not get all token ids: %w", err) + } + + bridgeConfigClient, err := e.omnirpcClient.GetConfirmationsClient(ctx, e.cfg.BridgeConfig.ChainID, 1) + if err != nil { + return nil, fmt.Errorf("could not get confirmations client: %w", err) + } + + bridgeTokens := make([]*bridgeconfig.BridgeConfigV3Token, len(tokenIDs)*len(e.cfg.BridgeChecks)) + + //nolint: revive + tokenIDS := make([]string, len(tokenIDs)*len(e.cfg.BridgeChecks)) + + var calls []w3types.Caller + + i := 0 + for _, tokenID := range tokenIDs { + for chainID := range e.cfg.BridgeChecks { + token := &bridgeconfig.BridgeConfigV3Token{} + calls = append(calls, eth.CallFunc(decoders.TokenConfigGetToken(), bridgeConfig.Address(), tokenID, big.NewInt(int64(chainID))).Returns(token)) + bridgeTokens[i] = token + tokenIDS[i] = tokenID + i++ + } + } + + // TODO: once go 1.21 is introduced do min(cfg.BatchCallLimit, 2) + err = e.batchCalls(ctx, bridgeConfigClient, calls) + if err != nil { + return nil, fmt.Errorf("could not get token balances: %w", err) + } + + for i, token := range bridgeTokens { + tokenID := tokenIDS[i] + + if token.TokenAddress == "" { + continue + } + + allTokens = append(allTokens, TokenConfig{ + TokenID: tokenID, + ChainID: int(token.ChainId.Int64()), + TokenAddress: common.HexToAddress(token.TokenAddress), + TokenDecimals: token.TokenDecimals, + HasUnderlying: token.HasUnderlying, + IsUnderlying: token.IsUnderlying, + }) + } + + return allTokens, nil +} + +func (e *exporter) batchCalls(ctx context.Context, evmClient ethergoClient.EVM, calls []w3types.Caller) (err error) { + tasks := core.ChunkSlice(calls, e.cfg.BatchCallLimit) + + g, ctx := errgroup.WithContext(ctx) + for _, task := range tasks { + g.Go(func() error { + err = evmClient.BatchWithContext(ctx, task...) + if err != nil { + return fmt.Errorf("could not batch calls: %w", err) + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + return fmt.Errorf("could not get token balances: %w", err) + } + + return nil +} + +// Tokens is a list of token configs. +type Tokens []TokenConfig + +// GetForChainID returns all tokens for a given chainID. +func (t Tokens) GetForChainID(chainID int) Tokens { + var chainTokens []TokenConfig + for _, token := range t { + if token.ChainID == chainID { + chainTokens = append(chainTokens, token) + } + } + + return chainTokens +} + +// TokenConfig is a cleaned up token config. +type TokenConfig struct { + TokenID string + ChainID int + TokenAddress common.Address + TokenDecimals uint8 + HasUnderlying bool + IsUnderlying bool +} diff --git a/contrib/promexporter/go.mod b/contrib/promexporter/go.mod index c98cee7ae8..a9d9371ebf 100644 --- a/contrib/promexporter/go.mod +++ b/contrib/promexporter/go.mod @@ -21,9 +21,11 @@ require ( github.com/99designs/gqlgen v0.17.36 github.com/Flaque/filet v0.0.0-20201012163910-45f684403088 github.com/Yamashou/gqlgenc v0.10.0 + github.com/cornelk/hashmap v1.0.8 github.com/creasty/defaults v1.7.0 github.com/ethereum/go-ethereum v1.13.8 github.com/gin-gonic/gin v1.10.0 + github.com/hedzr/log v1.6.3 github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc github.com/ipfs/go-log v1.0.5 github.com/jftuga/ellipsis v1.0.0 @@ -81,7 +83,6 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.1 // indirect - github.com/cornelk/hashmap v1.0.8 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233 // indirect github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect diff --git a/contrib/promexporter/internal/decoders/gettokenbyid.go b/contrib/promexporter/internal/decoders/gettokenbyid.go index f37db14b69..1141d9d703 100644 --- a/contrib/promexporter/internal/decoders/gettokenbyid.go +++ b/contrib/promexporter/internal/decoders/gettokenbyid.go @@ -2,6 +2,7 @@ package decoders import ( "fmt" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/lmittmann/w3" "github.com/lmittmann/w3/w3types" @@ -33,9 +34,7 @@ func (c *customDecodedFunc) DecodeReturns(output []byte, returns ...any) (err er } abi.ConvertType(raw[0], returns[0]) - if err != nil { - return fmt.Errorf("could not decode returns: %w", err) - } + return nil }