From 903b16113cf1326d8c28ea7cf6ed70556793b128 Mon Sep 17 00:00:00 2001 From: Kyle Huntsman <3432646+kylehuntsman@users.noreply.github.com> Date: Fri, 2 Jun 2023 21:40:01 -0700 Subject: [PATCH] refactor: move lassie instantiation to a common function --- cmd/lassie/daemon.go | 156 ++++++++++++++------------- cmd/lassie/fetch.go | 250 +++++++++++++++++++------------------------ cmd/lassie/flags.go | 28 ++++- cmd/lassie/main.go | 85 ++++++++++++--- pkg/lassie/lassie.go | 8 +- 5 files changed, 299 insertions(+), 228 deletions(-) diff --git a/cmd/lassie/daemon.go b/cmd/lassie/daemon.go index 867a0396..404e5c97 100644 --- a/cmd/lassie/daemon.go +++ b/cmd/lassie/daemon.go @@ -2,14 +2,12 @@ package main import ( "fmt" - "os" - "time" + "github.com/filecoin-project/lassie/pkg/aggregateeventrecorder" "github.com/filecoin-project/lassie/pkg/lassie" - "github.com/filecoin-project/lassie/pkg/net/host" - "github.com/filecoin-project/lassie/pkg/retriever" httpserver "github.com/filecoin-project/lassie/pkg/server/http" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/urfave/cli/v2" ) @@ -63,20 +61,6 @@ var daemonFlags = []cli.Flag{ DefaultText: "no limit", EnvVars: []string{"LASSIE_CONCURRENT_SP_RETRIEVALS"}, }, - &cli.DurationFlag{ - Name: "provider-timeout", - Aliases: []string{"pt"}, - Usage: "consider it an error after not receiving a response from a storage provider after this amount of time", - Value: 20 * time.Second, - EnvVars: []string{"LASSIE_PROVIDER_TIMEOUT"}, - }, - &cli.DurationFlag{ - Name: "global-timeout", - Aliases: []string{"gt"}, - Usage: "consider it an error after not completing a retrieval after this amount of time", - Value: 0, - EnvVars: []string{"LASSIE_GLOBAL_TIMEOUT"}, - }, FlagEventRecorderAuth, FlagEventRecorderInstanceId, FlagEventRecorderUrl, @@ -87,6 +71,8 @@ var daemonFlags = []cli.Flag{ FlagExcludeProviders, FlagTempDir, FlagBitswapConcurrency, + FlagGlobalTimeout, + FlagProviderTimeout, } var daemonCmd = &cli.Command{ @@ -97,74 +83,67 @@ var daemonCmd = &cli.Command{ Action: daemonCommand, } +// daemonCommand is the action for the daemon command. It sets up the +// lassie daemon and starts the http server. It is intentionally not +// performing any logic itself, but delegates to the DaemonCommandHandler +// function for testability. func daemonCommand(cctx *cli.Context) error { - ctx := cctx.Context - address := cctx.String("address") port := cctx.Uint("port") tempDir := cctx.String("tempdir") - maxBlocks := cctx.Uint64("maxblocks") libp2pLowWater := cctx.Int("libp2p-conns-lowwater") libp2pHighWater := cctx.Int("libp2p-conns-highwater") concurrentSPRetrievals := cctx.Uint("concurrent-sp-retrievals") - providerTimeout := cctx.Duration("provider-timeout") - globalTimeout := cctx.Duration("global-timeout") - bitswapConcurrency := cctx.Int("bitswap-concurrency") + maxBlocks := cctx.Uint64("maxblocks") + eventRecorderURL := cctx.String("event-recorder-url") authToken := cctx.String("event-recorder-auth") instanceID := cctx.String("event-recorder-instance-id") - lassieOpts := []lassie.LassieOption{lassie.WithProviderTimeout(providerTimeout)} - if globalTimeout > 0 { - lassieOpts = append(lassieOpts, lassie.WithGlobalTimeout(globalTimeout)) - } - if libp2pHighWater != 0 || libp2pLowWater != 0 { - connManager, err := connmgr.NewConnManager(libp2pLowWater, libp2pHighWater) - if err != nil { - return err - } - lassieOpts = append( - lassieOpts, - lassie.WithLibp2pOpts(libp2p.ConnectionManager(connManager)), - lassie.WithConcurrentSPRetrievals(concurrentSPRetrievals), - ) - } - if len(protocols) > 0 { - lassieOpts = append(lassieOpts, lassie.WithProtocols(protocols)) - } - if len(fetchProviderAddrInfos) > 0 { - host, err := host.InitHost(ctx, []libp2p.Option{}) - if err != nil { - return err - } - finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos)) - lassieOpts = append(lassieOpts, finderOpt) - } - if len(providerBlockList) > 0 { - lassieOpts = append(lassieOpts, lassie.WithProviderBlockList(providerBlockList)) - } - if tempDir == "" { - tempDir = os.TempDir() - } - if bitswapConcurrency > 0 { - lassieOpts = append(lassieOpts, lassie.WithBitswapConcurrency(bitswapConcurrency)) + lassieCfg, err := getLassieConfigForDaemon(cctx, libp2pLowWater, libp2pHighWater, concurrentSPRetrievals) + if err != nil { + return cli.Exit(err, 1) } - // create a lassie instance - lassie, err := lassie.NewLassie(ctx, lassieOpts...) + + httpServerCfg := getHttpServerConfigForDaemon(address, port, tempDir, maxBlocks) + + eventRecorderCfg := getEventRecorderConfig(eventRecorderURL, authToken, instanceID) + + err = daemonCommandHandler( + cctx, + lassieCfg, + httpServerCfg, + eventRecorderCfg, + ) if err != nil { - return err + return cli.Exit(err, 1) } - // create and subscribe an event recorder API if configured - setupLassieEventRecorder(ctx, eventRecorderURL, authToken, instanceID, lassie) + return nil +} - httpServer, err := httpserver.NewHttpServer(ctx, lassie, httpserver.HttpServerConfig{ - Address: address, - Port: port, - TempDir: tempDir, - MaxBlocksPerRequest: maxBlocks, - }) +// DeamonCommandHandler is the handler for the daemon command. +// This abstraction allows the daemon to be invoked programmatically +// for testing. +func daemonCommandHandler( + cctx *cli.Context, + lassieCfg *lassie.LassieConfig, + httpServerCfg httpserver.HttpServerConfig, + eventRecorderCfg *aggregateeventrecorder.EventRecorderConfig, +) error { + ctx := cctx.Context + + lassie, err := lassie.NewLassieWithConfig(cctx.Context, lassieCfg) + if err != nil { + return nil + } + // create and subscribe an event recorder API if an endpoint URL is set + if eventRecorderCfg.EndpointURL != "" { + setupLassieEventRecorder(ctx, eventRecorderCfg, lassie) + } + + httpServer, err := httpserver.NewHttpServer(ctx, lassie, httpServerCfg) if err != nil { logger.Errorw("failed to create http server", "err", err) return err @@ -178,7 +157,7 @@ func daemonCommand(cctx *cli.Context) error { }() select { - case <-cctx.Done(): // command was cancelled + case <-ctx.Done(): // command was cancelled case err = <-serverErrChan: // error from server logger.Errorw("failed to start http server", "err", err) } @@ -189,9 +168,40 @@ func daemonCommand(cctx *cli.Context) error { } fmt.Println("Lassie daemon stopped") - if err != nil { - return cli.Exit(err, 1) + return err +} + +// getLassieConfigForDaemon returns a LassieConfig for the daemon command. +func getLassieConfigForDaemon( + cctx *cli.Context, + libp2pLowWater int, + libp2pHighWater int, + concurrentSPRetrievals uint, +) (*lassie.LassieConfig, error) { + lassieOpts := []lassie.LassieOption{} + + if concurrentSPRetrievals > 0 { + lassieOpts = append(lassieOpts, lassie.WithConcurrentSPRetrievals(concurrentSPRetrievals)) } - return nil + libp2pOpts := []config.Option{} + if libp2pHighWater != 0 || libp2pLowWater != 0 { + connManager, err := connmgr.NewConnManager(libp2pLowWater, libp2pHighWater) + if err != nil { + return nil, err + } + libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connManager)) + } + + return buildLassieConfigFromCLIContext(cctx, lassieOpts, libp2pOpts) +} + +// getHttpServerConfigForDaemon returns a HttpServerConfig for the daemon command. +func getHttpServerConfigForDaemon(address string, port uint, tempDir string, maxBlocks uint64) httpserver.HttpServerConfig { + return httpserver.HttpServerConfig{ + Address: address, + Port: port, + TempDir: tempDir, + MaxBlocksPerRequest: maxBlocks, + } } diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index afea0fd8..c4ad6720 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -3,22 +3,16 @@ package main import ( "fmt" "io" - "net/url" - "os" "strings" - "time" "github.com/dustin/go-humanize" + "github.com/filecoin-project/lassie/pkg/aggregateeventrecorder" "github.com/filecoin-project/lassie/pkg/events" - "github.com/filecoin-project/lassie/pkg/indexerlookup" "github.com/filecoin-project/lassie/pkg/lassie" - "github.com/filecoin-project/lassie/pkg/net/host" - "github.com/filecoin-project/lassie/pkg/retriever" "github.com/filecoin-project/lassie/pkg/storage" "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p" "github.com/urfave/cli/v2" ) @@ -34,18 +28,6 @@ var fetchCmd = &cli.Command{ Usage: "the CAR file to write to, may be an existing or a new CAR, or use '-' to write to stdout", TakesFile: true, }, - &cli.DurationFlag{ - Name: "provider-timeout", - Aliases: []string{"pt"}, - Usage: "consider it an error after not receiving a response from a storage provider after this amount of time", - Value: 20 * time.Second, - }, - &cli.DurationFlag{ - Name: "global-timeout", - Aliases: []string{"gt"}, - Usage: "consider it an error after not completing the retrieval after this amount of time", - Value: 0, - }, &cli.BoolFlag{ Name: "progress", Aliases: []string{"p"}, @@ -68,12 +50,7 @@ var fetchCmd = &cli.Command{ return nil }, }, - &cli.StringFlag{ - Name: "ipni-endpoint", - Aliases: []string{"ipni"}, - DefaultText: "Defaults to https://cid.contact", - Usage: "HTTP endpoint of the IPNI instance used to discover providers.", - }, + FlagIPNIEndpoint, FlagEventRecorderAuth, FlagEventRecorderInstanceId, FlagEventRecorderUrl, @@ -84,6 +61,8 @@ var fetchCmd = &cli.Command{ FlagExcludeProviders, FlagTempDir, FlagBitswapConcurrency, + FlagGlobalTimeout, + FlagProviderTimeout, }, } @@ -92,83 +71,142 @@ func Fetch(cctx *cli.Context) error { return fmt.Errorf("usage: lassie fetch [-o ] [-t ] [/path/to/content]") } - ctx := cctx.Context msgWriter := cctx.App.ErrWriter dataWriter := cctx.App.Writer - progress := cctx.Bool("progress") - providerTimeout := cctx.Duration("provider-timeout") - globalTimeout := cctx.Duration("global-timeout") dagScope := cctx.String("dag-scope") tempDir := cctx.String("tempdir") - bitswapConcurrency := cctx.Int("bitswap-concurrency") eventRecorderURL := cctx.String("event-recorder-url") authToken := cctx.String("event-recorder-auth") instanceID := cctx.String("event-recorder-instance-id") + progress := cctx.Bool("progress") rootCid, path, err := parseCidPath(cctx.Args().Get(0)) if err != nil { return err } - providerTimeoutOpt := lassie.WithProviderTimeout(providerTimeout) - - host, err := host.InitHost(ctx, []libp2p.Option{}) + lassieCfg, err := buildLassieConfigFromCLIContext(cctx, nil, nil) if err != nil { return err } - hostOpt := lassie.WithHost(host) - var lassieOpts = []lassie.LassieOption{providerTimeoutOpt, hostOpt} - if len(fetchProviderAddrInfos) > 0 { - finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos)) - if cctx.IsSet("ipni-endpoint") { - logger.Warn("Ignoring ipni-endpoint flag since direct provider is specified") - } - lassieOpts = append(lassieOpts, finderOpt) - } else if cctx.IsSet("ipni-endpoint") { - endpoint := cctx.String("ipni-endpoint") - endpointUrl, err := url.Parse(endpoint) - if err != nil { - logger.Errorw("Failed to parse IPNI endpoint as URL", "err", err) - return fmt.Errorf("cannot parse given IPNI endpoint %s as valid URL: %w", endpoint, err) - } - finder, err := indexerlookup.NewCandidateFinder(indexerlookup.WithHttpEndpoint(endpointUrl)) - if err != nil { - logger.Errorw("Failed to instantiate IPNI candidate finder", "err", err) - return err - } - lassieOpts = append(lassieOpts, lassie.WithFinder(finder)) - logger.Debug("Using explicit IPNI endpoint to find candidates", "endpoint", endpoint) - } + eventRecorderCfg := getEventRecorderConfig(eventRecorderURL, authToken, instanceID) - if len(providerBlockList) > 0 { - lassieOpts = append(lassieOpts, lassie.WithProviderBlockList(providerBlockList)) + err = fetchCommandHandler( + cctx, + lassieCfg, + eventRecorderCfg, + msgWriter, + dataWriter, + rootCid, + path, + dagScope, + tempDir, + progress, + ) + if err != nil { + return cli.Exit(err, 1) } - if len(protocols) > 0 { - lassieOpts = append(lassieOpts, lassie.WithProtocols(protocols)) - } + return nil +} - if globalTimeout > 0 { - lassieOpts = append(lassieOpts, lassie.WithGlobalTimeout(globalTimeout)) +func parseCidPath(cpath string) (cid.Cid, string, error) { + cstr := strings.Split(cpath, "/")[0] + path := strings.TrimPrefix(cpath, cstr) + rootCid, err := cid.Parse(cstr) + if err != nil { + return cid.Undef, "", err } + return rootCid, path, nil +} - if tempDir == "" { - tempDir = os.TempDir() - } +type progressPrinter struct { + candidatesFound int + writer io.Writer +} - if bitswapConcurrency > 0 { - lassieOpts = append(lassieOpts, lassie.WithBitswapConcurrency(bitswapConcurrency)) +func (pp *progressPrinter) subscriber(event types.RetrievalEvent) { + switch ret := event.(type) { + case events.RetrievalEventStarted: + switch ret.Phase() { + case types.IndexerPhase: + fmt.Fprintf(pp.writer, "\rQuerying indexer for %s...\n", ret.PayloadCid()) + case types.QueryPhase: + fmt.Fprintf(pp.writer, "\rQuerying [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + case types.RetrievalPhase: + fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + } + case events.RetrievalEventConnected: + switch ret.Phase() { + case types.QueryPhase: + fmt.Fprintf(pp.writer, "\rQuerying [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + case types.RetrievalPhase: + fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + } + case events.RetrievalEventProposed: + fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + case events.RetrievalEventAccepted: + fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + case events.RetrievalEventFirstByte: + fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) + case events.RetrievalEventCandidatesFound: + pp.candidatesFound = len(ret.Candidates()) + case events.RetrievalEventCandidatesFiltered: + if len(fetchProviderAddrInfos) == 0 { + fmt.Fprintf(pp.writer, "Found %d storage provider candidate(s) in the indexer:\n", pp.candidatesFound) + } else { + fmt.Fprintf(pp.writer, "Using the specified storage provider(s):\n") + } + for _, candidate := range ret.Candidates() { + fmt.Fprintf(pp.writer, "\r\t%s, Protocols: %v\n", candidate.MinerPeer.ID, candidate.Metadata.Protocols()) + } + case events.RetrievalEventFailed: + if ret.Phase() == types.IndexerPhase { + fmt.Fprintf(pp.writer, "\rRetrieval failure from indexer: %s\n", ret.ErrorMessage()) + } else { + fmt.Fprintf(pp.writer, "\rRetrieval failure for [%s]: %s\n", types.Identifier(ret), ret.ErrorMessage()) + } + case events.RetrievalEventSuccess: + // noop, handled at return from Retrieve() } +} + +type onlyWriter struct { + w io.Writer +} + +func (ow *onlyWriter) Write(p []byte) (n int, err error) { + return ow.w.Write(p) +} - lassie, err := lassie.NewLassie(ctx, lassieOpts...) +// fetchCommandHandler is the handler for the fetch command. +// This abstraction allows the fetch command to be invoked +// programmatically for testing. +func fetchCommandHandler( + cctx *cli.Context, + lassieCfg *lassie.LassieConfig, + eventRecorderCfg *aggregateeventrecorder.EventRecorderConfig, + msgWriter io.Writer, + dataWriter io.Writer, + rootCid cid.Cid, + path string, + dagScope string, + tempDir string, + progress bool, +) error { + ctx := cctx.Context + + lassie, err := lassie.NewLassieWithConfig(cctx.Context, lassieCfg) if err != nil { return err } - // create and subscribe an event recorder API if configured - setupLassieEventRecorder(ctx, eventRecorderURL, authToken, instanceID, lassie) + // create and subscribe an event recorder API if an endpoint URL is set + if eventRecorderCfg.EndpointURL != "" { + setupLassieEventRecorder(ctx, eventRecorderCfg, lassie) + } if len(fetchProviderAddrInfos) == 0 { fmt.Fprintf(msgWriter, "Fetching %s", rootCid.String()+path) @@ -195,6 +233,7 @@ func Fetch(cctx *cli.Context) error { } else { carWriter = storage.NewDeferredCarWriterForPath(rootCid, outfile) } + tempStore := storage.NewDeferredStorageCar(tempDir) carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer carStore.Close() @@ -241,72 +280,3 @@ func Fetch(cctx *cli.Context) error { return nil } - -func parseCidPath(cpath string) (cid.Cid, string, error) { - cstr := strings.Split(cpath, "/")[0] - path := strings.TrimPrefix(cpath, cstr) - rootCid, err := cid.Parse(cstr) - if err != nil { - return cid.Undef, "", err - } - return rootCid, path, nil -} - -type progressPrinter struct { - candidatesFound int - writer io.Writer -} - -func (pp *progressPrinter) subscriber(event types.RetrievalEvent) { - switch ret := event.(type) { - case events.RetrievalEventStarted: - switch ret.Phase() { - case types.IndexerPhase: - fmt.Fprintf(pp.writer, "\rQuerying indexer for %s...\n", ret.PayloadCid()) - case types.QueryPhase: - fmt.Fprintf(pp.writer, "\rQuerying [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - case types.RetrievalPhase: - fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - } - case events.RetrievalEventConnected: - switch ret.Phase() { - case types.QueryPhase: - fmt.Fprintf(pp.writer, "\rQuerying [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - case types.RetrievalPhase: - fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - } - case events.RetrievalEventProposed: - fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - case events.RetrievalEventAccepted: - fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - case events.RetrievalEventFirstByte: - fmt.Fprintf(pp.writer, "\rRetrieving from [%s] (%s)...\n", types.Identifier(ret), ret.Code()) - case events.RetrievalEventCandidatesFound: - pp.candidatesFound = len(ret.Candidates()) - case events.RetrievalEventCandidatesFiltered: - if len(fetchProviderAddrInfos) == 0 { - fmt.Fprintf(pp.writer, "Found %d storage provider candidate(s) in the indexer:\n", pp.candidatesFound) - } else { - fmt.Fprintf(pp.writer, "Using the specified storage provider(s):\n") - } - for _, candidate := range ret.Candidates() { - fmt.Fprintf(pp.writer, "\r\t%s, Protocols: %v\n", candidate.MinerPeer.ID, candidate.Metadata.Protocols()) - } - case events.RetrievalEventFailed: - if ret.Phase() == types.IndexerPhase { - fmt.Fprintf(pp.writer, "\rRetrieval failure from indexer: %s\n", ret.ErrorMessage()) - } else { - fmt.Fprintf(pp.writer, "\rRetrieval failure for [%s]: %s\n", types.Identifier(ret), ret.ErrorMessage()) - } - case events.RetrievalEventSuccess: - // noop, handled at return from Retrieve() - } -} - -type onlyWriter struct { - w io.Writer -} - -func (ow *onlyWriter) Write(p []byte) (n int, err error) { - return ow.w.Write(p) -} diff --git a/cmd/lassie/flags.go b/cmd/lassie/flags.go index 7553c507..f54f7c69 100644 --- a/cmd/lassie/flags.go +++ b/cmd/lassie/flags.go @@ -1,7 +1,9 @@ package main import ( + "os" "strings" + "time" "github.com/filecoin-project/lassie/pkg/types" "github.com/libp2p/go-libp2p/core/peer" @@ -64,7 +66,6 @@ var FlagEventRecorderUrl = &cli.StringFlag{ } var providerBlockList map[peer.ID]bool - var FlagExcludeProviders = &cli.StringFlag{ Name: "exclude-providers", DefaultText: "All providers allowed", @@ -131,7 +132,7 @@ var FlagTempDir = &cli.StringFlag{ Name: "tempdir", Aliases: []string{"td"}, Usage: "directory to store temporary files while downloading", - Value: "", + Value: os.TempDir(), DefaultText: "os temp directory", EnvVars: []string{"LASSIE_TEMP_DIRECTORY"}, } @@ -142,3 +143,26 @@ var FlagBitswapConcurrency = &cli.IntFlag{ Value: 6, EnvVars: []string{"LASSIE_BITSWAP_CONCURRENCY"}, } + +var FlagGlobalTimeout = &cli.DurationFlag{ + Name: "global-timeout", + Aliases: []string{"gt"}, + Usage: "consider it an error after not completing a retrieval after this amount of time", + Value: 0, + EnvVars: []string{"LASSIE_GLOBAL_TIMEOUT"}, +} + +var FlagProviderTimeout = &cli.DurationFlag{ + Name: "provider-timeout", + Aliases: []string{"pt"}, + Usage: "consider it an error after not receiving a response from a storage provider after this amount of time", + Value: 20 * time.Second, + EnvVars: []string{"LASSIE_PROVIDER_TIMEOUT"}, +} + +var FlagIPNIEndpoint = &cli.StringFlag{ + Name: "ipni-endpoint", + Aliases: []string{"ipni"}, + DefaultText: "Defaults to https://cid.contact", + Usage: "HTTP endpoint of the IPNI instance used to discover providers.", +} diff --git a/cmd/lassie/main.go b/cmd/lassie/main.go index 5d009adc..2460cc7a 100644 --- a/cmd/lassie/main.go +++ b/cmd/lassie/main.go @@ -3,14 +3,19 @@ package main import ( "context" "fmt" + "net/url" "os" "os/signal" "syscall" "github.com/filecoin-project/lassie/pkg/aggregateeventrecorder" + "github.com/filecoin-project/lassie/pkg/indexerlookup" "github.com/filecoin-project/lassie/pkg/lassie" + "github.com/filecoin-project/lassie/pkg/net/host" + "github.com/filecoin-project/lassie/pkg/retriever" "github.com/google/uuid" "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/config" "github.com/urfave/cli/v2" ) @@ -85,29 +90,85 @@ func before(cctx *cli.Context) error { return nil } +func buildLassieConfigFromCLIContext(cctx *cli.Context, lassieOpts []lassie.LassieOption, libp2pOpts []config.Option) (*lassie.LassieConfig, error) { + providerTimeout := cctx.Duration("provider-timeout") + globalTimeout := cctx.Duration("global-timeout") + bitswapConcurrency := cctx.Int("bitswap-concurrency") + + lassieOpts = append(lassieOpts, lassie.WithProviderTimeout(providerTimeout)) + + if globalTimeout > 0 { + lassieOpts = append(lassieOpts, lassie.WithGlobalTimeout(globalTimeout)) + } + + if len(protocols) > 0 { + lassieOpts = append(lassieOpts, lassie.WithProtocols(protocols)) + } + + host, err := host.InitHost(cctx.Context, libp2pOpts) + if err != nil { + return nil, err + } + lassieOpts = append(lassieOpts, lassie.WithHost(host)) + + if len(fetchProviderAddrInfos) > 0 { + finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos)) + if cctx.IsSet("ipni-endpoint") { + logger.Warn("Ignoring ipni-endpoint flag since direct provider is specified") + } + lassieOpts = append(lassieOpts, finderOpt) + } else if cctx.IsSet("ipni-endpoint") { + endpoint := cctx.String("ipni-endpoint") + endpointUrl, err := url.Parse(endpoint) + if err != nil { + logger.Errorw("Failed to parse IPNI endpoint as URL", "err", err) + return nil, fmt.Errorf("cannot parse given IPNI endpoint %s as valid URL: %w", endpoint, err) + } + finder, err := indexerlookup.NewCandidateFinder(indexerlookup.WithHttpEndpoint(endpointUrl)) + if err != nil { + logger.Errorw("Failed to instantiate IPNI candidate finder", "err", err) + return nil, err + } + lassieOpts = append(lassieOpts, lassie.WithFinder(finder)) + logger.Debug("Using explicit IPNI endpoint to find candidates", "endpoint", endpoint) + } + + if len(providerBlockList) > 0 { + lassieOpts = append(lassieOpts, lassie.WithProviderBlockList(providerBlockList)) + } + + if bitswapConcurrency > 0 { + lassieOpts = append(lassieOpts, lassie.WithBitswapConcurrency(bitswapConcurrency)) + } + + return lassie.NewLassieConfig(lassieOpts...), nil +} + +func getEventRecorderConfig(endpointURL string, authToken string, instanceID string) *aggregateeventrecorder.EventRecorderConfig { + return &aggregateeventrecorder.EventRecorderConfig{ + InstanceID: instanceID, + EndpointURL: endpointURL, + EndpointAuthorization: authToken, + } +} + // setupLassieEventRecorder creates and subscribes an EventRecorder if an event recorder URL is given func setupLassieEventRecorder( ctx context.Context, - eventRecorderURL string, - authToken string, - instanceID string, + cfg *aggregateeventrecorder.EventRecorderConfig, lassie *lassie.Lassie, ) { - if eventRecorderURL != "" { - if instanceID == "" { + if cfg.EndpointURL != "" { + if cfg.InstanceID == "" { uuid, err := uuid.NewRandom() if err != nil { logger.Warnw("failed to generate default event recorder instance ID UUID, no instance ID will be provided", "err", err) } - instanceID = uuid.String() // returns "" if uuid is invalid + cfg.InstanceID = uuid.String() // returns "" if uuid is invalid } - eventRecorder := aggregateeventrecorder.NewAggregateEventRecorder(ctx, aggregateeventrecorder.EventRecorderConfig{ - InstanceID: instanceID, - EndpointURL: eventRecorderURL, - EndpointAuthorization: authToken, - }) + eventRecorder := aggregateeventrecorder.NewAggregateEventRecorder(ctx, *cfg) lassie.RegisterSubscriber(eventRecorder.RetrievalEventSubscriber()) - logger.Infow("Reporting retrieval events to event recorder API", "url", eventRecorderURL, "instance_id", instanceID) + logger.Infow("Reporting retrieval events to event recorder API", "url", cfg.EndpointURL, "instance_id", cfg.InstanceID) } } diff --git a/pkg/lassie/lassie.go b/pkg/lassie/lassie.go index d14f945b..b1215a4e 100644 --- a/pkg/lassie/lassie.go +++ b/pkg/lassie/lassie.go @@ -43,11 +43,17 @@ type LassieOption func(cfg *LassieConfig) // NewLassie creates a new Lassie instance. func NewLassie(ctx context.Context, opts ...LassieOption) (*Lassie, error) { + cfg := NewLassieConfig(opts...) + return NewLassieWithConfig(ctx, cfg) +} + +// NewLassieConfig creates a new LassieConfig instance with the given LassieOptions. +func NewLassieConfig(opts ...LassieOption) *LassieConfig { cfg := &LassieConfig{} for _, opt := range opts { opt(cfg) } - return NewLassieWithConfig(ctx, cfg) + return cfg } // NewLassieWithConfig creates a new Lassie instance with a custom