Skip to content

Commit

Permalink
fix data race issues (#145)
Browse files Browse the repository at this point in the history
* fix: fix data race in DHT connected

* fix: fix data race in peer discovery

* chore: clean
  • Loading branch information
gokutheengineer authored Oct 21, 2024
1 parent a4c51f4 commit fa2208f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
9 changes: 5 additions & 4 deletions app/p2p/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/bitcoin-sv/alert-system/app/config"
Expand Down Expand Up @@ -47,8 +48,8 @@ func (s *Server) initDHT(ctx context.Context) (*dht.IpfsDHT, error) {
}

// Connect to the chosen ipfs nodes
connected := false
for !connected {
connected := uint32(0)
for atomic.LoadUint32(&connected) == 0 {
select {
case <-s.quitPeerInitializationChannel:
return kademliaDHT, nil
Expand All @@ -60,14 +61,14 @@ func (s *Server) initDHT(ctx context.Context) (*dht.IpfsDHT, error) {
return nil, err
}
wg.Add(1)
go func(logger config.LoggerInterface) {
go func(logger config.LoggerInterface) { // connect to peer in a goroutine
defer wg.Done()
if err = s.host.Connect(ctx, *peerInfo); err != nil {
logger.Errorf("bootstrap warning: %s", err.Error())
return
}
logger.Infof("connected to peer %v", peerInfo.ID)
connected = true
atomic.StoreUint32(&connected, 1)
}(logger)
}
time.Sleep(1 * time.Second)
Expand Down
12 changes: 7 additions & 5 deletions app/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ func (s *Server) Start(ctx context.Context) error {
dutil.Advertise(ctx, routingDiscovery, topicName)
}

s.quitPeerDiscoveryChannel = s.RunPeerDiscovery(ctx, routingDiscovery)
// initialize the channel before use in discoverPeers is called
s.RunPeerDiscovery(ctx, routingDiscovery)
s.quitAlertProcessingChannel = s.RunAlertProcessingCron(ctx)

ps, err := pubsub.NewGossipSub(ctx, s.host, pubsub.WithDiscovery(routingDiscovery))
Expand Down Expand Up @@ -438,9 +439,11 @@ func (s *Server) processAlerts(ctx context.Context) error {
}

// RunPeerDiscovery starts a cron job to resync peers and update routable peers
func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) chan bool {
func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) {
ticker := time.NewTicker(s.config.P2P.PeerDiscoveryInterval)
quit := make(chan bool, 1)

// assign quit channel before any go routines are started
s.quitPeerDiscoveryChannel = make(chan bool, 1)
go func() {
err := s.discoverPeers(ctx, routingDiscovery)
if err != nil {
Expand All @@ -457,14 +460,13 @@ func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *droutin
if err != nil {
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
case <-quit:
case <-s.quitPeerDiscoveryChannel:
s.config.Services.Log.Infof("stopping peer discovery process")
ticker.Stop()
return
}
}
}()
return quit
}

// generatePrivateKey generates a private key and stores it in `private_key` file
Expand Down

0 comments on commit fa2208f

Please sign in to comment.