Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operator registration metric at node #502

Merged
merged 6 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
NodeApiPort string
EnableMetrics bool
MetricsPort string
OnchainMetricsInterval int64
Timeout time.Duration
RegisterNodeAtStart bool
ExpirationPollIntervalSec uint64
Expand Down Expand Up @@ -167,6 +168,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name),
OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name),
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
ExpirationPollIntervalSec: expirationPollIntervalSec,
Expand Down
8 changes: 8 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ var (
Value: "9091",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"),
}
OnchainMetricsIntervalFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "onchain-metrics-interval"),
Usage: "The interval in seconds at which the node polls the onchain state of the operator and update metrics. <=0 means no poll",
Required: false,
Value: "180",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ONCHAIN_METRICS_INTERVAL"),
}
TimeoutFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "timeout"),
Usage: "Amount of time to wait for GPRC",
Expand Down Expand Up @@ -238,6 +245,7 @@ var requiredFlags = []cli.Flag{
RetrievalPortFlag,
EnableMetricsFlag,
MetricsPortFlag,
OnchainMetricsIntervalFlag,
EnableNodeApiFlag,
NodeApiPortFlag,
TimeoutFlag,
Expand Down
15 changes: 9 additions & 6 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
commonmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
core_mock "github.com/Layr-Labs/eigenda/core/mock"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
Expand Down Expand Up @@ -101,12 +102,7 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server {
}
noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090")
store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9)
if err != nil {
panic("failed to create a new levelDB store")
}
defer os.Remove(dbPath)
tx := &coremock.MockTransactor{}

ratelimiter := &commonmock.NoopRatelimiter{}

Expand Down Expand Up @@ -138,6 +134,13 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server {
val = core.NewShardValidator(v, asn, cst, opID)
}

metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState)
store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9)
if err != nil {
panic("failed to create a new levelDB store")
}
defer os.Remove(dbPath)

node := &node.Node{
Config: config,
Logger: logger,
Expand Down
95 changes: 85 additions & 10 deletions node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package node

import (
"context"
"math/big"
"sort"
"strconv"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigensdk-go/logging"
eigenmetrics "github.com/Layr-Labs/eigensdk-go/metrics"

Expand All @@ -20,8 +24,8 @@ const (
type Metrics struct {
logger logging.Logger

// Whether the node is registered.
Registered prometheus.Gauge
// The quorums the node is registered.
Registered *prometheus.GaugeVec
// Accumulated number of RPC requests received.
AccNumRequests *prometheus.CounterVec
// The latency (in ms) to process the request.
Expand All @@ -40,22 +44,29 @@ type Metrics struct {
registry *prometheus.Registry
// socketAddr is the address at which the metrics server will be listening.
// should be in format ip:port
socketAddr string
socketAddr string
operatorId core.OperatorID
onchainMetricsInterval int64
tx core.Transactor
chainState core.ChainState
}

func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string) *Metrics {
func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string, operatorId core.OperatorID, onchainMetricsInterval int64, tx core.Transactor, chainState core.ChainState) *Metrics {

// Add Go module collectors
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metrics := &Metrics{
Registered: promauto.With(reg).NewGauge(
// The "type" label have values: stake_share, rank. The "stake_share" is stake share (in basis point),
// and the "rank" is operator's ranking (the operator with highest amount of stake ranked as 1) by stake share in the quorum.
Registered: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Name: "registered",
Help: "indicator about if DA node is registered",
Help: "the quorums the DA node is registered",
},
[]string{"quorum", "type"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait I am very confused with this metrics.
This says Registered which seems, operator is registered or not. So binary 0 and 1

but we are setting this metrics to stake_share and rank. I think it will create confusion and probably not a right use of this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead I think the better way can be - have 3 diff metrics

  1. stake_share and for that label would be quorum
  2. rank and for that label would be quorum
  3. registered for that label would be quorum

so basically you just slice by quorum for all 3 metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means the stake registered for each quorum: one for the precentage info, the other for the rank by the stake
I'm not sure why it's confusing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coz the metrics name is registered so this means- we are emitting the value of registered. but here the value is actually of a label and that's confusing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like for example

node_eigenda_blobs_total{quorum="0",type="number"} 97605
node_eigenda_blobs_total{quorum="0",type="size"} 1.16567530656e+11

the metrics value is of node_eigenda_blobs_total - and then you can slice by quorum and type to identify more params.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You just have 2 time series per quorum: the stake share and rank. The same for node_eigenda_blobs_total.
There is no confusion point, no inefficiency as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
from https://prometheus.io/docs/practices/naming/

Madhur makes good points and I feel it would be clearer if we move these labels into their own metrics time series.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, this may affect a lot of metrics we built

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I know. but I think we should at least start making sure new metrics are correct and eventually migrate other faulty ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you feel strongly about it, feel free to open a PR for it, I'll approve it

),
// The "status" label has values: success, failure.
AccNumRequests: promauto.With(reg).NewCounterVec(
Expand Down Expand Up @@ -108,17 +119,25 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log
Help: "the total number of node's socket address updates",
},
),
EigenMetrics: eigenMetrics,
logger: logger.With("component", "NodeMetrics"),
registry: reg,
socketAddr: socketAddr,
EigenMetrics: eigenMetrics,
logger: logger.With("component", "NodeMetrics"),
registry: reg,
socketAddr: socketAddr,
operatorId: operatorId,
onchainMetricsInterval: onchainMetricsInterval,
tx: tx,
chainState: chainState,
}

return metrics
}

func (g *Metrics) Start() {
_ = g.EigenMetrics.Start(context.Background(), g.registry)

if g.onchainMetricsInterval > 0 {
go g.collectOnchainMetrics()
}
}

func (g *Metrics) RecordRPCRequest(method string, status string) {
Expand Down Expand Up @@ -150,3 +169,59 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
g.AccuBatches.WithLabelValues("number", status).Inc()
g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize))
}

