feat(p2p): avoid blocking ping/pong; feed msgs through a limited buffer to a separate goroutine #13
Conversation
p2p/conn/connection.go (outdated)

    // Wake up recvRoutine if necessary
    select {
    case c.msgRecvAck <- struct{}{}:
Couldn't it be just a bool channel instead? OR are we using struct{}{} as it is memory efficient (0 bytes) when it comes to just signalling things with channel?
Go optimizes an empty struct to use no memory, so it is technically more memory-efficient, and it is also idiomatic: in Go you use an empty struct when a channel is used for signaling only.
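A minimal sketch of the idiom discussed above: a buffered `chan struct{}` of capacity 1 used purely for signaling, with a non-blocking send so the sender never stalls if a wake-up is already pending (the helper name `tryWake` is illustrative, not from the PR).

```go
package main

import "fmt"

// tryWake performs a non-blocking send on a signaling channel and
// reports whether the signal was delivered or one was already pending.
func tryWake(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	wake := make(chan struct{}, 1) // capacity 1: at most one pending wake-up
	fmt.Println(tryWake(wake))     // true: buffer was empty
	fmt.Println(tryWake(wake))     // false: a signal is already pending
	<-wake                         // receiver consumes the signal
	fmt.Println(tryWake(wake))     // true again
}
```

Because `struct{}` has zero size, the channel's buffer carries no payload at all; a `chan bool` would work too, but the empty struct makes the "signal only, no data" intent explicit.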
    if msgLen > bufferMaxSize {
        bufferMaxSize = msgLen
    }
@mhofman
Will this ever be true at this place? 🤔 The function recvPacketMsg at line 656 must already throw an error ("received message exceeds available capacity") if msgLen > bufferMaxSize was true. So I think this condition is redundant and will always evaluate to false, or I might be missing something.
My understanding is that a channel can be configured with a different message size limit, which can be higher than the default. If such a message were to be received on the channel, the connection needs to allow it.
Aah I see, FillDefaults conditionally fills a Channel's RecvMessageCapacity with defaultRecvMessageCapacity. Got it!
Btw, shouldn't we respect (or use) channel.desc.RecvMessageCapacity as our bufferMaxSize, especially when it is set lower than the default (defaultRecvMessageCapacity)?
I believe defaultRecvMessageCapacity is rightly used, as our goal is to keep the total msgBytes in the queue (at any instant) within 21MB, right? Otherwise we'd be burning 1000 × 21MB of memory.
So I guess we can resolve the discussion. Things are clarified 👍
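The accounting discussed in this thread can be sketched as follows. This is a hypothetical simplification (the `recvQueue`/`admit` names are illustrative, not from the PR): aggregate queued bytes stay within the default per-message capacity, but a single oversized message is still admitted when a channel is configured with a RecvMessageCapacity above the default.

```go
package main

import "fmt"

const defaultRecvMessageCapacity = 21 * 1024 * 1024 // ~21MB default cap

// recvQueue tracks aggregate queued bytes against a bound that can be
// stretched once to admit a single message larger than the default.
type recvQueue struct {
	maxAggregate int // current bound
	pending      int // bytes currently queued
}

// admit reports whether a message of msgLen bytes may enter the queue.
func (q *recvQueue) admit(msgLen int) bool {
	if msgLen > q.maxAggregate {
		q.maxAggregate = msgLen // allow a single message over the default
	}
	if q.pending > 0 && q.pending+msgLen > q.maxAggregate {
		return false // would exceed the aggregate bound; wait for a drain
	}
	q.pending += msgLen
	return true
}

func main() {
	q := &recvQueue{maxAggregate: defaultRecvMessageCapacity}
	fmt.Println(q.admit(20 * 1024 * 1024)) // true: fits under ~21MB
	fmt.Println(q.admit(5 * 1024 * 1024))  // false: aggregate bound reached
}
```

Under this scheme a remote peer can hold at most roughly twice the largest allowed message in queue memory, which matches the bound argued in the PR description below.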
@@ -45,13 +45,19 @@ const (
    defaultSendTimeout      = 10 * time.Second
    defaultPingInterval     = 60 * time.Second
    defaultPongTimeout      = 45 * time.Second
    defaultMsgRecvQueueSize = 1000
Have we picked this size based on some testing/analysis?
This was a guess. I have been running a mainnet follower since with a higher limit and will be looking at the high water mark to get an idea of a reasonable default value.
@JeancarloBarrios please update this to 5000
    connMsg, ok := <-c.msgRecvQueue
    if !ok {
        break
When is this break expected to be encountered? If it's an error condition, something noisier is warranted. But if it's just routine shutdown from recvRoutine closing the channel, a comment to that effect would be nice.
routine shutdown
p2p/conn/connection.go (outdated)

    // Will never block as long as msgRecvAck is at least one bigger than msgRecvQueue
    c.msgRecvAck <- uint32(msgLen)
    // The following logging of c.msgRecvSize is not a correct representation of the current aggregate size
    // of the queue but is instead the size of the queue as accounted by the producer routine (trailing behind)
"producer routine"?
The producer for this channel is recvRoutine. I admit we could be better at names here.
Should we update the names?
I wanted to avoid changing the name of the original routine to keep future conflict resolution to a minimum, but maybe that's a misplaced hope
ok !!
    // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
    // at which point the recving slice stops being used and should be garbage collected
    ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
    ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
👍
Force-pushed from 4a9f117 to 037c437
Force-pushed from 106ec6a to d816f6e
Force-pushed from d816f6e to 7223d99
Refs: Agoric/agoric-sdk#10742
Description
Add a new goroutine to handle messages read from the peer connection. This allows processing ping/pong messages even if reactors are blocked. To avoid unbounded growth, the buffered channel between the recv and message-processing goroutines is limited both in depth (number of messages) and in aggregate message size.
This maintains in-order processing of messages over a p2p connection, but offloads all their processing, including protobuf parsing, to the new goroutine, and as such localizes the changes needed to the connection abstraction, unlike other cometbft efforts (cometbft#3230, cometbft#3209). It is also compatible with ongoing efforts to parallelize some reactor processing (cometbft#2685, cometbft#3554).
Scaling considerations
Incorporates a "fix" for cometbft#3237, as we need to be able to keep receiving messages on a channel that has a message pending in the queue. This is done by simply reallocating a new buffer instead of truncating the working one. I believe the original was a premature optimization of the tendermint code; Go has a pretty good memory allocator.
The aggregate size of the queue is bounded by the default max size of a message in a channel, while still allowing a single message over that default if the channel needs it. I believe this ensures a remote peer cannot consume significantly more memory than it otherwise could: at most twice the max message size allowed by any channel.
Testing
Tested manually on a follower running this branch as an out-of-process tendermint.
TODO: add unit tests for edge conditions:
- MsgRecvQueueSize at 0 (unbuffered channel): a single message received should be processed by the reactor immediately.

PR checklist
- Changelog entry in .changelog (we use unclog to manage our changelog)
- Updated relevant documentation (docs/ or spec/) and code comments