Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscribing to libp2p eventbus #831

Merged
merged 8 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@
## rotate_log_after_days = 1

# `compress` determines if the rotated log files should be compressed.
# Default is `false`.
## compress = false
# Default is `true`.
## compress = true

# `logger.levels` contains the level of logger per module.
# Available log levels are:
Expand Down
19 changes: 17 additions & 2 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
const (
EventTypeConnect EventType = 1
EventTypeDisconnect EventType = 2
EventTypeGossip EventType = 3
EventTypeStream EventType = 4
EventTypeProtocols EventType = 3
EventTypeGossip EventType = 4
EventTypeStream EventType = 5
)

func (t EventType) String() string {
Expand All @@ -38,6 +39,8 @@
return "connect"
case EventTypeDisconnect:
return "disconnect"
case EventTypeProtocols:
return "protocols"

Check warning on line 43 in network/interface.go

View check run for this annotation

Codecov / codecov/patch

network/interface.go#L42-L43

Added lines #L42 - L43 were not covered by tests
case EventTypeGossip:
return "gossip-msg"
case EventTypeStream:
Expand Down Expand Up @@ -76,6 +79,7 @@
type ConnectEvent struct {
PeerID lp2pcore.PeerID
RemoteAddress string
Direction string
}

func (*ConnectEvent) Type() EventType {
Expand All @@ -91,6 +95,17 @@
return EventTypeDisconnect
}

// ProtocolsEvents represents updating protocols event.
type ProtocolsEvents struct {
PeerID lp2pcore.PeerID
Protocols []string
SupportStream bool
}

func (*ProtocolsEvents) Type() EventType {
return EventTypeProtocols
}

// ShouldPropagate determines whether a message should be disregarded:
// it will be neither delivered to the application nor forwarded to the network.
type ShouldPropagate func(*GossipMessage) bool
Expand Down
6 changes: 5 additions & 1 deletion network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
n.mdns = newMdnsService(ctx, n.host, n.logger)
}
n.dht = newDHTService(n.ctx, n.host, kadProtocolID, isBootstrapper, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, conf, n.logger)
n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger)
n.gossip = newGossipService(ctx, n.host, n.eventChannel, isBootstrapper, n.logger)
n.notifee = newNotifeeService(ctx, n.host, n.eventChannel, n.peerMgr, streamProtocolID, isBootstrapper, n.logger)
Expand Down Expand Up @@ -354,9 +354,13 @@ func (n *network) TopicName(topic string) string {
}

func (n *network) CloseConnection(pid lp2ppeer.ID) {
n.logger.Debug("closing connection", "pid", pid)

if err := n.host.Network().ClosePeer(pid); err != nil {
n.logger.Warn("unable to close connection", "peer", pid)
}

n.logger.Debug("connection closed", "pid", pid)
}

