Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use Fx to start and stop the host, swarm, autorelay and quicreuse #2118

Merged
merged 16 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 138 additions & 94 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"crypto/rand"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,7 @@ import (

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/quic-go/quic-go"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
)
Expand Down Expand Up @@ -190,20 +192,11 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) addTransports(h host.Host) error {
swrm, ok := h.Network().(transport.TransportNetwork)
if !ok {
// Should probably skip this if no transports.
return fmt.Errorf("swarm does not support transports")
}

func (cfg *Config) addTransports() ([]fx.Option, error) {
fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`name:"security"`))),
fx.Supply(cfg.Muxers),
fx.Supply(h.ID()),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
Expand Down Expand Up @@ -265,56 +258,38 @@ func (cfg *Config) addTransports(h host.Host) error {
if cfg.QUICReuse != nil {
fxopts = append(fxopts, cfg.QUICReuse...)
} else {
fxopts = append(fxopts, fx.Provide(quicreuse.NewConnManager)) // TODO: close the ConnManager when shutting down the node
fxopts = append(fxopts,
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
cm, err := quicreuse.NewConnManager(key, tokenGenerator)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StopHook(cm.Close))
return cm, nil
}),
)
}

fxopts = append(fxopts, fx.Invoke(
fx.Annotate(
func(tpts []transport.Transport) error {
func(swrm *swarm.Swarm, tpts []transport.Transport) error {
for _, t := range tpts {
if err := swrm.AddTransport(t); err != nil {
return err
}
}
return nil
},
fx.ParamTags(`group:"transport"`),
fx.ParamTags("", `group:"transport"`),
)),
)
if cfg.Relay {
fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport))
}
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
h.Close()
return err
}
return nil
return fxopts, nil
}

// NewNode constructs a new libp2p Host from the Config.
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
// If possible check that the resource manager conn limit is higher than the
// limit set in the conn manager.
if l, ok := cfg.ResourceManager.(connmgr.GetConnLimiter); ok {
err := cfg.ConnManager.CheckLimit(l)
if err != nil {
log.Warn(fmt.Sprintf("rcmgr limit conflicts with connmgr limit: %v", err))
}
}

eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}

if !cfg.DisableMetrics {
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

sukunrt marked this conversation as resolved.
Show resolved Hide resolved
func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.BasicHost, error) {
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
Expand All @@ -331,10 +306,8 @@ func (cfg *Config) NewNode() (host.Host, error) {
PrometheusRegisterer: cfg.PrometheusRegisterer,
})
if err != nil {
swrm.Close()
return nil, err
}

if cfg.Relay {
// If we've enabled the relay, we should filter out relay
// addresses by default.
Expand All @@ -345,60 +318,137 @@ func (cfg *Config) NewNode() (host.Host, error) {
return oldFactory(autorelay.Filter(addrs))
}
}
return h, nil
}

if err := cfg.addTransports(h); err != nil {
h.Close()
return nil, err
// NewNode constructs a new libp2p Host from the Config.
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
if cfg.EnableAutoRelay && !cfg.Relay {
return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled")
}
// If possible check that the resource manager conn limit is higher than the
// limit set in the conn manager.
if l, ok := cfg.ResourceManager.(connmgr.GetConnLimiter); ok {
err := cfg.ConnManager.CheckLimit(l)
if err != nil {
log.Warn(fmt.Sprintf("rcmgr limit conflicts with connmgr limit: %v", err))
}
}

// TODO: This method succeeds if listening on one address succeeds. We
// should probably fail if listening on *any* addr fails.
if err := h.Network().Listen(cfg.ListenAddrs...); err != nil {
h.Close()
if !cfg.DisableMetrics {
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

fxopts := []fx.Option{
fx.Provide(func() event.Bus {
return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
}),
fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StopHook(sw.Close))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we closing the swarm twice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned elsewhere, we are but that's okay since the swarm close is guarded to only close once. The closing twice is useful because each close is logically for a different reason.

  1. Here we are closing because we created the swarm.
  2. Below, in decorate we are closing to ensure we close before the quicreuse conn manager.

By closing twice we prevent these different procedures from being tangled.

return sw, nil
}),
// Make sure the swarm constructor depends on the quicreuse.ConnManager.
// That way, the ConnManager will be started before the swarm, and more importantly,
// the swarm will be stopped before the ConnManager.
fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm {
lifecycle.Append(fx.Hook{
OnStart: func(context.Context) error {
// TODO: This method succeeds if listening on one address succeeds. We
// should probably fail if listening on *any* addr fails.
return sw.Listen(cfg.ListenAddrs...)
},
OnStop: func(context.Context) error {
return sw.Close()
},
})
return sw
}),
fx.Provide(cfg.newBasicHost),
fx.Provide(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) host.Host {
lifecycle.Append(fx.StartHook(h.Start))
return h
}),
fx.Provide(func(h host.Host) peer.ID { return h.ID() }),
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
fx.Provide(func(h host.Host) crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
}
transportOpts, err := cfg.addTransports()
if err != nil {
return nil, err
}
fxopts = append(fxopts, transportOpts...)

