Skip to content

Commit

Permalink
Added context to be done checks when searching for peers (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
icellan authored Sep 3, 2024
1 parent f24f3ab commit b161ae8
Showing 1 changed file with 90 additions and 57 deletions.
147 changes: 90 additions & 57 deletions app/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,23 @@ func (s *Server) Start(ctx context.Context) error {
})

s.config.Services.Log.Debugf("stream handler set")
for !s.connected {
time.Sleep(5 * time.Second)

OUTER:
for {
select {
// If the context is done, stop the service
case <-ctx.Done():
s.config.Services.Log.Infof("stopping p2p service")
return nil
default:
if !s.connected {
time.Sleep(5 * time.Second)
} else {
break OUTER
}
}
}

for _, topicName := range s.topicNames {
var topic *pubsub.Topic
if topic, err = ps.Join(topicName); err != nil {
Expand Down Expand Up @@ -428,6 +442,10 @@ func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *droutin
}
for {
select {
case <-ctx.Done():
s.config.Services.Log.Infof("stopping peer discovery process")
ticker.Stop()
return
case <-ticker.C:
err := s.discoverPeers(ctx, routingDiscovery)
if err != nil {
Expand Down Expand Up @@ -498,64 +516,79 @@ func (s *Server) discoverPeers(ctx context.Context, routingDiscovery *drouting.R

// Look for others who have announced and attempt to connect to them
connected := 0
for connected < 2 {
for _, topicName := range s.topicNames {
s.config.Services.Log.Debugf("searching for peers for topic %s..\n", topicName)

var peerChan <-chan peer.AddrInfo
var err error
if peerChan, err = routingDiscovery.FindPeers(ctx, topicName, discovery.TTL(1*time.Minute)); err != nil {
return err
}

// Loop through all peers found
for foundPeer := range peerChan {

// Don't connect to ourselves
if foundPeer.ID == s.host.ID() {
continue // No self connection
}

// Failed to connect to peer
s.config.Services.Log.Debugf("attempting connection to %s", foundPeer.ID.String())

if err = s.host.Connect(ctx, foundPeer); err != nil {
// we fail to connect to a lot of peers. Just ignore it for now.
s.config.Services.Log.Debugf("failed connecting to %s, error: %s", foundPeer.ID.String(), err.Error())
continue
}

// Connected to peer
s.config.Services.Log.Infof("connected to: %s", foundPeer.ID.String())

// Open a stream to the peer
var stream network.Stream
if stream, err = s.host.NewStream(ctx, foundPeer.ID, protocol.ID(s.config.P2P.AlertSystemProtocolID)); err != nil {
s.config.Services.Log.Debugf("failed new stream to %s error: %s", foundPeer.ID.String(), err.Error())
continue
}

// Sync the stream thread
t := StreamThread{
config: s.config,
ctx: ctx,
peer: foundPeer.ID,
stream: stream,
quitChannel: s.quitPeerDiscoveryChannel,
}

// Sync the stream thread
if err = t.Sync(ctx); err != nil {
s.config.Services.Log.Debugf("failed to start stream thread to %s error: %s", foundPeer.ID.String(), err.Error())
continue
OUTER:
for {
select {
case <-s.quitPeerDiscoveryChannel:
s.config.Services.Log.Infof("stopping peer discovery process from channel")
return nil
case <-ctx.Done():
s.config.Services.Log.Infof("stopping peer discovery process from context")
return nil
default:
if connected < 2 {
for _, topicName := range s.topicNames {
s.config.Services.Log.Debugf("searching for peers for topic %s..\n", topicName)

var peerChan <-chan peer.AddrInfo
var err error
if peerChan, err = routingDiscovery.FindPeers(ctx, topicName, discovery.TTL(1*time.Minute)); err != nil {
return err
}

// Loop through all peers found
for foundPeer := range peerChan {

// Don't connect to ourselves
if foundPeer.ID == s.host.ID() {
continue // No self connection
}

// Failed to connect to peer
s.config.Services.Log.Debugf("attempting connection to %s", foundPeer.ID.String())

if err = s.host.Connect(ctx, foundPeer); err != nil {
// we fail to connect to a lot of peers. Just ignore it for now.
s.config.Services.Log.Debugf("failed connecting to %s, error: %s", foundPeer.ID.String(), err.Error())
continue
}

// Connected to peer
s.config.Services.Log.Infof("connected to: %s", foundPeer.ID.String())

// Open a stream to the peer
var stream network.Stream
if stream, err = s.host.NewStream(ctx, foundPeer.ID, protocol.ID(s.config.P2P.AlertSystemProtocolID)); err != nil {
s.config.Services.Log.Debugf("failed new stream to %s error: %s", foundPeer.ID.String(), err.Error())
continue
}

// Sync the stream thread
t := StreamThread{
config: s.config,
ctx: ctx,
peer: foundPeer.ID,
stream: stream,
quitChannel: s.quitPeerDiscoveryChannel,
}

// Sync the stream thread
if err = t.Sync(ctx); err != nil {
s.config.Services.Log.Debugf("failed to start stream thread to %s error: %s", foundPeer.ID.String(), err.Error())
continue
}

s.config.Services.Log.Infof("successfully synced up to %d from peer %s", t.LatestSequence(), foundPeer.ID.String())

// Set the flag
connected++
}
time.Sleep(1 * time.Second)
}

s.config.Services.Log.Infof("successfully synced up to %d from peer %s", t.LatestSequence(), foundPeer.ID.String())

// Set the flag
connected++
} else {
break OUTER
}
time.Sleep(1 * time.Second)
}
}

Expand Down

0 comments on commit b161ae8

Please sign in to comment.