Skip to content

Commit

Permalink
refactor: move lassie instantiation to a common function
Browse files Browse the repository at this point in the history
  • Loading branch information
kylehuntsman authored and hannahhoward committed Jun 22, 2023
1 parent 9329871 commit 903b161
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 228 deletions.
156 changes: 83 additions & 73 deletions cmd/lassie/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package main

import (
"fmt"
"os"
"time"

"github.com/filecoin-project/lassie/pkg/aggregateeventrecorder"
"github.com/filecoin-project/lassie/pkg/lassie"
"github.com/filecoin-project/lassie/pkg/net/host"
"github.com/filecoin-project/lassie/pkg/retriever"
httpserver "github.com/filecoin-project/lassie/pkg/server/http"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -63,20 +61,6 @@ var daemonFlags = []cli.Flag{
DefaultText: "no limit",
EnvVars: []string{"LASSIE_CONCURRENT_SP_RETRIEVALS"},
},
&cli.DurationFlag{
Name: "provider-timeout",
Aliases: []string{"pt"},
Usage: "consider it an error after not receiving a response from a storage provider after this amount of time",
Value: 20 * time.Second,
EnvVars: []string{"LASSIE_PROVIDER_TIMEOUT"},
},
&cli.DurationFlag{
Name: "global-timeout",
Aliases: []string{"gt"},
Usage: "consider it an error after not completing a retrieval after this amount of time",
Value: 0,
EnvVars: []string{"LASSIE_GLOBAL_TIMEOUT"},
},
FlagEventRecorderAuth,
FlagEventRecorderInstanceId,
FlagEventRecorderUrl,
Expand All @@ -87,6 +71,8 @@ var daemonFlags = []cli.Flag{
FlagExcludeProviders,
FlagTempDir,
FlagBitswapConcurrency,
FlagGlobalTimeout,
FlagProviderTimeout,
}

var daemonCmd = &cli.Command{
Expand All @@ -97,74 +83,67 @@ var daemonCmd = &cli.Command{
Action: daemonCommand,
}

// daemonCommand is the action for the daemon command. It sets up the
// lassie daemon and starts the http server. It is intentionally not
// performing any logic itself, but delegates to the DaemonCommandHandler
// function for testability.
func daemonCommand(cctx *cli.Context) error {
ctx := cctx.Context

address := cctx.String("address")
port := cctx.Uint("port")
tempDir := cctx.String("tempdir")
maxBlocks := cctx.Uint64("maxblocks")
libp2pLowWater := cctx.Int("libp2p-conns-lowwater")
libp2pHighWater := cctx.Int("libp2p-conns-highwater")
concurrentSPRetrievals := cctx.Uint("concurrent-sp-retrievals")
providerTimeout := cctx.Duration("provider-timeout")
globalTimeout := cctx.Duration("global-timeout")
bitswapConcurrency := cctx.Int("bitswap-concurrency")
maxBlocks := cctx.Uint64("maxblocks")

eventRecorderURL := cctx.String("event-recorder-url")
authToken := cctx.String("event-recorder-auth")
instanceID := cctx.String("event-recorder-instance-id")

lassieOpts := []lassie.LassieOption{lassie.WithProviderTimeout(providerTimeout)}
if globalTimeout > 0 {
lassieOpts = append(lassieOpts, lassie.WithGlobalTimeout(globalTimeout))
}
if libp2pHighWater != 0 || libp2pLowWater != 0 {
connManager, err := connmgr.NewConnManager(libp2pLowWater, libp2pHighWater)
if err != nil {
return err
}
lassieOpts = append(
lassieOpts,
lassie.WithLibp2pOpts(libp2p.ConnectionManager(connManager)),
lassie.WithConcurrentSPRetrievals(concurrentSPRetrievals),
)
}
if len(protocols) > 0 {
lassieOpts = append(lassieOpts, lassie.WithProtocols(protocols))
}
if len(fetchProviderAddrInfos) > 0 {
host, err := host.InitHost(ctx, []libp2p.Option{})
if err != nil {
return err
}
finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos))
lassieOpts = append(lassieOpts, finderOpt)
}
if len(providerBlockList) > 0 {
lassieOpts = append(lassieOpts, lassie.WithProviderBlockList(providerBlockList))
}
if tempDir == "" {
tempDir = os.TempDir()
}
if bitswapConcurrency > 0 {
lassieOpts = append(lassieOpts, lassie.WithBitswapConcurrency(bitswapConcurrency))
lassieCfg, err := getLassieConfigForDaemon(cctx, libp2pLowWater, libp2pHighWater, concurrentSPRetrievals)
if err != nil {
return cli.Exit(err, 1)
}
// create a lassie instance
lassie, err := lassie.NewLassie(ctx, lassieOpts...)

httpServerCfg := getHttpServerConfigForDaemon(address, port, tempDir, maxBlocks)

eventRecorderCfg := getEventRecorderConfig(eventRecorderURL, authToken, instanceID)

err = daemonCommandHandler(
cctx,
lassieCfg,
httpServerCfg,
eventRecorderCfg,
)
if err != nil {
return err
return cli.Exit(err, 1)
}

// create and subscribe an event recorder API if configured
setupLassieEventRecorder(ctx, eventRecorderURL, authToken, instanceID, lassie)
return nil
}

httpServer, err := httpserver.NewHttpServer(ctx, lassie, httpserver.HttpServerConfig{
Address: address,
Port: port,
TempDir: tempDir,
MaxBlocksPerRequest: maxBlocks,
})
// DeamonCommandHandler is the handler for the daemon command.
// This abstraction allows the daemon to be invoked programmatically
// for testing.
func daemonCommandHandler(
cctx *cli.Context,
lassieCfg *lassie.LassieConfig,
httpServerCfg httpserver.HttpServerConfig,
eventRecorderCfg *aggregateeventrecorder.EventRecorderConfig,
) error {
ctx := cctx.Context

lassie, err := lassie.NewLassieWithConfig(cctx.Context, lassieCfg)
if err != nil {
return nil
}

// create and subscribe an event recorder API if an endpoint URL is set
if eventRecorderCfg.EndpointURL != "" {
setupLassieEventRecorder(ctx, eventRecorderCfg, lassie)
}

httpServer, err := httpserver.NewHttpServer(ctx, lassie, httpServerCfg)
if err != nil {
logger.Errorw("failed to create http server", "err", err)
return err
Expand All @@ -178,7 +157,7 @@ func daemonCommand(cctx *cli.Context) error {
}()

select {
case <-cctx.Done(): // command was cancelled
case <-ctx.Done(): // command was cancelled
case err = <-serverErrChan: // error from server
logger.Errorw("failed to start http server", "err", err)
}
Expand All @@ -189,9 +168,40 @@ func daemonCommand(cctx *cli.Context) error {
}

fmt.Println("Lassie daemon stopped")
if err != nil {
return cli.Exit(err, 1)
return err
}

// getLassieConfigForDaemon returns a LassieConfig for the daemon command.
func getLassieConfigForDaemon(
cctx *cli.Context,
libp2pLowWater int,
libp2pHighWater int,
concurrentSPRetrievals uint,
) (*lassie.LassieConfig, error) {
lassieOpts := []lassie.LassieOption{}

if concurrentSPRetrievals > 0 {
lassieOpts = append(lassieOpts, lassie.WithConcurrentSPRetrievals(concurrentSPRetrievals))
}

return nil
libp2pOpts := []config.Option{}
if libp2pHighWater != 0 || libp2pLowWater != 0 {
connManager, err := connmgr.NewConnManager(libp2pLowWater, libp2pHighWater)
if err != nil {
return nil, err
}
libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connManager))
}

return buildLassieConfigFromCLIContext(cctx, lassieOpts, libp2pOpts)
}

// getHttpServerConfigForDaemon returns a HttpServerConfig for the daemon command.
func getHttpServerConfigForDaemon(address string, port uint, tempDir string, maxBlocks uint64) httpserver.HttpServerConfig {
return httpserver.HttpServerConfig{
Address: address,
Port: port,
TempDir: tempDir,
MaxBlocksPerRequest: maxBlocks,
}
}
Loading

0 comments on commit 903b161

Please sign in to comment.