Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup consensus metrics #2815

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ func (i *issuer) Abandon(ctx context.Context, _ ids.ID) {
i.t.removeFromPending(i.blk)
i.t.addToNonVerifieds(i.blk)
i.t.blocked.Abandon(ctx, blkID)

// Tracks performance statistics
i.t.metrics.numRequests.Set(float64(i.t.blkReqs.Len()))
i.t.metrics.numBlocked.Set(float64(len(i.t.pending)))
i.t.metrics.numBlockers.Set(float64(i.t.blocked.Len()))
}
i.abandoned = true
}
Expand Down
72 changes: 28 additions & 44 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (t *Transitive) Gossip(ctx context.Context) error {
return nil
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
lastAccepted, err := t.getBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
Expand Down Expand Up @@ -300,7 +300,7 @@ func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint3
if _, err := t.issueFrom(ctx, nodeID, blk, issuedMetric); err != nil {
return err
}
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
Expand All @@ -323,9 +323,7 @@ func (t *Transitive) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID

// Because the get request was dropped, we no longer expect blkID to be issued.
t.blocked.Abandon(ctx, blkID)
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) PullQuery(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkID ids.ID, requestedHeight uint64) error {
Expand All @@ -339,7 +337,7 @@ func (t *Transitive) PullQuery(ctx context.Context, nodeID ids.NodeID, requestID
return err
}

return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) PushQuery(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte, requestedHeight uint64) error {
Expand Down Expand Up @@ -380,7 +378,7 @@ func (t *Transitive) PushQuery(ctx context.Context, nodeID ids.NodeID, requestID
return err
}

return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredID ids.ID, preferredIDAtHeight ids.ID, acceptedID ids.ID) error {
Expand Down Expand Up @@ -436,8 +434,7 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin
}

