From 24807d98e151b10b34696dcc0707367f1313572b Mon Sep 17 00:00:00 2001 From: Mathieu Hofman <86499+mhofman@users.noreply.github.com> Date: Wed, 11 Dec 2024 19:48:36 -0300 Subject: [PATCH 1/4] feat(p2p): add msg recv routine --- p2p/conn/connection.go | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index da50c7f7e94..974b7f9e458 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -45,6 +45,7 @@ const ( defaultSendTimeout = 10 * time.Second defaultPingInterval = 60 * time.Second defaultPongTimeout = 45 * time.Second + defaultMsgRecvQueueSize = 1000 ) type ( @@ -52,6 +53,11 @@ type ( errorCbFunc func(interface{}) ) +type ConnMsg struct { + channelID byte + msgBytes []byte +} + /* Each peer has one `MConnection` (multiplex connection) instance. @@ -87,6 +93,7 @@ type MConnection struct { recvMonitor *flow.Monitor send chan struct{} pong chan struct{} + msgRecvQueue chan ConnMsg channels []*Channel channelsIdx map[byte]*Channel onReceive receiveCbFunc @@ -136,6 +143,9 @@ type MConnConfig struct { // Maximum wait time for pongs PongTimeout time.Duration `mapstructure:"pong_timeout"` + + // Maximum number of received messages to buffer from a peer + MsgRecvQueueSize int `mapstructure:"msg_recv_queue_size"` } // DefaultMConnConfig returns the default config. @@ -147,6 +157,7 @@ func DefaultMConnConfig() MConnConfig { FlushThrottle: defaultFlushThrottle, PingInterval: defaultPingInterval, PongTimeout: defaultPongTimeout, + MsgRecvQueueSize: defaultMsgRecvQueueSize, } } @@ -185,6 +196,7 @@ func NewMConnectionWithConfig( recvMonitor: flow.New(0, 0), send: make(chan struct{}, 1), pong: make(chan struct{}, 1), + msgRecvQueue: make(chan ConnMsg, config.MsgRecvQueueSize), onReceive: onReceive, onError: onError, config: config, @@ -232,6 +244,7 @@ func (c *MConnection) OnStart() error { c.quitRecvRoutine = make(chan struct{}) go c.sendRoutine() go c.recvRoutine() + go c.recvMsgRoutine() return nil } @@ -561,6 +574,7 @@ func (c *MConnection) sendPacketMsg() bool { // Otherwise, it never blocks. func (c *MConnection) recvRoutine() { defer c._recover() + defer close(c.msgRecvQueue) protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize) @@ -646,8 +660,8 @@ FOR_LOOP: } if msgBytes != nil { c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes) - // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine - c.onReceive(channelID, msgBytes) + connMsg := ConnMsg{channelID, msgBytes} + c.msgRecvQueue <- connMsg } default: err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet)) @@ -664,6 +678,23 @@ FOR_LOOP: } } +func (c *MConnection) recvMsgRoutine() { + defer c._recover() + + for { + connMsg, ok := <-c.msgRecvQueue + + if !ok { + // The recvRoutine exited and closed the msgRecvQueue channel. No more messages to process + break + } + + c.Logger.Debug("Processing message", "chID", connMsg.channelID, "pendingMsgs", len(c.msgRecvQueue)) + // NOTE: This means the reactor.Receive of all channels run in the same thread + c.onReceive(connMsg.channelID, connMsg.msgBytes) + } +} + // not goroutine-safe func (c *MConnection) stopPongTimer() { if c.pongTimer != nil { @@ -867,11 +898,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { if packet.EOF { msgBytes := ch.recving - // clear the slice without re-allocating. - // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go - // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, - // at which point the recving slice stops being used and should be garbage collected - ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity) + ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity) return msgBytes, nil } return nil, nil From d18fe0db40a1f91f7bd6fd5cfdd48a6438474a92 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman <86499+mhofman@users.noreply.github.com> Date: Thu, 12 Dec 2024 01:18:38 -0300 Subject: [PATCH 2/4] feat(p2p): limit max aggregate size of recv buffer --- p2p/conn/connection.go | 65 +++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 974b7f9e458..d78ee59893c 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -86,20 +86,22 @@ Inbound message bytes are handled with an onReceive callback function. type MConnection struct { service.BaseService - conn net.Conn - bufConnReader *bufio.Reader - bufConnWriter *bufio.Writer - sendMonitor *flow.Monitor - recvMonitor *flow.Monitor - send chan struct{} - pong chan struct{} - msgRecvQueue chan ConnMsg - channels []*Channel - channelsIdx map[byte]*Channel - onReceive receiveCbFunc - onError errorCbFunc - errored uint32 - config MConnConfig + conn net.Conn + bufConnReader *bufio.Reader + bufConnWriter *bufio.Writer + sendMonitor *flow.Monitor + recvMonitor *flow.Monitor + send chan struct{} + pong chan struct{} + msgRecvQueue chan ConnMsg + msgRecvAck chan uint32 + msgBytesBufferedLength uint32 + channels []*Channel + channelsIdx map[byte]*Channel + onReceive receiveCbFunc + onError errorCbFunc + errored uint32 + config MConnConfig // Closing quitSendRoutine will cause the sendRoutine to eventually quit. // doneSendRoutine is closed when the sendRoutine actually quits. @@ -146,6 +148,9 @@ type MConnConfig struct { // Maximum number of received messages to buffer from a peer MsgRecvQueueSize int `mapstructure:"msg_recv_queue_size"` + + // Maximum size of the sum of bytes for received messages to the buffer from a peer + RecvMessageCapacity int `mapstructure:"recv_message_capacity"` } // DefaultMConnConfig returns the default config. @@ -158,6 +163,7 @@ func DefaultMConnConfig() MConnConfig { PingInterval: defaultPingInterval, PongTimeout: defaultPongTimeout, MsgRecvQueueSize: defaultMsgRecvQueueSize, + RecvMessageCapacity: defaultRecvMessageCapacity, } } @@ -197,6 +203,7 @@ func NewMConnectionWithConfig( send: make(chan struct{}, 1), pong: make(chan struct{}, 1), msgRecvQueue: make(chan ConnMsg, config.MsgRecvQueueSize), + msgRecvAck: make(chan uint32, config.MsgRecvQueueSize+1), onReceive: onReceive, onError: onError, config: config, @@ -641,6 +648,16 @@ FOR_LOOP: // never block } case *tmp2p.Packet_PacketMsg: + DRAIN_MSG_ACKS_LOOP: + for { + select { + case readSize := <-c.msgRecvAck: + c.msgBytesBufferedLength -= readSize + default: + break DRAIN_MSG_ACKS_LOOP + } + } + channelID := byte(pkt.PacketMsg.ChannelID) channel, ok := c.channelsIdx[channelID] if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil { @@ -659,7 +676,16 @@ FOR_LOOP: break FOR_LOOP } if msgBytes != nil { - c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes) + msgLen := uint32(len(msgBytes)) + bufferMaxSize := uint32(c.config.RecvMessageCapacity) + if msgLen > bufferMaxSize { + bufferMaxSize = msgLen + } + c.msgBytesBufferedLength += msgLen + // Wait until our message fits in the queue + for c.msgBytesBufferedLength > bufferMaxSize { + c.msgBytesBufferedLength -= <-c.msgRecvAck + } connMsg := ConnMsg{channelID, msgBytes} c.msgRecvQueue <- connMsg } @@ -680,6 +706,7 @@ FOR_LOOP: func (c *MConnection) recvMsgRoutine() { defer c._recover() + defer close(c.msgRecvAck) for { connMsg, ok := <-c.msgRecvQueue @@ -689,8 +716,12 @@ func (c *MConnection) recvMsgRoutine() { break } - c.Logger.Debug("Processing message", "chID", connMsg.channelID, "pendingMsgs", len(c.msgRecvQueue)) - // NOTE: This means the reactor.Receive of all channels run in the same thread + msgLen := len(connMsg.msgBytes) + // Will never block as long as msgRecvAck is at least one bigger than msgRecvQueue + c.msgRecvAck <- uint32(msgLen) + + // NOTE: This means the reactor.Receive of all messages each runs in a thread + // per channel c.onReceive(connMsg.channelID, connMsg.msgBytes) } } From 161f5ccd35aa19d1ee52a030a6a2e085021e080b Mon Sep 17 00:00:00 2001 From: JC Date: Thu, 19 Dec 2024 10:35:10 -0600 Subject: [PATCH 3/4] test(p2p): add tests for recv msg routine --- p2p/conn/connection_test.go | 213 ++++++++++++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index f59df3dc8b0..38e272633a8 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -618,6 +618,219 @@ func TestMConnectionChannelOverflow(t *testing.T) { } +func TestMConnectionMaxChannelSize(t *testing.T) { + log.TestingLogger().Info("TestMConnectionSendMsgRecvQueueSize") + chOnErr := make(chan struct{}) + chOnRcv := make(chan struct{}) + + mconnClient, mconnServer := newClientAndServerConnsForReadErrorsWithQueueSize(t, chOnErr) + defer mconnClient.Stop() //nolint:errcheck // ignore for tests + defer mconnServer.Stop() //nolint:errcheck // ignore for tests + + onReceiveDone := make(chan struct{}) + + mconnServer.onReceive = func(chID byte, msgBytes []byte) { + log.TestingLogger().Info(":- onReceive called") + <-onReceiveDone + chOnRcv <- struct{}{} + } + + mconnServer._maxPacketMsgSize = mconnClient.config.MaxPacketMsgPayloadSize * 15 + mconnServer.msgRecvQueue = make(chan ConnMsg) + client := mconnClient.conn + protoWriter := protoio.NewDelimitedWriter(client) + + // send msg thats just right + var packet = tmp2p.PacketMsg{ + ChannelID: 0x01, + EOF: true, + Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize), + } + var err error + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.Error(t, err, "read/write on closed pipe") + +} + +func TestMConnectionMaxChannelSizeUnreached(t *testing.T) { + log.TestingLogger().Info("TestMConnectionSendMsgRecvQueueSize") + chOnErr := make(chan struct{}) + chOnRcv := make(chan struct{}) + + mconnClient, mconnServer := newClientAndServerConnsForReadErrorsWithQueueSize(t, chOnErr) + defer mconnClient.Stop() //nolint:errcheck // ignore for tests + defer mconnServer.Stop() //nolint:errcheck // ignore for tests + + onReceiveDone := make(chan struct{}) + + mconnServer.onReceive = func(chID byte, msgBytes []byte) { + log.TestingLogger().Info(":- onReceive called") + <-onReceiveDone + chOnRcv <- struct{}{} + } + + mconnServer._maxPacketMsgSize = mconnClient.config.MaxPacketMsgPayloadSize * 15 + mconnServer.msgRecvQueue = make(chan ConnMsg, 3) + client := mconnClient.conn + protoWriter := protoio.NewDelimitedWriter(client) + + // send msg thats just right + var packet = tmp2p.PacketMsg{ + ChannelID: 0x01, + EOF: true, + Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize), + } + var err error + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + +} + +func newClientAndServerConnsForReadErrorsWithQueueSize( + t *testing.T, + chOnErr chan struct{}) (*MConnection, *MConnection) { + server, client := NetPipe() + + onReceive := func(chID byte, msgBytes []byte) {} + onError := func(r interface{}) {} + + // create client conn with two channels + chDescs := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 1}, + {ID: 0x02, Priority: 1, SendQueueCapacity: 1}, + } + + mcConfig := DefaultMConnConfig() + mcConfig.MsgRecvQueueSize = 1 + mconnClient := NewMConnection(client, chDescs, onReceive, onError) + mconnClient.SetLogger(log.TestingLogger().With("module", "client")) + err := mconnClient.Start() + require.Nil(t, err) + + // create server conn with 1 channel + // it fires on chOnErr when there's an error + serverLogger := log.TestingLogger().With("module", "server") + onError = func(r interface{}) { + chOnErr <- struct{}{} + } + mconnServer := createMConnectionWithCallbacks(server, onReceive, onError) + mconnServer.SetLogger(serverLogger) + err = mconnServer.Start() + require.Nil(t, err) + return mconnClient, mconnServer +} + +func TestMConnectionReceiveBufferAndCapacity(t *testing.T) { + msgs := [][]byte{[]byte("10000000"), []byte("20000000"), []byte("30000000")} + + testCases := []struct { + name string + isBuffer bool + capacity int + shouldBlock bool + messages [][]byte + }{ + { + name: "happy path", + capacity: 10000, + isBuffer: true, + messages: msgs, + shouldBlock: false, + }, + { + name: "capacity is maxed", + isBuffer: true, + capacity: 9, + messages: msgs, + shouldBlock: true, + }, + { + name: "channel is maxed", + isBuffer: false, + capacity: 2, + messages: msgs, + shouldBlock: true, + }, + { + name: "channel happy path", + isBuffer: false, + capacity: 10000, + messages: msgs, + shouldBlock: false, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + server, client := NetPipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceiveWork := make(chan struct{}) + onReceive := func(chID byte, msgBytes []byte) { + <-onReceiveWork + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn1 := createMConnectionWithCallbacks(client, onReceive, onError) + if tc.isBuffer { + mconn1.config.RecvMessageCapacity = tc.capacity + } else { + mconn1.msgRecvQueue = make(chan ConnMsg, tc.capacity) + } + err := mconn1.Start() + require.Nil(t, err) + defer mconn1.Stop() //nolint:errcheck // ignore for tests + + mconn2 := createTestMConnection(server) + err = mconn2.Start() + require.Nil(t, err) + defer mconn2.Stop() //nolint:errcheck // ignore for tests + + msgs := tc.messages + + for _, msg := range msgs { + assert.True(t, mconn2.Send(0x01, msg)) + } + MSG_LOOP: + for _, msg := range msgs { + select { + case receivedBytes := <-receivedCh: + log.TestingLogger().Info(string(receivedBytes)) + t.Fatalf("This should not have been reached") + case err := <-errorsCh: + if !tc.shouldBlock { + t.Fatalf("Expected %s, got %+v", msg, err) + } + break MSG_LOOP + + case <-time.After(100 * time.Millisecond): + log.TestingLogger().Info("TestMConnectionReceive2 timeout") + onReceiveWork <- struct{}{} + } + receivedBytes := <-receivedCh + assert.Equal(t, msg, receivedBytes) + } + }) + } +} + type stopper interface { Stop() error } From 7223d9966fd050be47f83dc9663fc61373a7cc51 Mon Sep 17 00:00:00 2001 From: JC Date: Thu, 19 Dec 2024 10:44:32 -0600 Subject: [PATCH 4/4] test(p2p): check maximum buffer size --- p2p/conn/connection_test.go | 43 +++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 38e272633a8..e153b1cdd8f 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -698,6 +698,49 @@ func TestMConnectionMaxChannelSizeUnreached(t *testing.T) { } +func TestMConnectionMaxByteSizeReached(t *testing.T) { + log.TestingLogger().Info("TestMConnectionSendMsgRecvQueueSize") + chOnErr := make(chan struct{}) + chOnRcv := make(chan struct{}) + + mconnClient, mconnServer := newClientAndServerConnsForReadErrorsWithQueueSize(t, chOnErr) + defer mconnClient.Stop() //nolint:errcheck // ignore for tests + defer mconnServer.Stop() //nolint:errcheck // ignore for tests + + //onReceiveDone := make(chan struct{}) + + mconnServer.onReceive = func(chID byte, msgBytes []byte) { + log.TestingLogger().Info(":- onReceive called") + //<-onReceiveDone + time.Sleep(10 * time.Millisecond) + chOnRcv <- struct{}{} + } + + mconnServer._maxPacketMsgSize = defaultRecvMessageCapacity * 2 + mconnServer.msgRecvQueue = make(chan ConnMsg, 100000) + client := mconnClient.conn + protoWriter := protoio.NewDelimitedWriter(client) + + // send msg thats just right + var packet = tmp2p.PacketMsg{ + ChannelID: 0x01, + EOF: true, + Data: make([]byte, defaultRecvMessageCapacity/2+1000), + } + var err error + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) + + //require.NoError(t, err) + //assert.True(t, expectSend(chOnRcv)) + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.Error(t, err, "read/write on closed pipe") + +} + func newClientAndServerConnsForReadErrorsWithQueueSize( t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {