-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
replace engine.Unit with ComponentManager in Pusher Engine #6747
Changes from 5 commits
3552f06
6d3f462
0aadc13
dec0d58
b5418dd
b7166f3
e25cba7
f2f53a8
f66ba69
1c80949
fbed8e7
051e6d4
ddb0cb7
17a9a2b
aad332c
ad04f27
88a7d45
cebf100
2048b4a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,15 +4,20 @@ | |||||||||||
package pusher | ||||||||||||
|
||||||||||||
import ( | ||||||||||||
"context" | ||||||||||||
"errors" | ||||||||||||
"fmt" | ||||||||||||
|
||||||||||||
"github.com/rs/zerolog" | ||||||||||||
|
||||||||||||
"github.com/onflow/flow-go/engine" | ||||||||||||
"github.com/onflow/flow-go/engine/common/fifoqueue" | ||||||||||||
"github.com/onflow/flow-go/model/flow" | ||||||||||||
"github.com/onflow/flow-go/model/flow/filter" | ||||||||||||
"github.com/onflow/flow-go/model/messages" | ||||||||||||
"github.com/onflow/flow-go/module" | ||||||||||||
"github.com/onflow/flow-go/module/component" | ||||||||||||
"github.com/onflow/flow-go/module/irrecoverable" | ||||||||||||
"github.com/onflow/flow-go/module/metrics" | ||||||||||||
"github.com/onflow/flow-go/network" | ||||||||||||
"github.com/onflow/flow-go/network/channels" | ||||||||||||
|
@@ -24,7 +29,6 @@ import ( | |||||||||||
// Engine is the collection pusher engine, which provides access to resources | ||||||||||||
// held by the collection node. | ||||||||||||
type Engine struct { | ||||||||||||
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
unit *engine.Unit | ||||||||||||
log zerolog.Logger | ||||||||||||
engMetrics module.EngineMetrics | ||||||||||||
colMetrics module.CollectionMetrics | ||||||||||||
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
|
@@ -33,18 +37,44 @@ type Engine struct { | |||||||||||
state protocol.State | ||||||||||||
collections storage.Collections | ||||||||||||
transactions storage.Transactions | ||||||||||||
|
||||||||||||
messageHandler *engine.MessageHandler | ||||||||||||
notifier engine.Notifier | ||||||||||||
inbound *fifoqueue.FifoQueue | ||||||||||||
|
||||||||||||
component.Component | ||||||||||||
cm *component.ComponentManager | ||||||||||||
} | ||||||||||||
|
||||||||||||
// TODO convert to network.MessageProcessor | ||||||||||||
var _ network.Engine = (*Engine)(nil) | ||||||||||||
var _ component.Component = (*Engine)(nil) | ||||||||||||
Comment on lines
+47
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may also be a good exercise to make the change to I'll add some thoughts below; I suggest we spend some time discussing this live as well. There is no need to implement any of the comments below before we have a chance to discuss. The
Replacing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1. Had a similar thought 👉 my comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change to |
||||||||||||
|
||||||||||||
func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { | ||||||||||||
// TODO length observer metrics | ||||||||||||
inbound, err := fifoqueue.NewFifoQueue(1000) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure about the queue capacity we should pick here. I think we should have a good argument (and document it!) of why the number we chose makes sense. If we don't want to go through the trouble of defining a sensible limit, I think we could make this clear by using an unbounded queue (implementation). Anyway, I think a limit makes sense:
So very very roughly, between a collection being first proposed (usually referencing the newest known block) to the collection expiring, the collector cluster will produce @jordanschalm what do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the choice of queue size here is unlikely to have significant impact:
If we actually had 1500 collections waiting in this queue, I would feel much more confident that there was a severe problem with that node's ability to publish messages, than that collections would be broadcasted before expiring. 😀 General thoughts:
|
||||||||||||
if err != nil { | ||||||||||||
return nil, fmt.Errorf("could not create inbound fifoqueue: %w", err) | ||||||||||||
} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add those length metrics now for completeness. You can use this engine as an example. We'll also need to add a parameter for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in ddb0cb7 |
||||||||||||
|
||||||||||||
notifier := engine.NewNotifier() | ||||||||||||
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
messageHandler := engine.NewMessageHandler(log, notifier, engine.Pattern{ | ||||||||||||
Match: engine.MatchType[*messages.SubmitCollectionGuarantee], | ||||||||||||
Store: &engine.FifoMessageStore{FifoQueue: inbound}, | ||||||||||||
}) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would recommend to not use the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it did end up being less complicated - applied in f66ba69 |
||||||||||||
|
||||||||||||
e := &Engine{ | ||||||||||||
unit: engine.NewUnit(), | ||||||||||||
log: log.With().Str("engine", "pusher").Logger(), | ||||||||||||
engMetrics: engMetrics, | ||||||||||||
colMetrics: colMetrics, | ||||||||||||
me: me, | ||||||||||||
state: state, | ||||||||||||
collections: collections, | ||||||||||||
transactions: transactions, | ||||||||||||
|
||||||||||||
messageHandler: messageHandler, | ||||||||||||
notifier: notifier, | ||||||||||||
inbound: inbound, | ||||||||||||
} | ||||||||||||
|
||||||||||||
conduit, err := net.Register(channels.PushGuarantees, e) | ||||||||||||
|
@@ -53,55 +83,86 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e | |||||||||||
} | ||||||||||||
e.conduit = conduit | ||||||||||||
|
||||||||||||
e.cm = component.NewComponentManagerBuilder(). | ||||||||||||
AddWorker(e.inboundMessageWorker). | ||||||||||||
Build() | ||||||||||||
e.Component = e.cm | ||||||||||||
|
||||||||||||
return e, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
// Ready returns a ready channel that is closed once the engine has fully | ||||||||||||
// started. | ||||||||||||
func (e *Engine) Ready() <-chan struct{} { | ||||||||||||
return e.unit.Ready() | ||||||||||||
func (e *Engine) inboundMessageWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a short godoc for this method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||||||||||||
ready() | ||||||||||||
|
||||||||||||
done := ctx.Done() | ||||||||||||
wake := e.notifier.Channel() | ||||||||||||
for { | ||||||||||||
select { | ||||||||||||
case <-done: | ||||||||||||
return | ||||||||||||
case <-wake: | ||||||||||||
e.processInboundMessages(ctx) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
// Done returns a done channel that is closed once the engine has fully stopped. | ||||||||||||
func (e *Engine) Done() <-chan struct{} { | ||||||||||||
return e.unit.Done() | ||||||||||||
func (e *Engine) processInboundMessages(ctx context.Context) { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should probably make sure functions have doc comments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, please add a short godoc comment to each function (especially public functions, but ideally private ones too). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added short doc comments in f2f53a8 |
||||||||||||
for { | ||||||||||||
nextMessage, ok := e.inbound.Pop() | ||||||||||||
if !ok { | ||||||||||||
return | ||||||||||||
} | ||||||||||||
|
||||||||||||
asEngineWrapper := nextMessage.(*engine.Message) | ||||||||||||
asSCGMsg := asEngineWrapper.Payload.(*messages.SubmitCollectionGuarantee) | ||||||||||||
originID := asEngineWrapper.OriginID | ||||||||||||
|
||||||||||||
_ = e.process(originID, asSCGMsg) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems incorrect to fully discard any error, should likely be logged instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we should be handling all possible error cases. In general, all functions which return an Here is the part of our coding guidelines talking about error handling: https://github.com/onflow/flow-go/blob/master/CodingConventions.md#error-handling. In short, the rule is to prioritize safety over liveness. Practically, the way we would do this here is using the I usually propagate errors all the way up the stack to where the Change Suggestions:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree with your comment that discarding any error is bad. Please take a look at our Coding Conventions, if you haven't already. Tldr:
The There are exceptions to this rule, e.g. if it would be a massive time investment to clear up technical debt and thoroughly documenting possible benign errors. But then, a detailed comment is absolutely required why it is always fine to log and discard any errors in that particular case. I would suggest to leave error handling for a bit and come back to it after some iterations. After having implemented some of the cleanup and simplifications I suggest further below, I suspect that error handling will become a lot more straightforward. |
||||||||||||
|
||||||||||||
select { | ||||||||||||
case <-ctx.Done(): | ||||||||||||
return | ||||||||||||
default: | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
// SubmitLocal submits an event originating on the local node. | ||||||||||||
func (e *Engine) SubmitLocal(event interface{}) { | ||||||||||||
Comment on lines
146
to
147
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As an exercise, try to figure out which other components are feeing inputs into the Turns out that the Finalizer is the only component that is feeding the
To summarize: one of the main benefits of the Sorry for the lengthy comment. I hope you see how the very ambiguous structure of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||
e.unit.Launch(func() { | ||||||||||||
err := e.process(e.me.NodeID(), event) | ||||||||||||
if err != nil { | ||||||||||||
engine.LogError(e.log, err) | ||||||||||||
} | ||||||||||||
}) | ||||||||||||
err := e.messageHandler.Process(e.me.NodeID(), event) | ||||||||||||
if err != nil { | ||||||||||||
engine.LogError(e.log, err) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
// Submit submits the given event from the node with the given origin ID | ||||||||||||
// for processing in a non-blocking manner. It returns instantly and logs | ||||||||||||
// a potential processing error internally when done. | ||||||||||||
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { | ||||||||||||
e.unit.Launch(func() { | ||||||||||||
err := e.process(originID, event) | ||||||||||||
if err != nil { | ||||||||||||
engine.LogError(e.log, err) | ||||||||||||
} | ||||||||||||
}) | ||||||||||||
err := e.messageHandler.Process(originID, event) | ||||||||||||
if err != nil { | ||||||||||||
engine.LogError(e.log, err) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
// ProcessLocal processes an event originating on the local node. | ||||||||||||
jordanschalm marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
func (e *Engine) ProcessLocal(event interface{}) error { | ||||||||||||
return e.unit.Do(func() error { | ||||||||||||
return e.process(e.me.NodeID(), event) | ||||||||||||
}) | ||||||||||||
return e.messageHandler.Process(e.me.NodeID(), event) | ||||||||||||
} | ||||||||||||
|
||||||||||||
// Process processes the given event from the node with the given origin ID in | ||||||||||||
// a blocking manner. It returns the potential processing error when done. | ||||||||||||
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { | ||||||||||||
return e.unit.Do(func() error { | ||||||||||||
return e.process(originID, event) | ||||||||||||
}) | ||||||||||||
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { | ||||||||||||
err := e.messageHandler.Process(originID, message) | ||||||||||||
if err != nil { | ||||||||||||
if errors.Is(err, engine.IncompatibleInputTypeError) { | ||||||||||||
e.log.Warn().Bool(logging.KeySuspicious, true).Msgf("%v delivered unsupported message %T through %v", originID, message, channel) | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
// TODO add comment about Process errors... | ||||||||||||
return fmt.Errorf("unexpected failure to process inbound pusher message") | ||||||||||||
} | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
// process processes events for the pusher engine on the collection node. | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take a look at the core processing logic in the Engine and lets try to remove layers of complexity and indirection where possible:
flow-go/engine/collection/pusher/engine.go
Lines 182 to 191 in b5418dd
Looking at this code raises a few questions:
we are only propagating collections that are originating from this node (👉 code). But the exported method
SubmitCollectionGuarantee
is not checking this? Why not?onSubmitCollectionGuarantee
either..SubmitCollectionGuarantee
is only ever called by thepusher.Engine
itself (not even the tests call it).Lets simplify this: merge
SubmitCollectionGuarantee
andonSubmitCollectionGuarantee
.Further digging into the code, I found that the
originID
is just completely useless in this context:pusher.Engine
is fed only from internal components (sooriginID
is always filled with the node's own identifier)pusher.Engine
drops messages that are not from itself. Just hypothetically, lets assume that there were collections withoriginID
other than the node's own ID. ❗Note that those messages would be queued first, wasting resources and then we have a different thread discard the Collection after dequeueing. Checks like that should always be applied before queueing irrelevant messages.What you see here is a pattern we struggle with a lot: the code is very ambiguous and allows a whole bunch of stuff that is never intended to happen in practise. This is an unsuitable design for engineering complex systems such as Flow, as we then have to manually handle undesired cases.
SubmitCollectionGuarantee
, the compiler would enforce this constraint for us as opposed to us having to write code. Often, people don't bother to manually disallow undesired inputs, resulting in software that is doing something incorrect in case of bugs as opposed to crashing (or the code not compiling in the first place)originID
. We employ a generic patter which obfuscates that we don't need anoriginID
in this particular example, because theEngine
interface does not differentiate between node-external inputs (can be malicious) vs inputs from other components within its own node (always trusted) and mixes modes of processing (synchronous vs asynchronous).For those reasons, we deprecated the
Engine
interface. Lets try to update thepusher
and remove the engine interface (for concrete steps towards this, please see my next comment).For Flow, the APIs should generally be as descriptive and restrictive as possible and make it hard to feed in wrong / mismatching inputs. Writing extra lines of boilerplate code is not a challenge for Flow. In contrast, it is challenging if you have to dig through hundreds or thousands of lines of code because the APIs are too generic and you need to know implementation details to use them correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partially adressed in b7166f3 and e25cba7:
originID
has been pretty much removed from the interface, now thatProcess
andSubmit
(the functions in the Engine interface handling node-external inputs) simply error on any input.