t.blocked.Register(ctx, v)
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
Expand All @@ -454,8 +451,7 @@ func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, request
requestID: requestID,
},
)
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (*Transitive) Timeout(context.Context) error {
Expand All @@ -478,7 +474,7 @@ func (t *Transitive) Notify(ctx context.Context, msg common.Message) error {
case common.PendingTxs:
// the pending txs message means we should attempt to build a block.
t.pendingBuildBlocks++
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
case common.StateSyncDone:
t.Ctx.StateSyncing.Set(false)
return nil
Expand All @@ -501,7 +497,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
return err
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
lastAccepted, err := t.getBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Error("failed to get last accepted block",
zap.Error(err),
Expand Down Expand Up @@ -553,7 +549,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
return fmt.Errorf("failed to notify VM that consensus is starting: %w",
err)
}
return nil
return t.executeDeferredWork(ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added here because of the call to t.deliver above.

}

func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) {
Expand Down Expand Up @@ -584,7 +580,19 @@ func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) {
return intf, fmt.Errorf("vm: %w ; consensus: %w", vmErr, consensusErr)
}

func (t *Transitive) GetBlock(ctx context.Context, blkID ids.ID) (snowman.Block, error) {
func (t *Transitive) executeDeferredWork(ctx context.Context) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is going to get larger as I continue to cleanup the logic here.

if err := t.buildBlocks(ctx); err != nil {
return err
}

t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlocked.Set(float64(len(t.pending)))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
Comment on lines +584 to +587
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now the only location these gauges are set.

return nil
}

func (t *Transitive) getBlock(ctx context.Context, blkID ids.ID) (snowman.Block, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor unrelated cleanup - but this method didn't need to be exported anymore.

if blk, ok := t.pending[blkID]; ok {
return blk, nil
}
Expand Down Expand Up @@ -737,7 +745,7 @@ func (t *Transitive) issueFromByID(
blkID ids.ID,
issuedMetric prometheus.Counter,
) (bool, error) {
blk, err := t.GetBlock(ctx, blkID)
blk, err := t.getBlock(ctx, blkID)
if err != nil {
t.sendRequest(ctx, nodeID, blkID, issuedMetric)
return false, nil
Expand All @@ -763,7 +771,7 @@ func (t *Transitive) issueFrom(

blkID = blk.Parent()
var err error
blk, err = t.GetBlock(ctx, blkID)
blk, err = t.getBlock(ctx, blkID)

// If we don't have this ancestor, request it from [vdr]
if err != nil || !blk.Status().Fetched() {
Expand All @@ -784,10 +792,6 @@ func (t *Transitive) issueFrom(
// dependencies may still be waiting. Therefore, they should abandoned.
t.blocked.Abandon(ctx, blkID)
}

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return issued, t.errs.Err
}

Expand All @@ -808,7 +812,7 @@ func (t *Transitive) issueWithAncestors(
return false, err
}
blkID = blk.Parent()
blk, err = t.GetBlock(ctx, blkID)
blk, err = t.getBlock(ctx, blkID)
if err != nil {
status = choices.Unknown
break
Expand All @@ -830,7 +834,6 @@ func (t *Transitive) issueWithAncestors(
// We don't have this block and have no reason to expect that we will get it.
// Abandon the block to avoid a memory leak.
t.blocked.Abandon(ctx, blkID)
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return false, t.errs.Err
}

Expand Down Expand Up @@ -873,7 +876,7 @@ func (t *Transitive) issue(

// block on the parent if needed
parentID := blk.Parent()
if parent, err := t.GetBlock(ctx, parentID); err != nil || !(t.Consensus.Decided(parent) || t.Consensus.Processing(parentID)) {
if parent, err := t.getBlock(ctx, parentID); err != nil || !(t.Consensus.Decided(parent) || t.Consensus.Processing(parentID)) {
t.Ctx.Log.Verbo("block waiting for parent to be issued",
zap.Stringer("blkID", blkID),
zap.Stringer("parentID", parentID),
Expand All @@ -882,11 +885,6 @@ func (t *Transitive) issue(
}

t.blocked.Register(ctx, i)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlocked.Set(float64(len(t.pending)))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand Down Expand Up @@ -916,9 +914,6 @@ func (t *Transitive) sendRequest(
zap.Stringer("blkID", blkID),
)
t.Sender.SendGet(ctx, nodeID, t.requestID, blkID)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
}

// Send a query for this block. If push is set to true, blkBytes will be used to
Expand Down Expand Up @@ -994,16 +989,14 @@ func (t *Transitive) deliver(
// longer pending
t.removeFromPending(blk)
parentID := blk.Parent()
parent, err := t.GetBlock(ctx, parentID)
parent, err := t.getBlock(ctx, parentID)
// Because the dependency must have been fulfilled by the time this function
// is called - we don't expect [err] to be non-nil. But it is handled for
// completness and future proofing.
if err != nil || !(parent.Status() == choices.Accepted || t.Consensus.Processing(parentID)) {
// if the parent isn't processing or the last accepted block, then this
// block is effectively rejected
t.blocked.Abandon(ctx, blkID)
t.metrics.numBlocked.Set(float64(len(t.pending))) // Tracks performance statistics
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand All @@ -1016,8 +1009,6 @@ func (t *Transitive) deliver(
}
if !blkAdded {
t.blocked.Abandon(ctx, blkID)
t.metrics.numBlocked.Set(float64(len(t.pending))) // Tracks performance statistics
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand Down Expand Up @@ -1081,11 +1072,6 @@ func (t *Transitive) deliver(

// If we should issue multiple queries at the same time, we need to repoll
t.repoll(ctx)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlocked.Set(float64(len(t.pending)))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand All @@ -1112,7 +1098,6 @@ func (t *Transitive) addToNonVerifieds(blk snowman.Block) {
if t.nonVerifieds.Has(parentID) || t.Consensus.Processing(parentID) {
t.nonVerifieds.Add(blkID, parentID)
t.nonVerifiedCache.Put(blkID, blk)
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
}
}

Expand Down Expand Up @@ -1144,7 +1129,6 @@ func (t *Transitive) addUnverifiedBlockToConsensus(
issuedMetric.Inc()
t.nonVerifieds.Remove(blkID)
t.nonVerifiedCache.Evict(blkID)
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
t.metrics.issuerStake.Observe(float64(t.Validators.GetWeight(t.Ctx.SubnetID, nodeID)))
t.Ctx.Log.Verbo("adding block to consensus",
zap.Stringer("nodeID", nodeID),
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/snowman/voter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (v *voter) getProcessingAncestor(ctx context.Context, initialVote ids.ID) (
// have at our disposal as a best-effort mechanism to find a valid ancestor.
bubbledVote := v.t.nonVerifieds.GetAncestor(initialVote)
for {
blk, err := v.t.GetBlock(ctx, bubbledVote)
blk, err := v.t.getBlock(ctx, bubbledVote)
// If we cannot retrieve the block, drop [vote]
if err != nil {
v.t.Ctx.Log.Debug("dropping vote",
Expand Down
Loading