diff --git a/core/node/groups.go b/core/node/groups.go index 5e87cf0e3f0..def812b905a 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -174,7 +174,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap), maybeProvide(libp2p.AutoRelay(cfg.Swarm.RelayClient.StaticRelays, peerChan), enableRelayClient), - maybeInvoke(libp2p.AutoRelayFeeder, enableRelayClient), + maybeInvoke(libp2p.AutoRelayFeeder(cfg.Peering), enableRelayClient), autonat, connmgr, ps, diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index cf08b05788c..3f0243ab9c6 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -2,11 +2,14 @@ package libp2p import ( "context" + "fmt" + "runtime/debug" "sort" "time" "github.com/ipfs/go-ipfs/core/node/helpers" + config "github.com/ipfs/go-ipfs/config" "github.com/ipfs/go-ipfs/repo" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -182,56 +185,79 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) ( }, psRouter, nil } -func AutoRelayFeeder(lc fx.Lifecycle, h host.Host, peerChan chan peer.AddrInfo, dht *ddht.DHT) { - ctx, cancel := context.WithCancel(context.Background()) - - done := make(chan struct{}) - go func() { - defer close(done) - - // Feed peers more often right after the bootstrap, then backoff - bo := backoff.NewExponentialBackOff() - bo.InitialInterval = 15 * time.Second - bo.Multiplier = 3 - bo.MaxInterval = 1 * time.Hour - bo.MaxElapsedTime = 0 // never stop - t := backoff.NewTicker(bo) - defer t.Stop() - for { - select { - case <-t.C: - case <-ctx.Done(): - return - } - if dht == nil { - /* noop due to missing dht.WAN. happens in some unit tests, - not worth fixing as we will refactor this after go-libp2p 0.20 */ - continue - } - closestPeers, err := dht.WAN.GetClosestPeers(ctx, h.ID().String()) - if err != nil { - // usually 'failed to find any peer in table', no no-op - continue +func AutoRelayFeeder(cfgPeering config.Peering) func(fx.Lifecycle, host.Host, chan peer.AddrInfo, *ddht.DHT) { + return func(lc fx.Lifecycle, h host.Host, peerChan chan peer.AddrInfo, dht *ddht.DHT) { + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovering from unexpected error in AutoRelayFeeder:", r) + debug.PrintStack() } - for _, p := range closestPeers { - addrs := h.Peerstore().Addrs(p) - if len(addrs) == 0 { - continue - } + }() + go func() { + defer close(done) + + // Feed peers more often right after the bootstrap, then backoff + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 15 * time.Second + bo.Multiplier = 3 + bo.MaxInterval = 1 * time.Hour + bo.MaxElapsedTime = 0 // never stop + t := backoff.NewTicker(bo) + defer t.Stop() + for { select { - case peerChan <- peer.AddrInfo{ID: p, Addrs: addrs}: + case <-t.C: case <-ctx.Done(): return } + + // Always feed trusted IDs (Peering.Peers in the config) + for _, trustedPeer := range cfgPeering.Peers { + if len(trustedPeer.Addrs) == 0 { + continue + } + select { + case peerChan <- trustedPeer: + case <-ctx.Done(): + return + } + } + + // Additionally, feed closest peers discovered via DHT + if dht == nil { + /* noop due to missing dht.WAN. happens in some unit tests, + not worth fixing as we will refactor this after go-libp2p 0.20 */ + continue + } + closestPeers, err := dht.WAN.GetClosestPeers(ctx, h.ID().String()) + if err != nil { + // no-op: usually 'failed to find any peer in table' during startup + continue + } + for _, p := range closestPeers { + addrs := h.Peerstore().Addrs(p) + if len(addrs) == 0 { + continue + } + dhtPeer := peer.AddrInfo{ID: p, Addrs: addrs} + select { + case peerChan <- dhtPeer: + case <-ctx.Done(): + return + } + } } - } - }() + }() - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - cancel() - <-done - return nil - }, - }) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + cancel() + <-done + return nil + }, + }) + } }