-
Notifications
You must be signed in to change notification settings - Fork 129
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(dot/network): fix bugs in notifications protocol handlers; add me…
…trics for inbound/outbound streams (#2010)
- Loading branch information
Showing
17 changed files
with
443 additions
and
301 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Copyright 2021 ChainSafe Systems (ON) | ||
// SPDX-License-Identifier: LGPL-3.0-only | ||
|
||
package network | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
var ( | ||
errCannotValidateHandshake = errors.New("failed to validate handshake") | ||
errMessageTypeNotValid = errors.New("message type is not valid") | ||
errMessageIsNotHandshake = errors.New("failed to convert message to Handshake") | ||
errMissingHandshakeMutex = errors.New("outboundHandshakeMutex does not exist") | ||
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake") | ||
errHandshakeTimeout = errors.New("handshake timeout reached") | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
// Copyright 2021 ChainSafe Systems (ON) | ||
// SPDX-License-Identifier: LGPL-3.0-only | ||
|
||
package network | ||
|
||
import ( | ||
libp2pnetwork "github.com/libp2p/go-libp2p-core/network" | ||
) | ||
|
||
func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) { | ||
// we NEED to reset the stream if we ever return from this function, as if we return, | ||
// the stream will never again be read by us, so we need to tell the remote side we're | ||
// done with this stream, and they should also forget about it. | ||
defer s.resetInboundStream(stream) | ||
s.streamManager.logNewStream(stream) | ||
|
||
peer := stream.Conn().RemotePeer() | ||
msgBytes := s.bufPool.get() | ||
defer s.bufPool.put(msgBytes) | ||
|
||
for { | ||
n, err := readStream(stream, msgBytes[:]) | ||
if err != nil { | ||
logger.Tracef( | ||
"failed to read from stream id %s of peer %s using protocol %s: %s", | ||
stream.ID(), stream.Conn().RemotePeer(), stream.Protocol(), err) | ||
return | ||
} | ||
|
||
s.streamManager.logMessageReceived(stream.ID()) | ||
|
||
// decode message based on message type | ||
msg, err := decoder(msgBytes[:n], peer, isInbound(stream)) // stream should always be inbound if it passes through service.readStream | ||
if err != nil { | ||
logger.Tracef("failed to decode message from stream id %s using protocol %s: %s", | ||
stream.ID(), stream.Protocol(), err) | ||
continue | ||
} | ||
|
||
logger.Tracef( | ||
"host %s received message from peer %s: %s", | ||
s.host.id(), peer, msg) | ||
|
||
if err = handler(stream, msg); err != nil { | ||
logger.Tracef("failed to handle message %s from stream id %s: %s", msg, stream.ID(), err) | ||
return | ||
} | ||
|
||
s.host.bwc.LogRecvMessage(int64(n)) | ||
} | ||
} | ||
|
||
func (s *Service) resetInboundStream(stream libp2pnetwork.Stream) { | ||
protocolID := stream.Protocol() | ||
peerID := stream.Conn().RemotePeer() | ||
|
||
s.notificationsMu.Lock() | ||
defer s.notificationsMu.Unlock() | ||
|
||
for _, prtl := range s.notificationsProtocols { | ||
if prtl.protocolID != protocolID { | ||
continue | ||
} | ||
|
||
prtl.inboundHandshakeData.Delete(peerID) | ||
break | ||
} | ||
|
||
logger.Debugf( | ||
"cleaning up inbound handshake data for protocol=%s, peer=%s", | ||
stream.Protocol(), | ||
peerID, | ||
) | ||
|
||
_ = stream.Reset() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.