
replace engine.Unit with ComponentManager in Pusher Engine #6747

Merged

Commits (19)
3552f06
replace engine.Unit with ComponentManager in Pusher Engine
tim-barry Nov 21, 2024
6d3f462
pusher engine test: update positive test
tim-barry Nov 21, 2024
0aadc13
Pusher engine test: update negative test
tim-barry Nov 21, 2024
dec0d58
Start pusher engine in mocks
tim-barry Nov 21, 2024
b5418dd
Merge branch 'master' into tim/7018-pusher-engine-use-componentmanager
jordanschalm Nov 22, 2024
b7166f3
Refactor pusher engine: merge function with no callers
tim-barry Nov 27, 2024
e25cba7
Refactor pusher engine: error on non-local messages
tim-barry Nov 27, 2024
f2f53a8
Refactor pusher engine: rename and propagate error
tim-barry Nov 27, 2024
f66ba69
Refactor pusher engine
tim-barry Nov 27, 2024
1c80949
Revert "Pusher engine test: update negative test"
tim-barry Nov 27, 2024
fbed8e7
Refactor pusher engine: (lint) remove unused code
tim-barry Nov 27, 2024
051e6d4
Merge branch 'feature/pusher-engine-refactor' into tim/7018-pusher-en…
tim-barry Nov 27, 2024
ddb0cb7
Refactor pusher engine: queue length metrics
tim-barry Nov 29, 2024
17a9a2b
Update pusher engine doc comment
tim-barry Nov 29, 2024
aad332c
Apply suggestions from code review
tim-barry Nov 30, 2024
ad04f27
Pusher engine refactor: remove unused collection metrics
tim-barry Nov 30, 2024
88a7d45
Refactor pusher engine: add error return doc comments
tim-barry Dec 2, 2024
cebf100
Refactor pusher engine: add metrics for dropped messages
tim-barry Dec 3, 2024
2048b4a
Apply suggestions from code review
tim-barry Dec 3, 2024
1 change: 1 addition & 0 deletions cmd/collection/main.go
@@ -480,6 +480,7 @@ func main() {
node.EngineRegistry,
node.State,
node.Metrics.Engine,
node.Metrics.Mempool,
colMetrics,
node.Me,
node.Storage.Collections,
168 changes: 116 additions & 52 deletions engine/collection/pusher/engine.go
AlexHentschel (Member) commented on Nov 26, 2024:

Please take a look at the core processing logic in the Engine and let's try to remove layers of complexity and indirection where possible:

func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error {
	if originID != e.me.NodeID() {
		return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID)
	}
	return e.SubmitCollectionGuarantee(&req.Guarantee)
}

// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {

Looking at this code raises a few questions:

  • we are only propagating collections that originate from this node (👉 code). But the exported method SubmitCollectionGuarantee does not check this. Why not?

    • If it is an important check, we should always apply it. If we are fine with collections from other nodes being broadcast, then we don't need the check in onSubmitCollectionGuarantee either.
    • Turns out, SubmitCollectionGuarantee is only ever called by the pusher.Engine itself (not even the tests call it).

    Let's simplify this: merge SubmitCollectionGuarantee and onSubmitCollectionGuarantee (see the sketch after this comment).

  • Further digging into the code revealed that the originID is completely useless in this context:

    • the pusher.Engine is fed only from internal components (so originID is always filled with the node's own identifier)
    • the pusher.Engine drops messages that are not from itself. Just hypothetically, let's assume that there were collections with an originID other than the node's own ID. ❗Note that those messages would be queued first, wasting resources, and then a different thread would discard the Collection after dequeueing. Checks like that should always be applied before queueing irrelevant messages.

    What you see here is a pattern we struggle with a lot: the code is very ambiguous and allows a whole bunch of stuff that is never intended to happen in practice. This is an unsuitable design for engineering complex systems such as Flow, as we then have to manually handle undesired cases.

    • Example 1: messages other than SubmitCollectionGuarantee. In comparison, if we had a method that only accepted SubmitCollectionGuarantee, the compiler would enforce this constraint for us, as opposed to us having to write code. Often, people don't bother to manually disallow undesired inputs, resulting in software that does something incorrect in case of bugs, as opposed to crashing (or the code not compiling in the first place).
    • Example 2: originID. We employ a generic pattern which obfuscates that we don't need an originID in this particular case, because the Engine interface does not differentiate between node-external inputs (can be malicious) and inputs from other components within the same node (always trusted), and it mixes modes of processing (synchronous vs asynchronous).

    For those reasons, we deprecated the Engine interface. Let's try to update the pusher and remove the Engine interface (for concrete steps towards this, please see my next comment).

For Flow, the APIs should generally be as descriptive and restrictive as possible and make it hard to feed in wrong / mismatching inputs. Writing extra lines of boilerplate code is not a challenge for Flow. In contrast, it is challenging if you have to dig through hundreds or thousands of lines of code because the APIs are too generic and you need to know implementation details to use them correctly.
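For illustration, the merged method could look roughly like this — a hypothetical sketch, not the final implementation. It assumes the originID check can be dropped entirely (only the engine itself calls this path) and that the conduit's Publish takes the event plus a variadic list of target node IDs:

// SubmitCollectionGuarantee broadcasts the collection guarantee to all consensus nodes.
// No originID parameter: inputs are local by construction.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
	// query consensus nodes from the latest finalized snapshot
	consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus))
	if err != nil {
		return fmt.Errorf("could not get consensus nodes: %w", err)
	}
	// broadcast the guarantee to all consensus nodes
	return e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
}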

tim-barry (Contributor, Author) replied:

Partially addressed in b7166f3 and e25cba7: originID has been pretty much removed from the interface, now that Process and Submit (the functions in the Engine interface handling node-external inputs) simply error on any input.

@@ -4,15 +4,19 @@
package pusher

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
@@ -21,10 +25,9 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

// Engine is the collection pusher engine, which provides access to resources
// held by the collection node.
// Engine is part of the Collection Nodes. It broadcasts finalized collections
// that the cluster generates to the broader network.
type Engine struct {
unit *engine.Unit
log zerolog.Logger
engMetrics module.EngineMetrics
colMetrics module.CollectionMetrics
@@ -33,18 +36,53 @@ type Engine struct {
state protocol.State
collections storage.Collections
transactions storage.Transactions

notifier engine.Notifier
queue *fifoqueue.FifoQueue

component.Component
cm *component.ComponentManager
}

func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
// TODO convert to network.MessageProcessor
var _ network.Engine = (*Engine)(nil)
var _ component.Component = (*Engine)(nil)
Member commented on lines +47 to +49:

It may also be a good exercise to make the change to MessageProcessor here, though likely as a separate PR.

I'll add some thoughts below; I suggest we spend some time discussing this live as well. There is no need to implement any of the comments below before we have a chance to discuss.

The pusher engine is unusual in a couple of ways:

  1. It is using an old framework for internal message-passing, where two components in the same process would pass messages between each other using standard methods (SubmitLocal and ProcessLocal). The meaning of the message was carried by its type (here: messages.SubmitCollectionGuarantee) and the logical structure was similar to that of messages received over the network. Message processing logic needed to potentially handle inputs originating both locally and remotely.
  2. It does not expect to receive any inbound messages from non-local sources. It is send-only from a networking perspective.

Replacing network.Engine with network.MessageProcessor, there are a few opportunities for simplifying the implementation:

  • Currently the internal message-passing is done through SubmitLocal. We can remove SubmitLocal and ProcessLocal and replace them with a context-specific function (and corresponding interface type, for use in the caller).
    • A reference example is the Compliance interface, which is exposed by the compliance engine for local message-passing.
    • For this reason, engine.MessageHandler may not be needed in this context. It is better suited for cases where we are handling external messages.
    • We then do not need to check originIDs and do not need both onSubmitCollectionGuarantee and SubmitCollectionGuarantee methods
    • Now, this context-specific function is the only way for local messages to enter the pusher engine, which will simplify our network message processing logic 👇
  • In order to satisfy network.MessageProcessor, we need a Process method, which accepts messages from remote network sources. Since this engine does not expect any remote messages, we should reject all inputs to Process.
    • In general, when we observe an unexpected message from another node, we should flag it as a possible Byzantine message. In the future, these messages would result in a slashing challenge targeting the originator of the message. For now, we can log a warning with the relevant log tag.
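A rough sketch of that shape (hypothetical: it assumes flow-go's logging.KeySuspicious tag and the usual zerolog API, and stands in for whatever slashing-challenge mechanism arrives later):

// Process implements network.MessageProcessor. The pusher engine is send-only,
// so every message arriving from the network is unexpected and rejected outright,
// before anything is queued.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
	// flag the sender as potentially Byzantine; for now we only log a warning
	e.log.Warn().
		Bool(logging.KeySuspicious, true).
		Hex("origin_id", originID[:]).
		Str("channel", string(channel)).
		Msgf("pusher engine received unexpected message of type %T", message)
	return fmt.Errorf("pusher engine does not accept remote messages")
}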

Member:

+1. Had a similar thought 👉 my comment

tim-barry (Contributor, Author) replied:

Change to network.MessageProcessor and the new interface for local messages will be part of a separate PR.


// New creates a new pusher engine.
func New(
log zerolog.Logger,
net network.EngineRegistry,
state protocol.State,
engMetrics module.EngineMetrics,
mempoolMetrics module.MempoolMetrics,
colMetrics module.CollectionMetrics,
me module.Local,
collections storage.Collections,
transactions storage.Transactions,
) (*Engine, error) {
queue, err := fifoqueue.NewFifoQueue(
1000,
fifoqueue.WithLengthObserver(func(len int) {
mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len))
}),
)
if err != nil {
return nil, fmt.Errorf("could not create fifoqueue: %w", err)
}

notifier := engine.NewNotifier()

e := &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,

notifier: notifier,
queue: queue,
}

conduit, err := net.Register(channels.PushGuarantees, e)
@@ -53,81 +91,107 @@
}
e.conduit = conduit

e.cm = component.NewComponentManagerBuilder().
AddWorker(e.outboundQueueWorker).
Build()
e.Component = e.cm

return e, nil
}

// Ready returns a ready channel that is closed once the engine has fully
// started.
func (e *Engine) Ready() <-chan struct{} {
return e.unit.Ready()
// Worker to process SubmitCollectionGuarantee messages coming from the Finalizer.
func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()

done := ctx.Done()
wake := e.notifier.Channel()
for {
select {
case <-done:
return
case <-wake:
err := e.processOutboundMessages(ctx)
if err != nil {
ctx.Throw(err)
}
}
}
}

// Done returns a done channel that is closed once the engine has fully stopped.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
// processOutboundMessages processes any available messages from the queue.
// Only returns when the queue is empty (or the engine is terminated).
Member:

Please add error documentation here. Either enumerate possible error types, or add // No errors expected during normal operation.

tim-barry (Contributor, Author) replied:

We don't expect any errors here, because the only sources would be wrong-typed items in the queue (shouldn't happen, since we only ever insert the correct type) or an error from publishCollectionGuarantee, which is not expected.
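For illustration, the documented version could simply carry the convention the reviewer quotes (a sketch):

// processOutboundMessages processes any available messages from the queue.
// Only returns when the queue is empty (or the engine is terminated).
// No errors are expected during normal operation.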

func (e *Engine) processOutboundMessages(ctx context.Context) error {
for {
nextMessage, ok := e.queue.Pop()
if !ok {
return nil
}

asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee)
if !ok {
return fmt.Errorf("invalid message type in pusher engine queue")
}

err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee)
if err != nil {
return err
}

select {
case <-ctx.Done():
return nil
default:
}
}
}

// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
AlexHentschel (Member) commented on lines 146 to 147, Nov 26, 2024:

As an exercise, try to figure out which other components are feeding inputs into the pusher.Engine. We are lucky here because we have a rarely used message type, SubmitCollectionGuarantee. But just imagine you saw a log that says no matching processor for message of type flow.Header from origin. With the Engine interface, it is way too time-intensive to figure out which components inside a node are feeding which other components with data.

Turns out that the Finalizer is the only component that is feeding the pusher.Engine (other than messages arriving from the networking layer). Let's make our lives easier in the future by improving the code:

  • introduce a dedicated interface GuaranteedCollectionPublisher (see the sketch at the end of this comment). It only has one type-safe method
    SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
    This method just takes the message and puts it into the queue. Then we know that the queue will only ever contain elements of type flow.CollectionGuarantee. (Currently, the Finalizer creates the wrapper messages.SubmitCollectionGuarantee around CollectionGuarantee ... and the pusher.Engine unwraps it and throws the wrapper away 😑 ).
  • change the Finalizer and all layers in between to work with the GuaranteedCollectionPublisher interface. This improves code clarity: currently, engineers need to know what functionality is backing the generic Engine interface, because the right type of inputs must be fed into the pusher.Engine. When using the GuaranteedCollectionPublisher interface we have the following benefits:
    • the type itself carries information about what functionality is behind this interface. Huge improvement in clarity.
    • it's easy to find all places that can feed the Pusher with data, because we work with concrete interfaces and context-specific methods (e.g. SubmitCollectionGuarantee).
    • the compiler will reject code that feeds inputs of an incompatible type into the Pusher.
  • In my opinion, it would improve clarity if we were to discard all inputs from the network.MessageProcessor - conceptually they are messages from a different node, which that node is already broadcasting. The Pusher is for broadcasting our own messages. But at the moment, inputs from node-internal components as well as messages from other nodes passing through the networking layer share the same code path in the pusher.Engine, which makes it non-trivial to figure out where messages are coming from.

To summarize: one of the main benefits of the Component interface is that it provides lifecycle methods, but no methods for ingesting data - on purpose. This is because all data from components within the node should be passed via context-specific methods. The only exception is the interface network.MessageProcessor, which would still call Process with a generic message type.

Sorry for the lengthy comment. I hope you see how the very ambiguous structure of the Engine makes it really hard to determine from the code what we are expecting and where to drop messages, because it's all hidden behind layers of ambiguous interfaces in a huge code base.
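A sketch of the proposed interface (hypothetical: the name and method follow the comment above, and the final signature may differ in the follow-up PR):

// GuaranteedCollectionPublisher is the single, type-safe entry point for feeding
// the pusher engine from node-internal components (currently only the Finalizer).
// Implementations must be non-blocking: the guarantee is queued for asynchronous
// broadcast to consensus nodes.
type GuaranteedCollectionPublisher interface {
	// SubmitCollectionGuarantee adds the guarantee to the engine's outbound queue.
	SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
}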

tim-barry (Contributor, Author) replied:

Thanks for the detailed comment :)

I added the described SubmitCollectionGuarantee method in f66ba69, but the interface & changing the Finalizer will be pushed to the next PR.

Inputs from the network.MessageProcessor (via the Process method) are being discarded (e25cba7).

e.unit.Launch(func() {
err := e.process(e.me.NodeID(), event)
if err != nil {
engine.LogError(e.log, err)
}
})
ev, ok := event.(*messages.SubmitCollectionGuarantee)
if ok {
e.SubmitCollectionGuarantee(ev)
} else {
engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine"))
}
}

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
e.unit.Launch(func() {
err := e.process(originID, event)
if err != nil {
engine.LogError(e.log, err)
}
})
engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node"))
}

// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.unit.Do(func() error {
return e.process(e.me.NodeID(), event)
})
ev, ok := event.(*messages.SubmitCollectionGuarantee)
if ok {
e.SubmitCollectionGuarantee(ev)
return nil
} else {
return fmt.Errorf("invalid message argument to pusher engine")
}
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
return e.unit.Do(func() error {
return e.process(originID, event)
})
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
return fmt.Errorf("pusher engine should only receive local messages on the same node")
}

// process processes events for the pusher engine on the collection node.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
switch ev := event.(type) {
case *messages.SubmitCollectionGuarantee:
e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
return e.onSubmitCollectionGuarantee(originID, ev)
default:
return fmt.Errorf("invalid event type (%T)", event)
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
// to later be published to consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) {
ok := e.queue.Push(msg)
jordanschalm (Member) commented on Nov 29, 2024:

When we were using a generic function (e.g. SubmitLocal(message any)), the type wrapper (*messages.SubmitCollectionGuarantee) was useful because it communicated the meaning of the message ("this collection guarantee should be broadcast").

When we have a context-specific method for enqueueing collection guarantees, we no longer need the type wrapper and can simply pass *CollectionGuarantee directly: the method communicates the meaning of the message, not the argument type.

(Let's apply this change in the follow-up PR; a sketch follows after this function.)

if !ok {
engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue"))
return
}
e.notifier.Notify()
}
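The follow-up shape jordanschalm describes could then look like this (a sketch, assuming the queue would then only ever hold *flow.CollectionGuarantee):

// SubmitCollectionGuarantee queues a collection guarantee for asynchronous
// broadcast to consensus nodes. The method name carries the meaning of the
// message, so no wrapper type is needed.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) {
	if ok := e.queue.Push(guarantee); !ok {
		engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue"))
		return
	}
	e.notifier.Notify()
}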

// onSubmitCollectionGuarantee handles submitting the given collection guarantee
// to consensus nodes.
func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error {
if originID != e.me.NodeID() {
return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID)
}

return e.SubmitCollectionGuarantee(&req.Guarantee)
}

// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes.
Member:

Could you please add error return documentation to this function. If it might return an expected sentinel error (including from sub-calls into Identities() and Publish()), we should list those possible error returns. Otherwise, we should add the line // No errors expected during normal operation.

If you haven't already, take a look at the godoc references for Identities() and Publish() and think about what errors they might return, and which of those are expected vs unexpected in the context of executing publishCollectionGuarantee.

In particular, Identities() does have a listed error return value -- state.ErrUnknownSnapshotReference -- but in the context here, it is not expected. Why is that?

tim-barry (Contributor, Author) replied:

For Identities(): since we're using the Final() snapshot (the latest finalized block, held in cache), we expect to always have a valid snapshot reference / queryable block in the state, so we expect Identities() on it to never error.

From what I can tell, Publish() only returns an error if the conduit channel has been closed (in which case I would assume we can't continue), or other "benign" errors when the constructed message is invalid in some way (pretty sure this is not expected) or some failure in the underlying libp2p (I assume also not expected).
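Based on that reasoning, the error documentation could read (a sketch of a doc comment, mirroring the justification above):

// publishCollectionGuarantee broadcasts the collection guarantee to all consensus nodes.
// No errors are expected during normal operation: identities are queried against the
// latest finalized snapshot (always a known, queryable reference), and Publish only
// fails on a closed conduit or a malformed message, neither of which is expected here.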

func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus))
if err != nil {
return fmt.Errorf("could not get consensus nodes: %w", err)
13 changes: 12 additions & 1 deletion engine/collection/pusher/engine_test.go
@@ -1,8 +1,10 @@
package pusher_test

import (
"context"
"io"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
@@ -12,6 +14,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/channels"
@@ -69,6 +72,7 @@ func (suite *Suite) SetupTest() {
suite.state,
metrics,
metrics,
metrics,
suite.me,
suite.collections,
suite.transactions,
@@ -82,19 +86,26 @@ func TestPusherEngine(t *testing.T) {

// should be able to submit collection guarantees to consensus nodes
func (suite *Suite) TestSubmitCollectionGuarantee() {
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background())
suite.engine.Start(ctx)
defer cancel()
done := make(chan struct{})

guarantee := unittest.CollectionGuaranteeFixture()

// should submit the collection to consensus nodes
consensus := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleConsensus))
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil)
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).
Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once()

msg := &messages.SubmitCollectionGuarantee{
Guarantee: *guarantee,
}
err := suite.engine.ProcessLocal(msg)
suite.Require().Nil(err)

unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent")

suite.conduit.AssertExpectations(suite.T())
}

1 change: 1 addition & 0 deletions engine/testutil/mock/nodes.go
@@ -140,6 +140,7 @@ func (n CollectionNode) Start(t *testing.T) {
n.IngestionEngine.Start(n.Ctx)
n.EpochManagerEngine.Start(n.Ctx)
n.ProviderEngine.Start(n.Ctx)
n.PusherEngine.Start(n.Ctx)
}

func (n CollectionNode) Ready() <-chan struct{} {
2 changes: 1 addition & 1 deletion engine/testutil/nodes.go
@@ -313,7 +313,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro
retrieve)
require.NoError(t, err)

pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Me, collections, transactions)
pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, collections, transactions)
require.NoError(t, err)

clusterStateFactory, err := factories.NewClusterStateFactory(
1 change: 1 addition & 0 deletions module/metrics/labels.go
@@ -113,6 +113,7 @@ const (
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine
ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine
ResourceSubmitCollectionGuaranteesQueue = "submit_col_guarantee_queue" // collection node, pusher engine
ResourceBeaconKey = "beacon-key" // consensus node, DKG engine
ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine
ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine
Expand Down
Loading