Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small metrics cleanup #3088

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions snow/networking/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,25 @@ func New(
return nil, fmt.Errorf("initializing handler metrics errored with: %w", err)
}
cpuTracker := resourceTracker.CPUTracker()
h.syncMessageQueue, err = NewMessageQueue(h.ctx, h.validators, cpuTracker, "handler")
h.syncMessageQueue, err = NewMessageQueue(
h.ctx.Log,
h.ctx.SubnetID,
h.validators,
cpuTracker,
"handler",
h.ctx.Registerer,
)
if err != nil {
return nil, fmt.Errorf("initializing sync message queue errored with: %w", err)
}
h.asyncMessageQueue, err = NewMessageQueue(h.ctx, h.validators, cpuTracker, "handler_async")
h.asyncMessageQueue, err = NewMessageQueue(
h.ctx.Log,
h.ctx.SubnetID,
h.validators,
cpuTracker,
"handler_async",
h.ctx.Registerer,
)
if err != nil {
return nil, fmt.Errorf("initializing async message queue errored with: %w", err)
}
Expand Down
28 changes: 16 additions & 12 deletions snow/networking/handler/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/buffer"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
)

Expand Down Expand Up @@ -60,7 +60,8 @@ type messageQueue struct {
clock mockable.Clock
metrics messageQueueMetrics

ctx *snow.ConsensusContext
log logging.Logger
subnetID ids.ID
// Validator set for the chain associated with this
vdrs validators.Manager
// Tracks CPU utilization of each node
Expand All @@ -75,20 +76,23 @@ type messageQueue struct {
}

func NewMessageQueue(
ctx *snow.ConsensusContext,
log logging.Logger,
subnetID ids.ID,
vdrs validators.Manager,
cpuTracker tracker.Tracker,
metricsNamespace string,
reg prometheus.Registerer,
) (MessageQueue, error) {
m := &messageQueue{
ctx: ctx,
log: log,
subnetID: subnetID,
vdrs: vdrs,
cpuTracker: cpuTracker,
cond: sync.NewCond(&sync.Mutex{}),
nodeToUnprocessedMsgs: make(map[ids.NodeID]int),
msgAndCtxs: buffer.NewUnboundedDeque[*msgAndContext](1 /*=initSize*/),
}
return m, m.metrics.initialize(metricsNamespace, ctx.Registerer)
return m, m.metrics.initialize(metricsNamespace, reg)
}

func (m *messageQueue) Push(ctx context.Context, msg Message) {
Expand Down Expand Up @@ -137,7 +141,7 @@ func (m *messageQueue) Pop() (context.Context, Message, bool) {
i := 0
for {
if i == n {
m.ctx.Log.Debug("canPop is false for all unprocessed messages",
m.log.Debug("canPop is false for all unprocessed messages",
zap.Int("numMessages", n),
)
}
Expand Down Expand Up @@ -212,21 +216,21 @@ func (m *messageQueue) canPop(msg message.InboundMessage) bool {
// the number of nodes with unprocessed messages.
baseMaxCPU := 1 / float64(len(m.nodeToUnprocessedMsgs))
nodeID := msg.NodeID()
weight := m.vdrs.GetWeight(m.ctx.SubnetID, nodeID)
weight := m.vdrs.GetWeight(m.subnetID, nodeID)

var portionWeight float64
if totalVdrsWeight, err := m.vdrs.TotalWeight(m.ctx.SubnetID); err != nil {
if totalVdrsWeight, err := m.vdrs.TotalWeight(m.subnetID); err != nil {
// The sum of validator weights should never overflow, but if they do,
// we treat portionWeight as 0.
m.ctx.Log.Error("failed to get total weight of validators",
zap.Stringer("subnetID", m.ctx.SubnetID),
m.log.Error("failed to get total weight of validators",
zap.Stringer("subnetID", m.subnetID),
zap.Error(err),
)
} else if totalVdrsWeight == 0 {
// The sum of validator weights should never be 0, but handle that case
// for completeness here to avoid divide by 0.
m.ctx.Log.Warn("validator set is empty",
zap.Stringer("subnetID", m.ctx.SubnetID),
m.log.Warn("validator set is empty",
zap.Stringer("subnetID", m.subnetID),
)
} else {
portionWeight = float64(weight) / float64(totalVdrsWeight)
Expand Down
19 changes: 13 additions & 6 deletions snow/networking/handler/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,35 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
)

func TestQueue(t *testing.T) {
ctrl := gomock.NewController(t)
require := require.New(t)
cpuTracker := tracker.NewMockTracker(ctrl)
snowCtx := snowtest.Context(t, snowtest.CChainID)
ctx := snowtest.ConsensusContext(snowCtx)
vdrs := validators.NewManager()
vdr1ID, vdr2ID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID()
require.NoError(vdrs.AddStaker(ctx.SubnetID, vdr1ID, nil, ids.Empty, 1))
require.NoError(vdrs.AddStaker(ctx.SubnetID, vdr2ID, nil, ids.Empty, 1))
mIntf, err := NewMessageQueue(ctx, vdrs, cpuTracker, "")
require.NoError(vdrs.AddStaker(constants.PrimaryNetworkID, vdr1ID, nil, ids.Empty, 1))
require.NoError(vdrs.AddStaker(constants.PrimaryNetworkID, vdr2ID, nil, ids.Empty, 1))
mIntf, err := NewMessageQueue(
logging.NoLog{},
constants.PrimaryNetworkID,
vdrs,
cpuTracker,
"",
prometheus.NewRegistry(),
)
require.NoError(err)
u := mIntf.(*messageQueue)
currentTime := time.Now()
Expand Down
12 changes: 6 additions & 6 deletions vms/avm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/versiondb"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/avalanchego/vms/avm/block"
"github.com/ava-labs/avalanchego/vms/avm/config"
"github.com/ava-labs/avalanchego/vms/avm/metrics"
"github.com/ava-labs/avalanchego/vms/avm/network"
"github.com/ava-labs/avalanchego/vms/avm/state"
"github.com/ava-labs/avalanchego/vms/avm/txs"
Expand All @@ -47,6 +47,7 @@ import (
blockbuilder "github.com/ava-labs/avalanchego/vms/avm/block/builder"
blockexecutor "github.com/ava-labs/avalanchego/vms/avm/block/executor"
extensions "github.com/ava-labs/avalanchego/vms/avm/fxs"
avmmetrics "github.com/ava-labs/avalanchego/vms/avm/metrics"
txexecutor "github.com/ava-labs/avalanchego/vms/avm/txs/executor"
xmempool "github.com/ava-labs/avalanchego/vms/avm/txs/mempool"
)
Expand All @@ -66,7 +67,7 @@ type VM struct {

config.Config

metrics metrics.Metrics
metrics avmmetrics.Metrics

avax.AddressManager
ids.Aliaser
Expand Down Expand Up @@ -173,16 +174,15 @@ func (vm *VM) Initialize(
zap.Reflect("config", avmConfig),
)

registerer := prometheus.NewRegistry()
if err := ctx.Metrics.Register("", registerer); err != nil {
vm.registerer, err = metrics.MakeAndRegister(ctx.Metrics, "")
if err != nil {
return err
}
vm.registerer = registerer

vm.connectedPeers = make(map[ids.NodeID]*version.Application)

// Initialize metrics as soon as possible
vm.metrics, err = metrics.New(registerer)
vm.metrics, err = avmmetrics.New(vm.registerer)
if err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions vms/platformvm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"time"

"github.com/gorilla/rpc/v2"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/codec/linearcodec"
Expand All @@ -35,7 +35,6 @@ import (
"github.com/ava-labs/avalanchego/vms/platformvm/block"
"github.com/ava-labs/avalanchego/vms/platformvm/config"
"github.com/ava-labs/avalanchego/vms/platformvm/fx"
"github.com/ava-labs/avalanchego/vms/platformvm/metrics"
"github.com/ava-labs/avalanchego/vms/platformvm/network"
"github.com/ava-labs/avalanchego/vms/platformvm/reward"
"github.com/ava-labs/avalanchego/vms/platformvm/state"
Expand All @@ -47,6 +46,7 @@ import (
snowmanblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block"
blockbuilder "github.com/ava-labs/avalanchego/vms/platformvm/block/builder"
blockexecutor "github.com/ava-labs/avalanchego/vms/platformvm/block/executor"
platformvmmetrics "github.com/ava-labs/avalanchego/vms/platformvm/metrics"
txexecutor "github.com/ava-labs/avalanchego/vms/platformvm/txs/executor"
pmempool "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool"
pvalidators "github.com/ava-labs/avalanchego/vms/platformvm/validators"
Expand All @@ -65,7 +65,7 @@ type VM struct {
*network.Network
validators.State

metrics metrics.Metrics
metrics platformvmmetrics.Metrics

// Used to get time. Useful for faking time during tests.
clock mockable.Clock
Expand Down Expand Up @@ -113,13 +113,13 @@ func (vm *VM) Initialize(
}
chainCtx.Log.Info("using VM execution config", zap.Reflect("config", execConfig))

registerer := prometheus.NewRegistry()
if err := chainCtx.Metrics.Register("", registerer); err != nil {
registerer, err := metrics.MakeAndRegister(chainCtx.Metrics, "")
if err != nil {
return err
}

// Initialize metrics as soon as possible
vm.metrics, err = metrics.New(registerer)
vm.metrics, err = platformvmmetrics.New(registerer)
if err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}
Expand Down
Loading