func (g *Metrics) collectOnchainMetrics() {
ticker := time.NewTicker(time.Duration(uint64(g.onchainMetricsInterval)))
defer ticker.Stop()

// 3 chain RPC calls in each cycle.
shrimalmadhur marked this conversation as resolved.
Show resolved Hide resolved
for range ticker.C {
ctx := context.Background()
blockNum, err := g.tx.GetCurrentBlockNumber(ctx)
if err != nil {
g.logger.Error("Failed to query chain RPC for current block number", "err", err)
continue
}
bitmaps, err := g.tx.GetQuorumBitmapForOperatorsAtBlockNumber(ctx, []core.OperatorID{g.operatorId}, blockNum)
if err != nil {
g.logger.Error("Failed to query chain RPC for quorum bitmap", "blockNumber", blockNum, "err", err)
continue
}
quorumIds := eth.BitmapToQuorumIds(bitmaps[0])
if len(quorumIds) == 0 {
g.logger.Warn("This node is currently not in any quorum", "blockNumber", blockNum, "operatorId", g.operatorId.Hex())
continue
}
state, err := g.chainState.GetOperatorState(ctx, uint(blockNum), quorumIds)
if err != nil {
g.logger.Error("Failed to query chain RPC for operator state", "blockNumber", blockNum, "quorumIds", quorumIds, "err", err)
continue
}
type OperatorStakeShare struct {
operatorId core.OperatorID
stakeShare float64
}
for q, operators := range state.Operators {
operatorStakeShares := make([]*OperatorStakeShare, 0)
for opId, opInfo := range operators {
share, _ := new(big.Int).Div(new(big.Int).Mul(opInfo.Stake, big.NewInt(10000)), state.Totals[q].Stake).Float64()
operatorStakeShares = append(operatorStakeShares, &OperatorStakeShare{operatorId: opId, stakeShare: share})
}
// Descending order by stake share in the quorum.
sort.Slice(operatorStakeShares, func(i, j int) bool {
if operatorStakeShares[i].stakeShare == operatorStakeShares[j].stakeShare {
return operatorStakeShares[i].operatorId.Hex() < operatorStakeShares[j].operatorId.Hex()
}
return operatorStakeShares[i].stakeShare > operatorStakeShares[j].stakeShare
})
for i, op := range operatorStakeShares {
if op.operatorId == g.operatorId {
g.Registered.WithLabelValues(string(q), "stake_share").Set(op.stakeShare)
shrimalmadhur marked this conversation as resolved.
Show resolved Hide resolved
g.Registered.WithLabelValues(string(q), "rank").Set(float64(i + 1))
g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare (basis point)", op.stakeShare, "rank", i+1)
break
}
}
}
}
}
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger

promReg := prometheus.NewRegistry()
eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics"))

metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort)
rpcCallsCollector := rpccalls.NewCollector(AppName, promReg)

// Generate BLS keys
Expand Down Expand Up @@ -110,6 +108,8 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger
// Setup Node Api
nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi"))

metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst)

// Make validator
v, err := verifier.NewVerifier(&config.EncoderConfig, false)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion node/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
pb "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/mock"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -177,7 +180,13 @@ func TestStoringBlob(t *testing.T) {
noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
logger := logging.NewNoopLogger()
s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090"), staleMeasure, storeDuration)
operatorId := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad"))
tx := &coremock.MockTransactor{}
dat, _ := mock.MakeChainDataMock(map[uint8]int{
0: 6,
1: 3,
})
s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090", operatorId, -1, tx, dat), staleMeasure, storeDuration)
ctx := context.Background()

// Empty store
Expand Down
16 changes: 8 additions & 8 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,6 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
_, v0 := mustMakeTestComponents()
val := core.NewShardValidator(v0, asn, cst, id)

noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090")
store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9)
if err != nil {
t.Fatal(err)
}

tx := &coremock.MockTransactor{}
tx.On("RegisterBLSPublicKey").Return(nil)
tx.On("RegisterOperator").Return(nil)
Expand All @@ -282,6 +274,14 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
tx.On("GetBlockStaleMeasure").Return(nil)
tx.On("GetStoreDurationBlocks").Return(nil)

noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", config.ID, -1, tx, cst)
store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9)
if err != nil {
t.Fatal(err)
}

mockOperatorSocketsFilterer := &coremock.MockOperatorSocketsFilterer{}

mockSocketChan := make(chan string)
Expand Down
Loading