func (n *network) String() string {
Expand Down
57 changes: 31 additions & 26 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ type NotifeeService struct {
func newNotifeeService(ctx context.Context, host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr,
protocolID lp2pcore.ProtocolID, bootstrapper bool, log *logger.SubLogger,
) *NotifeeService {
eventSub, err := host.EventBus().Subscribe(lp2pevent.WildcardSubscription)
events := []interface{}{
new(lp2pevent.EvtLocalReachabilityChanged),
new(lp2pevent.EvtPeerIdentificationCompleted),
new(lp2pevent.EvtPeerProtocolsUpdated),
}
eventSub, err := host.EventBus().Subscribe(events)
if err != nil {
logger.Error("failed to register for libp2p events")
}
Expand All @@ -47,30 +52,20 @@ func newNotifeeService(ctx context.Context, host lp2phost.Host, eventChannel cha

func (s *NotifeeService) Start() {
go func() {
defer s.lp2pEventSub.Close()

for {
select {
case evt := <-s.lp2pEventSub.Out():
switch e := evt.(type) {
case lp2pevent.EvtLocalReachabilityChanged:
s.logger.Info("reachability changed", "reachability", e.Reachability)

case lp2pevent.EvtPeerConnectednessChanged:
s.logger.Debug("connectedness changed", "pid", e.Peer, "connectedness", e.Connectedness)
if e.Connectedness == lp2pnetwork.Connected {
s.sendConnectEvent(e.Peer)
} else if e.Connectedness == lp2pnetwork.NotConnected {
s.sendDisconnectEvent(e.Peer)
}

case lp2pevent.EvtPeerIdentificationCompleted:
s.logger.Debug("identification completed", "pid", e.Peer)
s.sendConnectEvent(e.Peer)
s.sendProtocolsEvent(e.Peer)

case lp2pevent.EvtPeerProtocolsUpdated:
s.logger.Debug("protocols updated", "pid", e.Peer, "protocols", e.Added)
s.sendConnectEvent(e.Peer)
s.sendProtocolsEvent(e.Peer)

default:
s.logger.Debug("unhandled libp2p event", "event", evt)
Expand All @@ -84,14 +79,15 @@ func (s *NotifeeService) Start() {
}

func (s *NotifeeService) Stop() {
s.lp2pEventSub.Close()
}

func (s *NotifeeService) Connected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
pid := conn.RemotePeer()
s.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction)

s.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction)
s.sendConnectEvent(pid)
s.sendConnectEvent(pid, conn.RemoteMultiaddr(), conn.Stat().Direction)
}

func (s *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
Expand All @@ -113,20 +109,29 @@ func (s *NotifeeService) ListenClose(_ lp2pnetwork.Network, ma multiaddr.Multiad
s.logger.Debug("notifee ListenClose event emitted", "addr", ma.String())
}

func (s *NotifeeService) sendConnectEvent(pid lp2pcore.PeerID) {
protocols, err := s.host.Peerstore().GetProtocols(pid)
if err != nil {
s.logger.Error("unable to get supported protocols", "pid", pid)
func (s *NotifeeService) sendProtocolsEvent(pid lp2pcore.PeerID) {
protocols, _ := s.host.Peerstore().GetProtocols(pid)
protocolsStr := []string{}
for _, p := range protocols {
protocolsStr = append(protocolsStr, string(p))
}

slices.Sort(protocolsStr)
supportStream := slices.Contains(protocols, s.streamProtocolID)
if supportStream {
addr := s.peerMgr.GetMultiAddr(pid)
if supportStream && addr != nil {
s.eventChannel <- &ConnectEvent{
PeerID: pid,
RemoteAddress: addr.String(),
}
}
s.eventChannel <- &ProtocolsEvents{
PeerID: pid,
Protocols: protocolsStr,
SupportStream: supportStream,
}
}

func (s *NotifeeService) sendConnectEvent(pid lp2pcore.PeerID,
remoteAddress multiaddr.Multiaddr, direction lp2pnetwork.Direction,
) {
s.eventChannel <- &ConnectEvent{
PeerID: pid,
RemoteAddress: remoteAddress.String(),
Direction: direction.String(),
}
}

Expand Down
20 changes: 7 additions & 13 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"sync"
"time"

lp2pdht "github.com/libp2p/go-libp2p-kad-dht"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnet "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -29,13 +28,12 @@
minConns int
maxConns int
host lp2phost.Host
dht *lp2pdht.IpfsDHT
peers map[lp2ppeer.ID]*peerInfo
logger *logger.SubLogger
}

// newPeerMgr creates a new Peer Manager instance.
func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT,
func newPeerMgr(ctx context.Context, h lp2phost.Host,
conf *Config, log *logger.SubLogger,
) *peerMgr {
b := &peerMgr{
Expand All @@ -45,7 +43,6 @@
maxConns: conf.MaxConns,
peers: make(map[lp2ppeer.ID]*peerInfo),
host: h,
dht: dht,
logger: log,
}

Expand Down Expand Up @@ -128,9 +125,6 @@
connectedness := net.Connectedness(pid)
if connectedness == lp2pnet.Connected {
connectedPeers = append(connectedPeers, pid)
} else {
mgr.logger.Debug("peer is not connected to us", "peer", pid)
delete(mgr.peers, pid)
}
}

Expand All @@ -146,20 +140,20 @@
"count", len(connectedPeers),
"min", mgr.minConns)

for _, pi := range mgr.bootstrapAddrs {
mgr.logger.Debug("try connecting to a bootstrap peer", "peer", pi.String())
for _, ai := range mgr.bootstrapAddrs {
mgr.logger.Debug("try connecting to a bootstrap peer", "peer", ai.String())

// Don't try to connect to an already connected peer.
if HasPID(connectedPeers, pi.ID) {
mgr.logger.Trace("already connected", "peer", pi.String())
if HasPID(connectedPeers, ai.ID) {
mgr.logger.Trace("already connected", "peer", ai.String())

Check warning on line 148 in network/peermgr.go

View check run for this annotation

Codecov / codecov/patch

network/peermgr.go#L148

Added line #L148 was not covered by tests
continue
}

if swarm, ok := mgr.host.Network().(*lp2pswarm.Swarm); ok {
swarm.Backoff().Clear(pi.ID)
swarm.Backoff().Clear(ai.ID)
}

ConnectAsync(mgr.ctx, mgr.host, pi, mgr.logger)
ConnectAsync(mgr.ctx, mgr.host, ai, mgr.logger)
}
}
}
2 changes: 1 addition & 1 deletion sync/handler_block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func TestBroadcastingBlockAnnounceMessages(t *testing.T) {
msg := message.NewBlockAnnounceMessage(blk, cert)
td.sync.broadcast(msg)

msg1 := td.shouldPublishMessageWithThisType(t, td.network, message.TypeBlockAnnounce)
msg1 := td.shouldPublishMessageWithThisType(t, message.TypeBlockAnnounce)
assert.Equal(t, msg1.Message.(*message.BlockAnnounceMessage).Certificate.Height(), msg.Certificate.Height())
}
40 changes: 19 additions & 21 deletions sync/handler_blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("unknown peer (%s)", pid.String()), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

if !p.IsKnownOrTrusty() {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("not handshaked (%s)", p.Status.String()), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil

Check warning on line 40 in sync/handler_blocks_request.go

View check run for this annotation

Codecov / codecov/patch

sync/handler_blocks_request.go#L39-L40

Added lines #L39 - L40 were not covered by tests
}

if !handler.config.NodeNetwork {
Expand All @@ -44,7 +46,8 @@
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("the request height is not acceptable: %v", msg.From), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}
}
height := msg.From
Expand All @@ -54,7 +57,8 @@
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("too many blocks requested: %v-%v", msg.From, msg.Count), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

// Help this peer to sync up
Expand All @@ -67,54 +71,48 @@

response := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks,
message.ResponseCodeMoreBlocks.String(), msg.SessionID, height, blocksData, nil)
err := handler.respond(response, pid)
if err != nil {
return err
}
handler.respond(response, pid)

height += uint32(len(blocksData))
count -= uint32(len(blocksData))
if count <= 0 {
break
}
}
// To avoid sending blocks again, we update height for this peer
// Height is always greater than zeo.
peerHeight := height - 1

if msg.To() >= handler.state.LastBlockHeight() {
lastCert := handler.state.LastCertificate()
response := message.NewBlocksResponseMessage(message.ResponseCodeSynced,
message.ResponseCodeSynced.String(), msg.SessionID, peerHeight, nil, lastCert)
message.ResponseCodeSynced.String(), msg.SessionID, lastCert.Height(), nil, lastCert)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

response := message.NewBlocksResponseMessage(message.ResponseCodeNoMoreBlocks,
message.ResponseCodeNoMoreBlocks.String(), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

func (handler *blocksRequestHandler) PrepareBundle(m message.Message) *bundle.Bundle {
return bundle.NewBundle(m)
}

func (handler *blocksRequestHandler) respond(msg *message.BlocksResponseMessage, to peer.ID) error {
func (handler *blocksRequestHandler) respond(msg *message.BlocksResponseMessage, to peer.ID) {
if msg.ResponseCode == message.ResponseCodeRejected {
handler.logger.Debug("rejecting block request message", "msg", msg,
"to", to, "reason", msg.Reason)

_ = handler.sendTo(msg, to)
handler.sendTo(msg, to)

// There is no point in keeping this stream connection open.
// Close this connection to initiate a new handshake.
handler.network.CloseConnection(to)
} else {
handler.logger.Info("responding block request message", "msg", msg, "to", to)

return nil
handler.sendTo(msg, to)
}

handler.logger.Info("responding block request message", "msg", msg, "to", to)

return handler.sendTo(msg, to)
}
Loading
Loading