Skip to content

Commit

Permalink
Merge pull request #13 from agoric-labs/mhofman/p2p-recv-msg-routine
Browse files Browse the repository at this point in the history
feat(p2p): avoid blocking ping/pong; feed msgs through a limited buffer to a separate goroutine
  • Loading branch information
michaelfig authored Dec 24, 2024
2 parents 82eab39 + 7223d99 commit 3f83b97
Show file tree
Hide file tree
Showing 2 changed files with 335 additions and 21 deletions.
100 changes: 79 additions & 21 deletions p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ const (
defaultSendTimeout = 10 * time.Second
defaultPingInterval = 60 * time.Second
defaultPongTimeout = 45 * time.Second
defaultMsgRecvQueueSize = 1000
)

type (
receiveCbFunc func(chID byte, msgBytes []byte)
errorCbFunc func(interface{})
)

type ConnMsg struct {
channelID byte
msgBytes []byte
}

/*
Each peer has one `MConnection` (multiplex connection) instance.
Expand Down Expand Up @@ -80,19 +86,22 @@ Inbound message bytes are handled with an onReceive callback function.
type MConnection struct {
service.BaseService

conn net.Conn
bufConnReader *bufio.Reader
bufConnWriter *bufio.Writer
sendMonitor *flow.Monitor
recvMonitor *flow.Monitor
send chan struct{}
pong chan struct{}
channels []*Channel
channelsIdx map[byte]*Channel
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
config MConnConfig
conn net.Conn
bufConnReader *bufio.Reader
bufConnWriter *bufio.Writer
sendMonitor *flow.Monitor
recvMonitor *flow.Monitor
send chan struct{}
pong chan struct{}
msgRecvQueue chan ConnMsg
msgRecvAck chan uint32
msgBytesBufferedLength uint32
channels []*Channel
channelsIdx map[byte]*Channel
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
config MConnConfig

// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
// doneSendRoutine is closed when the sendRoutine actually quits.
Expand Down Expand Up @@ -136,6 +145,12 @@ type MConnConfig struct {

// Maximum wait time for pongs
PongTimeout time.Duration `mapstructure:"pong_timeout"`

// Maximum number of received messages to buffer from a peer
MsgRecvQueueSize int `mapstructure:"msg_recv_queue_size"`

// Maximum size of the sum of bytes for received messages to the buffer from a peer
RecvMessageCapacity int `mapstructure:"recv_message_capacity"`
}

// DefaultMConnConfig returns the default config.
Expand All @@ -147,6 +162,8 @@ func DefaultMConnConfig() MConnConfig {
FlushThrottle: defaultFlushThrottle,
PingInterval: defaultPingInterval,
PongTimeout: defaultPongTimeout,
MsgRecvQueueSize: defaultMsgRecvQueueSize,
RecvMessageCapacity: defaultRecvMessageCapacity,
}
}

Expand Down Expand Up @@ -185,6 +202,8 @@ func NewMConnectionWithConfig(
recvMonitor: flow.New(0, 0),
send: make(chan struct{}, 1),
pong: make(chan struct{}, 1),
msgRecvQueue: make(chan ConnMsg, config.MsgRecvQueueSize),
msgRecvAck: make(chan uint32, config.MsgRecvQueueSize+1),
onReceive: onReceive,
onError: onError,
config: config,
Expand Down Expand Up @@ -232,6 +251,7 @@ func (c *MConnection) OnStart() error {
c.quitRecvRoutine = make(chan struct{})
go c.sendRoutine()
go c.recvRoutine()
go c.recvMsgRoutine()
return nil
}

Expand Down Expand Up @@ -561,6 +581,7 @@ func (c *MConnection) sendPacketMsg() bool {
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
defer c._recover()
defer close(c.msgRecvQueue)

protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)

Expand Down Expand Up @@ -627,6 +648,16 @@ FOR_LOOP:
// never block
}
case *tmp2p.Packet_PacketMsg:
DRAIN_MSG_ACKS_LOOP:
for {
select {
case readSize := <-c.msgRecvAck:
c.msgBytesBufferedLength -= readSize
default:
break DRAIN_MSG_ACKS_LOOP
}
}

channelID := byte(pkt.PacketMsg.ChannelID)
channel, ok := c.channelsIdx[channelID]
if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
Expand All @@ -645,9 +676,18 @@ FOR_LOOP:
break FOR_LOOP
}
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(channelID, msgBytes)
msgLen := uint32(len(msgBytes))
bufferMaxSize := uint32(c.config.RecvMessageCapacity)
if msgLen > bufferMaxSize {
bufferMaxSize = msgLen
}
c.msgBytesBufferedLength += msgLen
// Wait until our message fits in the queue
for c.msgBytesBufferedLength > bufferMaxSize {
c.msgBytesBufferedLength -= <-c.msgRecvAck
}
connMsg := ConnMsg{channelID, msgBytes}
c.msgRecvQueue <- connMsg
}
default:
err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
Expand All @@ -664,6 +704,28 @@ FOR_LOOP:
}
}

func (c *MConnection) recvMsgRoutine() {
defer c._recover()
defer close(c.msgRecvAck)

for {
connMsg, ok := <-c.msgRecvQueue

if !ok {
// The recvRoutine exited and closed the msgRecvQueue channel. No more messages to process
break
}

msgLen := len(connMsg.msgBytes)
// Will never block as long as msgRecvAck is at least one bigger than msgRecvQueue
c.msgRecvAck <- uint32(msgLen)

// NOTE: This means the reactor.Receive of all messages each runs in a thread
// per channel
c.onReceive(connMsg.channelID, connMsg.msgBytes)
}
}

// not goroutine-safe
func (c *MConnection) stopPongTimer() {
if c.pongTimer != nil {
Expand Down Expand Up @@ -867,11 +929,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
if packet.EOF {
msgBytes := ch.recving

// clear the slice without re-allocating.
// http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
// suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
// at which point the recving slice stops being used and should be garbage collected
ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
return msgBytes, nil
}
return nil, nil
Expand Down
Loading

0 comments on commit 3f83b97

Please sign in to comment.