// Configure routing and autorelay
var router routing.PeerRouting
if cfg.Routing != nil {
router, err = cfg.Routing(h)
if err != nil {
h.Close()
return nil, err
}
fxopts = append(fxopts,
fx.Provide(cfg.Routing),
fx.Provide(func(h host.Host, router routing.PeerRouting) *routed.RoutedHost {
return routed.Wrap(h, router)
}),
)
}

// Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is
// used by AutoNAT below.
var ar *autorelay.AutoRelay
addrF := h.AddrsFactory
if cfg.EnableAutoRelay {
if !cfg.Relay {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled")
}
if !cfg.DisableMetrics {
mt := autorelay.WithMetricsTracer(
autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer)))
mtOpts := []autorelay.Option{mt}
cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...)
}
fxopts = append(fxopts,
fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) (*autorelay.AutoRelay, error) {
ar, err := autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

autorelay depends on Identify to start first, how can we enforce that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is guaranteed because we construct the host first, then autorelay (since the autorelay constructor depends on the host). Fx executes startup hooks in the order they were added, so this determines that host.Start is called before autorelay.Start. Now I admit that this is pretty subtle.

The correct solution is to remove the IDService() from the host, and instead pass a reference to identify to the autorelay constructor. That would make the dependency tree more obvious. Let's do that in a follow-up PR?

return ar, nil
}),
)
}

ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {
return nil, err
}
var bh *bhost.BasicHost
fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho }))

var rh *routed.RoutedHost
if cfg.Routing != nil {
fxopts = append(fxopts, fx.Invoke(func(bho *routed.RoutedHost) { rh = bho }))
}

app := fx.New(fxopts...)
if err := app.Start(context.Background()); err != nil {
return nil, err
}

if err := cfg.addAutoNAT(bh); err != nil {
rh.Close()
return nil, err
}

if cfg.Routing != nil {
return &closableRoutedHost{App: app, RoutedHost: rh}, nil
}
return &closableBasicHost{App: app, BasicHost: bh}, nil
}

func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error {
addrF := h.AddrsFactory
autonatOpts := []autonat.Option{
autonat.UsingAddresses(func() []ma.Multiaddr {
return addrF(h.AllAddrs())
}),
}
if !cfg.DisableMetrics {
autonatOpts = append(autonatOpts,
autonat.WithMetricsTracer(
autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer))))
autonatOpts = append(autonatOpts, autonat.WithMetricsTracer(
autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer)),
))
}
if cfg.AutoNATConfig.ThrottleInterval != 0 {
autonatOpts = append(autonatOpts,
Expand All @@ -408,11 +458,11 @@ func (cfg *Config) NewNode() (host.Host, error) {
if cfg.AutoNATConfig.EnableService {
autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
return err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
return err
}

// Pull out the pieces of the config that we _actually_ care about.
Expand All @@ -438,14 +488,23 @@ func (cfg *Config) NewNode() (host.Host, error) {

dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
if err != nil {
h.Close()
return nil, err
return err
}
dialerHost := blankhost.NewBlankHost(dialer)
if err := autoNatCfg.addTransports(dialerHost); err != nil {
fxopts, err := autoNatCfg.addTransports()
if err != nil {
dialerHost.Close()
return err
}
fxopts = append(fxopts,
fx.Supply(dialerHost.ID()),
fx.Supply(dialer),
fx.Provide(func() crypto.PrivKey { return autonatPrivKey }),
)
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
dialerHost.Close()
h.Close()
return nil, err
return err
}
// NOTE: We're dropping the blank host here but that's fine. It
// doesn't really _do_ anything and doesn't even need to be
Expand All @@ -458,25 +517,10 @@ func (cfg *Config) NewNode() (host.Host, error) {

autonat, err := autonat.New(h, autonatOpts...)
if err != nil {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err)
return fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err)
}
h.SetAutoNat(autonat)

// start the host background tasks
h.Start()

var ho host.Host
ho = h
if router != nil {
ho = routed.Wrap(h, router)
}
if ar != nil {
arh := autorelay.NewAutoRelayHost(ho, ar)
arh.Start()
ho = arh
}
return ho, nil
return nil
}

// Option is a libp2p config option that can be given to the libp2p constructor
Expand Down
30 changes: 30 additions & 0 deletions config/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package config

import (
"context"

basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"

"go.uber.org/fx"
)

type closableBasicHost struct {
*fx.App
*basichost.BasicHost
}
Comment on lines +12 to +15
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can have just one type which uses host.Host interface instead of a separate type for closableBasicHost and closableRoutedHost

type closableHost struct {
	*fx.App
	host.Host
}


func (h *closableBasicHost) Close() error {
_ = h.App.Stop(context.Background())
return h.BasicHost.Close()
}

type closableRoutedHost struct {
*fx.App
*routed.RoutedHost
}

func (h *closableRoutedHost) Close() error {
_ = h.App.Stop(context.Background())
return h.RoutedHost.Close()
}
1 change: 1 addition & 0 deletions leaky_tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Tests that leak goroutines for various reasons. Mostly because libp2p node shutdown logic doesn't run if we fail to construct the node.
Loading
Loading