Skip to content

Commit

Permalink
fix(share/discovery): add loop to read events from the channel multip…
Browse files Browse the repository at this point in the history
…le times (#1684)
  • Loading branch information
vgonkivs authored Feb 2, 2023
1 parent a60c24f commit 484933e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
1 change: 1 addition & 0 deletions nodebuilder/tests/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
select {
case <-ctx.Done():
require.True(t, nodes[0].Host.Network().Connectedness(node.Host.ID()) == network.Connected)
return
case conn := <-connectSub.Out():
status := conn.(event.EvtPeerConnectednessChanged)
if status.Peer != node.Host.ID() {
Expand Down
42 changes: 22 additions & 20 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,29 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
// starting to listen to subscriptions async will help us to avoid any blocking
// in the case when we will not have the needed amount of FNs and will be blocked in `FindPeers`.
go func() {
select {
case <-ctx.Done():
log.Debug("Context canceled. Finish listening for connectedness events.")
return
case e, ok := <-sub.Out():
if !ok {
log.Debug("Subscription for connectedness events is closed.")
for {
select {
case <-ctx.Done():
log.Debug("Context canceled. Finish listening for connectedness events.")
return
}
// listen to disconnect event to remove peer from set and reset backoff time
// reset timer in order to restart the discovery, once stored peer is disconnected
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
if d.set.Contains(connStatus.Peer) {
log.Debugw("removing the peer from the peer set",
"peer", connStatus.Peer, "status", connStatus.Connectedness.String())
d.connector.RestartBackoff(connStatus.Peer)
d.set.Remove(connStatus.Peer)
d.onUpdatedPeers(connStatus.Peer, false)
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
t.Reset(d.discoveryInterval)
case e, ok := <-sub.Out():
if !ok {
log.Debug("Subscription for connectedness events is closed.")
return
}
// listen to disconnect event to remove peer from set and reset backoff time
// reset timer in order to restart the discovery, once stored peer is disconnected
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
if d.set.Contains(connStatus.Peer) {
log.Debugw("removing the peer from the peer set",
"peer", connStatus.Peer, "status", connStatus.Connectedness.String())
d.connector.RestartBackoff(connStatus.Peer)
d.set.Remove(connStatus.Peer)
d.onUpdatedPeers(connStatus.Peer, false)
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
t.Reset(d.discoveryInterval)
}
}
}
}
Expand Down

0 comments on commit 484933e

Please sign in to comment.