diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 1f1dad5bfe..3d628cbda9 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -86,22 +86,22 @@ Inbound message bytes are handled with an onReceive callback function. type MConnection struct { service.BaseService - conn net.Conn - bufConnReader *bufio.Reader - bufConnWriter *bufio.Writer - sendMonitor *flow.Monitor - recvMonitor *flow.Monitor - send chan struct{} - pong chan struct{} - msgRecvQueue chan ConnMsg - msgRecvAck chan uint32 - msgRecvSize uint32 - channels []*Channel - channelsIdx map[byte]*Channel - onReceive receiveCbFunc - onError errorCbFunc - errored uint32 - config MConnConfig + conn net.Conn + bufConnReader *bufio.Reader + bufConnWriter *bufio.Writer + sendMonitor *flow.Monitor + recvMonitor *flow.Monitor + send chan struct{} + pong chan struct{} + msgRecvQueue chan ConnMsg + msgRecvAck chan uint32 + msgBytesBufferedLength uint32 + channels []*Channel + channelsIdx map[byte]*Channel + onReceive receiveCbFunc + onError errorCbFunc + errored uint32 + config MConnConfig // Closing quitSendRoutine will cause the sendRoutine to eventually quit. // doneSendRoutine is closed when the sendRoutine actually quits. @@ -651,7 +651,7 @@ FOR_LOOP: for { select { case readSize := <-c.msgRecvAck: - c.msgRecvSize -= readSize + c.msgBytesBufferedLength -= readSize default: break DRAIN_MSG_ACKS_LOOP } @@ -680,11 +680,11 @@ FOR_LOOP: if msgLen > bufferMaxSize { bufferMaxSize = msgLen } - c.msgRecvSize += msgLen - c.Logger.Debug("Received bytes", "chID", channelID, "msgBytesLength", msgLen, "msgBytesBufferedLength", c.msgRecvSize, "pendingMsgsAhead", len(c.msgRecvQueue)) + c.msgBytesBufferedLength += msgLen + c.Logger.Debug("Received bytes", "chID", channelID, "msgBytesLength", msgLen, "msgBytesBufferedLength", c.msgBytesBufferedLength, "pendingMsgsAhead", len(c.msgRecvQueue)) // Wait until our message fits in the queue - for c.msgRecvSize > bufferMaxSize { - c.msgRecvSize -= <-c.msgRecvAck + for c.msgBytesBufferedLength > bufferMaxSize { + c.msgBytesBufferedLength -= <-c.msgRecvAck } connMsg := ConnMsg{channelID, msgBytes} c.msgRecvQueue <- connMsg @@ -718,10 +718,10 @@ func (c *MConnection) recvMsgRoutine() { msgLen := len(connMsg.msgBytes) // Will never block as long as msgRecvAck is at least one bigger than msgRecvQueue c.msgRecvAck <- uint32(msgLen) - // The following logging of c.msgRecvSize is not a correct representation of the current aggregate size + // The following logging of c.msgBytesBufferedLength is not a correct representation of the current aggregate size // of the queue but is instead the size of the queue as accounted by the producer routine (trailing behind) // However for debug logging it may be enough - c.Logger.Debug("Processing message", "chID", connMsg.channelID, "msgBytesBufferedLength", c.msgRecvSize, "pendingMsgs", len(c.msgRecvQueue)) + c.Logger.Debug("Processing message", "chID", connMsg.channelID, "msgBytesBufferedLength", c.msgBytesBufferedLength, "pendingMsgs", len(c.msgRecvQueue)) // NOTE: This means the reactor.Receive of all channels run in the same thread c.onReceive(connMsg.channelID, connMsg.msgBytes) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index aa106da237..8c7db63f0a 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -775,10 +775,6 @@ func newClientAndServerConnsForReadErrorsWithQueueSize( return mconnClient, mconnServer } -func TestNewMConnectionNoOpBehavior(t *testing.T) { - -} - func TestMConnectionReceiveBufferAndCapacity(t *testing.T) { msgs := [][]byte{[]byte("10000000"), []byte("20000000"), []byte("30000000")}