diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index d975d18971f9..4ab2dcf5b34c 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -20,6 +20,8 @@ import ( loader "github.com/ipfs/go-ipfs/plugin/loader" repo "github.com/ipfs/go-ipfs/repo" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" + "github.com/ipfs/go-ipfs/tracing" + "go.opentelemetry.io/otel" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/go-ipfs-cmds/cli" @@ -71,21 +73,30 @@ func main() { os.Exit(mainRet()) } -func mainRet() int { +func printErr(err error) int { + fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error()) + return 1 +} + +func mainRet() (exitCode int) { rand.Seed(time.Now().UnixNano()) ctx := logging.ContextWithLoggable(context.Background(), loggables.Uuid("session")) var err error - // we'll call this local helper to output errors. - // this is so we control how to print errors in one place. - printErr := func(err error) { - fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error()) + tp, err := tracing.NewTracerProvider(ctx) + if err != nil { + return printErr(err) } + defer func() { + if err := tp.Shutdown(ctx); err != nil { + exitCode = printErr(err) + } + }() + otel.SetTracerProvider(tp) stopFunc, err := profileIfEnabled() if err != nil { - printErr(err) - return 1 + return printErr(err) } defer stopFunc() // to be executed as late as possible diff --git a/core/coreapi/block.go b/core/coreapi/block.go index e1ad9badc05b..b78197d0501a 100644 --- a/core/coreapi/block.go +++ b/core/coreapi/block.go @@ -13,8 +13,11 @@ import ( coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" path "github.com/ipfs/interface-go-ipfs-core/path" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" util "github.com/ipfs/go-ipfs/blocks/blockstoreutil" + "github.com/ipfs/go-ipfs/tracing" ) type BlockAPI CoreAPI @@ -25,6 +28,9 @@ type BlockStat struct { } func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.BlockPutOption) (coreiface.BlockStat, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Put") + defer span.End() + settings, pref, err := caopts.BlockPutOptions(opts...) if err != nil { return nil, err @@ -65,6 +71,8 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc } func (api *BlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Get", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() rp, err := api.core().ResolvePath(ctx, p) if err != nil { return nil, err @@ -79,6 +87,9 @@ func (api *BlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) { } func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRmOption) error { + ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Rm", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + rp, err := api.core().ResolvePath(ctx, p) if err != nil { return err @@ -117,6 +128,9 @@ func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRm } func (api *BlockAPI) Stat(ctx context.Context, p path.Path) (coreiface.BlockStat, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Stat", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + rp, err := api.core().ResolvePath(ctx, p) if err != nil { return nil, err diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index d056e8e6e0a7..696c5bab76c2 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -5,8 +5,11 @@ import ( cid "github.com/ipfs/go-cid" pin "github.com/ipfs/go-ipfs-pinner" + "github.com/ipfs/go-ipfs/tracing" ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type dagAPI struct { @@ -18,6 +21,8 @@ type dagAPI struct { type pinningAdder CoreAPI func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error { + ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "Add", trace.WithAttributes(attribute.String("node", nd.String()))) + defer span.End() defer adder.blockstore.PinLock(ctx).Unlock(ctx) if err := adder.dag.Add(ctx, nd); err != nil { @@ -30,6 +35,8 @@ func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error { } func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error { + ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "AddMany", trace.WithAttributes(attribute.Int("nodes.count", len(nds)))) + defer span.End() defer adder.blockstore.PinLock(ctx).Unlock(ctx) if err := adder.dag.AddMany(ctx, nds); err != nil { diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 3f10a0ffcf8b..c196aba9bd83 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -9,17 +9,22 @@ import ( cidutil "github.com/ipfs/go-cidutil" blockstore "github.com/ipfs/go-ipfs-blockstore" offline "github.com/ipfs/go-ipfs-exchange-offline" + "github.com/ipfs/go-ipfs/tracing" dag "github.com/ipfs/go-merkledag" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" path "github.com/ipfs/interface-go-ipfs-core/path" peer "github.com/libp2p/go-libp2p-core/peer" routing "github.com/libp2p/go-libp2p-core/routing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type DhtAPI CoreAPI func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindPeer", trace.WithAttributes(attribute.String("peer", p.String()))) + defer span.End() err := api.checkOnline(false) if err != nil { return peer.AddrInfo{}, err @@ -34,10 +39,14 @@ func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, erro } func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindProviders", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + settings, err := caopts.DhtFindProvidersOptions(opts...) if err != nil { return nil, err } + span.SetAttributes(attribute.Int("numproviders", settings.NumProviders)) err = api.checkOnline(false) if err != nil { @@ -59,10 +68,14 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopt } func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.DhtProvideOption) error { + ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "Provide", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() + settings, err := caopts.DhtProvideOptions(opts...) if err != nil { return err } + span.SetAttributes(attribute.Bool("recursive", settings.Recursive)) err = api.checkOnline(false) if err != nil { diff --git a/core/coreapi/key.go b/core/coreapi/key.go index 9b4045ed04a5..1468e6c0c5a3 100644 --- a/core/coreapi/key.go +++ b/core/coreapi/key.go @@ -7,12 +7,15 @@ import ( "fmt" "sort" + "github.com/ipfs/go-ipfs/tracing" ipfspath "github.com/ipfs/go-path" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" path "github.com/ipfs/interface-go-ipfs-core/path" crypto "github.com/libp2p/go-libp2p-core/crypto" peer "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type KeyAPI CoreAPI @@ -40,6 +43,9 @@ func (k *key) ID() peer.ID { // Generate generates new key, stores it in the keystore under the specified // name and returns a base58 encoded multihash of its public key. func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.KeyGenerateOption) (coreiface.Key, error) { + _, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "Generate", trace.WithAttributes(attribute.String("name", name))) + defer span.End() + options, err := caopts.KeyGenerateOptions(opts...) if err != nil { return nil, err @@ -97,6 +103,9 @@ func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.Key // List returns a list keys stored in keystore. func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) { + _, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "List") + defer span.End() + keys, err := api.repo.Keystore().List() if err != nil { return nil, err @@ -128,10 +137,14 @@ func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) { // Rename renames `oldName` to `newName`. Returns the key and whether another // key was overwritten, or an error. func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, opts ...caopts.KeyRenameOption) (coreiface.Key, bool, error) { + _, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "Rename", trace.WithAttributes(attribute.String("oldname", oldName), attribute.String("newname", newName))) + defer span.End() + options, err := caopts.KeyRenameOptions(opts...) if err != nil { return nil, false, err } + span.SetAttributes(attribute.Bool("force", options.Force)) ks := api.repo.Keystore() @@ -187,6 +200,9 @@ func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, o // Remove removes keys from keystore. Returns ipns path of the removed key. func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, error) { + _, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "Remove", trace.WithAttributes(attribute.String("name", name))) + defer span.End() + ks := api.repo.Keystore() if name == "self" { diff --git a/core/coreapi/name.go b/core/coreapi/name.go index b007ccd7d5cf..918e896c1af8 100644 --- a/core/coreapi/name.go +++ b/core/coreapi/name.go @@ -6,8 +6,11 @@ import ( "strings" "time" - "github.com/ipfs/go-ipfs-keystore" + keystore "github.com/ipfs/go-ipfs-keystore" + "github.com/ipfs/go-ipfs/tracing" "github.com/ipfs/go-namesys" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ipath "github.com/ipfs/go-path" coreiface "github.com/ipfs/interface-go-ipfs-core" @@ -36,6 +39,9 @@ func (e *ipnsEntry) Value() path.Path { // Publish announces new IPNS name and returns the new IPNS entry. func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.NamePublishOption) (coreiface.IpnsEntry, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.NameAPI", "Publish", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + if err := api.checkPublishAllowed(); err != nil { return nil, err } @@ -44,6 +50,12 @@ func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.Nam if err != nil { return nil, err } + span.SetAttributes( + attribute.Bool("allowoffline", options.AllowOffline), + attribute.String("key", options.Key), + attribute.Float64("ttl", options.TTL.Seconds()), + attribute.Float64("validtime", options.ValidTime.Seconds()), + ) err = api.checkOnline(options.AllowOffline) if err != nil { @@ -82,11 +94,16 @@ func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.Nam } func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan coreiface.IpnsResult, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.NameAPI", "Search", trace.WithAttributes(attribute.String("name", name))) + defer span.End() + options, err := caopts.NameResolveOptions(opts...) if err != nil { return nil, err } + span.SetAttributes(attribute.Bool("cache", options.Cache)) + err = api.checkOnline(true) if err != nil { return nil, err @@ -124,6 +141,9 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name // Resolve attempts to resolve the newest version of the specified name and // returns its path. func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (path.Path, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.NameAPI", "Resolve", trace.WithAttributes(attribute.String("name", name))) + defer span.End() + ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/core/coreapi/object.go b/core/coreapi/object.go index 62d31daede1c..8c3a2e0aa0ca 100644 --- a/core/coreapi/object.go +++ b/core/coreapi/object.go @@ -13,6 +13,7 @@ import ( cid "github.com/ipfs/go-cid" pin "github.com/ipfs/go-ipfs-pinner" + "github.com/ipfs/go-ipfs/tracing" ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag/dagutils" @@ -20,6 +21,8 @@ import ( coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" ipath "github.com/ipfs/interface-go-ipfs-core/path" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const inputLimit = 2 << 20 @@ -37,6 +40,9 @@ type Node struct { } func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (ipld.Node, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "New") + defer span.End() + options, err := caopts.ObjectNewOptions(opts...) if err != nil { return nil, err @@ -60,10 +66,18 @@ func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) ( } func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.ObjectPutOption) (ipath.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "Put") + defer span.End() + options, err := caopts.ObjectPutOptions(opts...) if err != nil { return nil, err } + span.SetAttributes( + attribute.Bool("pin", options.Pin), + attribute.String("datatype", options.DataType), + attribute.String("inputenc", options.InputEnc), + ) data, err := ioutil.ReadAll(io.LimitReader(src, inputLimit+10)) if err != nil { @@ -130,10 +144,15 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj } func (api *ObjectAPI) Get(ctx context.Context, path ipath.Path) (ipld.Node, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "Get", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() return api.core().ResolveNode(ctx, path) } func (api *ObjectAPI) Data(ctx context.Context, path ipath.Path) (io.Reader, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "Data", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() + nd, err := api.core().ResolveNode(ctx, path) if err != nil { return nil, err @@ -148,6 +167,9 @@ func (api *ObjectAPI) Data(ctx context.Context, path ipath.Path) (io.Reader, err } func (api *ObjectAPI) Links(ctx context.Context, path ipath.Path) ([]*ipld.Link, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "Links", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() + nd, err := api.core().ResolveNode(ctx, path) if err != nil { return nil, err @@ -163,6 +185,9 @@ func (api *ObjectAPI) Links(ctx context.Context, path ipath.Path) ([]*ipld.Link, } func (api *ObjectAPI) Stat(ctx context.Context, path ipath.Path) (*coreiface.ObjectStat, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "Stat", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() + nd, err := api.core().ResolveNode(ctx, path) if err != nil { return nil, err @@ -186,10 +211,18 @@ func (api *ObjectAPI) Stat(ctx context.Context, path ipath.Path) (*coreiface.Obj } func (api *ObjectAPI) AddLink(ctx context.Context, base ipath.Path, name string, child ipath.Path, opts ...caopts.ObjectAddLinkOption) (ipath.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "AddLink", trace.WithAttributes( + attribute.String("base", base.String()), + attribute.String("name", name), + attribute.String("child", child.String()), + )) + defer span.End() + options, err := caopts.ObjectAddLinkOptions(opts...) if err != nil { return nil, err } + span.SetAttributes(attribute.Bool("create", options.Create)) baseNd, err := api.core().ResolveNode(ctx, base) if err != nil { @@ -227,6 +260,12 @@ func (api *ObjectAPI) AddLink(ctx context.Context, base ipath.Path, name string, } func (api *ObjectAPI) RmLink(ctx context.Context, base ipath.Path, link string) (ipath.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "RmLink", trace.WithAttributes( + attribute.String("base", base.String()), + attribute.String("link", link)), + ) + defer span.End() + baseNd, err := api.core().ResolveNode(ctx, base) if err != nil { return nil, err @@ -253,10 +292,16 @@ func (api *ObjectAPI) RmLink(ctx context.Context, base ipath.Path, link string) } func (api *ObjectAPI) AppendData(ctx context.Context, path ipath.Path, r io.Reader) (ipath.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "AppendData", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() + return api.patchData(ctx, path, r, true) } func (api *ObjectAPI) SetData(ctx context.Context, path ipath.Path, r io.Reader) (ipath.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "SetData", trace.WithAttributes(attribute.String("path", path.String()))) + defer span.End() + return api.patchData(ctx, path, r, false) } @@ -290,6 +335,12 @@ func (api *ObjectAPI) patchData(ctx context.Context, path ipath.Path, r io.Reade } func (api *ObjectAPI) Diff(ctx context.Context, before ipath.Path, after ipath.Path) ([]coreiface.ObjectChange, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.ObjectAPI", "Diff", trace.WithAttributes( + attribute.String("before", before.String()), + attribute.String("after", after.String()), + )) + defer span.End() + beforeNd, err := api.core().ResolveNode(ctx, before) if err != nil { return nil, err diff --git a/core/coreapi/path.go b/core/coreapi/path.go index b9bf83e0df64..5f2b41007897 100644 --- a/core/coreapi/path.go +++ b/core/coreapi/path.go @@ -5,8 +5,12 @@ import ( "fmt" gopath "path" + "github.com/ipfs/go-ipfs/tracing" "github.com/ipfs/go-namesys/resolve" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "github.com/ipfs/go-cid" "github.com/ipfs/go-fetcher" ipld "github.com/ipfs/go-ipld-format" @@ -19,6 +23,9 @@ import ( // ResolveNode resolves the path `p` using Unixfs resolver, gets and returns the // resolved Node. func (api *CoreAPI) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, error) { + ctx, span := tracing.Span(ctx, "CoreAPI", "ResolveNode", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + rp, err := api.ResolvePath(ctx, p) if err != nil { return nil, err @@ -34,6 +41,9 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, er // ResolvePath resolves the path `p` using Unixfs resolver, returns the // resolved path. func (api *CoreAPI) ResolvePath(ctx context.Context, p path.Path) (path.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI", "ResolvePath", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + if _, ok := p.(path.Resolved); ok { return p.(path.Resolved), nil } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 52ea6a6a453a..51667c4b71f1 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -8,15 +8,21 @@ import ( "github.com/ipfs/go-cid" offline "github.com/ipfs/go-ipfs-exchange-offline" pin "github.com/ipfs/go-ipfs-pinner" + "github.com/ipfs/go-ipfs/tracing" "github.com/ipfs/go-merkledag" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" "github.com/ipfs/interface-go-ipfs-core/path" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type PinAPI CoreAPI func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOption) error { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Add", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + dagNode, err := api.core().ResolveNode(ctx, p) if err != nil { return fmt.Errorf("pin: %s", err) @@ -27,6 +33,8 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return err } + span.SetAttributes(attribute.Bool("recursive", settings.Recursive)) + defer api.blockstore.PinLock(ctx).Unlock(ctx) err = api.pinning.Pin(ctx, dagNode, settings.Recursive) @@ -42,11 +50,16 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp } func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls") + defer span.End() + settings, err := caopts.PinLsOptions(opts...) if err != nil { return nil, err } + span.SetAttributes(attribute.String("type", settings.Type)) + switch settings.Type { case "all", "direct", "indirect", "recursive": default: @@ -57,6 +70,9 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c } func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "IsPinned", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + resolved, err := api.core().ResolvePath(ctx, p) if err != nil { return "", false, fmt.Errorf("error resolving path: %s", err) @@ -67,6 +83,8 @@ func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.Pin return "", false, err } + span.SetAttributes(attribute.String("withtype", settings.WithType)) + mode, ok := pin.StringToMode(settings.WithType) if !ok { return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType) @@ -77,6 +95,9 @@ func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.Pin // Rm pin rm api func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOption) error { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Rm", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + rp, err := api.core().ResolvePath(ctx, p) if err != nil { return err @@ -87,6 +108,8 @@ func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOpti return err } + span.SetAttributes(attribute.Bool("recursive", settings.Recursive)) + // Note: after unpin the pin sets are flushed to the blockstore, so we need // to take a lock to prevent a concurrent garbage collection defer api.blockstore.PinLock(ctx).Unlock(ctx) @@ -99,11 +122,19 @@ func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOpti } func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Update", trace.WithAttributes( + attribute.String("from", from.String()), + attribute.String("to", to.String()), + )) + defer span.End() + settings, err := caopts.PinUpdateOptions(opts...) if err != nil { return err } + span.SetAttributes(attribute.Bool("unpin", settings.Unpin)) + fp, err := api.core().ResolvePath(ctx, from) if err != nil { return err @@ -153,6 +184,9 @@ func (n *badNode) Err() error { } func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify") + defer span.End() + visited := make(map[cid.Cid]*pinStatus) bs := api.blockstore DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) @@ -164,6 +198,9 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro var checkPin func(root cid.Cid) *pinStatus checkPin = func(root cid.Cid) *pinStatus { + ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify.CheckPin", trace.WithAttributes(attribute.String("cid", root.String()))) + defer span.End() + if status, ok := visited[root]; ok { return status } diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go deleted file mode 100644 index 8148c87892ef..000000000000 --- a/core/coreapi/provider.go +++ /dev/null @@ -1,13 +0,0 @@ -package coreapi - -import ( - cid "github.com/ipfs/go-cid" -) - -// ProviderAPI brings Provider behavior to CoreAPI -type ProviderAPI CoreAPI - -// Provide the given cid using the current provider -func (api *ProviderAPI) Provide(cid cid.Cid) error { - return api.provider.Provide(cid) -} diff --git a/core/coreapi/pubsub.go b/core/coreapi/pubsub.go index a75db36296b5..99658b599522 100644 --- a/core/coreapi/pubsub.go +++ b/core/coreapi/pubsub.go @@ -4,11 +4,14 @@ import ( "context" "errors" + "github.com/ipfs/go-ipfs/tracing" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" peer "github.com/libp2p/go-libp2p-core/peer" routing "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type PubSubAPI CoreAPI @@ -22,6 +25,9 @@ type pubSubMessage struct { } func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { + _, span := tracing.Span(ctx, "CoreAPI.PubSubAPI", "Ls") + defer span.End() + _, err := api.checkNode() if err != nil { return nil, err @@ -31,6 +37,9 @@ func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { } func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { + _, span := tracing.Span(ctx, "CoreAPI.PubSubAPI", "Peers") + defer span.End() + _, err := api.checkNode() if err != nil { return nil, err @@ -41,10 +50,15 @@ func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio return nil, err } + span.SetAttributes(attribute.String("topic", settings.Topic)) + return api.pubSub.ListPeers(settings.Topic), nil } func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error { + _, span := tracing.Span(ctx, "CoreAPI.PubSubAPI", "Publish", trace.WithAttributes(attribute.String("topic", topic))) + defer span.End() + _, err := api.checkNode() if err != nil { return err @@ -55,6 +69,9 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er } func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { + _, span := tracing.Span(ctx, "CoreAPI.PubSubAPI", "Subscribe", trace.WithAttributes(attribute.String("topic", topic))) + defer span.End() + // Parse the options to avoid introducing silent failures for invalid // options. However, we don't currently have any use for them. The only // subscription option, discovery, is now a no-op as it's handled by @@ -97,6 +114,9 @@ func (sub *pubSubSubscription) Close() error { } func (sub *pubSubSubscription) Next(ctx context.Context) (coreiface.PubSubMessage, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.PubSubSubscription", "Next") + defer span.End() + msg, err := sub.subscription.Next(ctx) if err != nil { return nil, err diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index 3c3c40ddbdce..2aea3152ca19 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -5,6 +5,7 @@ import ( "sort" "time" + "github.com/ipfs/go-ipfs/tracing" coreiface "github.com/ipfs/interface-go-ipfs-core" inet "github.com/libp2p/go-libp2p-core/network" peer "github.com/libp2p/go-libp2p-core/peer" @@ -12,6 +13,8 @@ import ( protocol "github.com/libp2p/go-libp2p-core/protocol" swarm "github.com/libp2p/go-libp2p-swarm" ma "github.com/multiformats/go-multiaddr" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type SwarmAPI CoreAPI @@ -30,6 +33,9 @@ const connectionManagerTag = "user-connect" const connectionManagerWeight = 100 func (api *SwarmAPI) Connect(ctx context.Context, pi peer.AddrInfo) error { + ctx, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Connect", trace.WithAttributes(attribute.String("peerid", pi.ID.String()))) + defer span.End() + if api.peerHost == nil { return coreiface.ErrOffline } @@ -47,6 +53,9 @@ func (api *SwarmAPI) Connect(ctx context.Context, pi peer.AddrInfo) error { } func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { + _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Disconnect", trace.WithAttributes(attribute.String("addr", addr.String()))) + defer span.End() + if api.peerHost == nil { return coreiface.ErrOffline } @@ -56,6 +65,8 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { return peer.ErrInvalidAddr } + span.SetAttributes(attribute.String("peerid", id.String())) + net := api.peerHost.Network() if taddr == nil { if net.Connectedness(id) != inet.Connected { @@ -76,7 +87,10 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { return coreiface.ErrConnNotFound } -func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) { +func (api *SwarmAPI) KnownAddrs(ctx context.Context) (map[peer.ID][]ma.Multiaddr, error) { + _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "KnownAddrs") + defer span.End() + if api.peerHost == nil { return nil, coreiface.ErrOffline } @@ -93,7 +107,10 @@ func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, er return addrs, nil } -func (api *SwarmAPI) LocalAddrs(context.Context) ([]ma.Multiaddr, error) { +func (api *SwarmAPI) LocalAddrs(ctx context.Context) ([]ma.Multiaddr, error) { + _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "LocalAddrs") + defer span.End() + if api.peerHost == nil { return nil, coreiface.ErrOffline } @@ -101,7 +118,10 @@ func (api *SwarmAPI) LocalAddrs(context.Context) ([]ma.Multiaddr, error) { return api.peerHost.Addrs(), nil } -func (api *SwarmAPI) ListenAddrs(context.Context) ([]ma.Multiaddr, error) { +func (api *SwarmAPI) ListenAddrs(ctx context.Context) ([]ma.Multiaddr, error) { + _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "ListenAddrs") + defer span.End() + if api.peerHost == nil { return nil, coreiface.ErrOffline } @@ -109,7 +129,10 @@ func (api *SwarmAPI) ListenAddrs(context.Context) ([]ma.Multiaddr, error) { return api.peerHost.Network().InterfaceListenAddresses() } -func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) { +func (api *SwarmAPI) Peers(ctx context.Context) ([]coreiface.ConnectionInfo, error) { + _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Peers") + defer span.End() + if api.peerHost == nil { return nil, coreiface.ErrOffline } diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 55410dcb0987..a10308129a41 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -6,6 +6,9 @@ import ( "sync" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-ipfs/core/coreunix" @@ -55,11 +58,30 @@ func getOrCreateNilNode() (*core.IpfsNode, error) { // Add builds a merkledag node from a reader, adds it to the blockstore, // and returns the key representing that node. func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options.UnixfsAddOption) (path.Resolved, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Add") + defer span.End() + settings, prefix, err := options.UnixfsAddOptions(opts...) if err != nil { return nil, err } + span.SetAttributes( + attribute.String("chunker", settings.Chunker), + attribute.Int("cidversion", settings.CidVersion), + attribute.Bool("inline", settings.Inline), + attribute.Int("inlinelimit", settings.InlineLimit), + attribute.Bool("rawleaves", settings.RawLeaves), + attribute.Bool("rawleavesset", settings.RawLeavesSet), + attribute.Int("layout", int(settings.Layout)), + attribute.Bool("pin", settings.Pin), + attribute.Bool("onlyhash", settings.OnlyHash), + attribute.Bool("fscache", settings.FsCache), + attribute.Bool("nocopy", settings.NoCopy), + attribute.Bool("silent", settings.Silent), + attribute.Bool("progress", settings.Progress), + ) + cfg, err := api.repo.Config() if err != nil { return nil, err @@ -179,6 +201,9 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options } func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Get", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + ses := api.core().getSession(ctx) nd, err := ses.ResolveNode(ctx, p) @@ -192,11 +217,16 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) // Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format: // ` ` func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, error) { + ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + settings, err := options.UnixfsLsOptions(opts...) if err != nil { return nil, err } + span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren)) + ses := api.core().getSession(ctx) uses := (*UnixfsAPI)(ses) @@ -217,6 +247,12 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf } func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) coreiface.DirEntry { + ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "ProcessLink", trace.WithAttributes( + attribute.String("linkname", linkres.Link.Name), + attribute.String("cid", linkres.Link.Cid.String()), + )) + defer span.End() + if linkres.Err != nil { return coreiface.DirEntry{Err: linkres.Err} } diff --git a/core/corehttp/gateway.go b/core/corehttp/gateway.go index fb1524da529e..2e794b53ffc8 100644 --- a/core/corehttp/gateway.go +++ b/core/corehttp/gateway.go @@ -9,6 +9,7 @@ import ( version "github.com/ipfs/go-ipfs" core "github.com/ipfs/go-ipfs/core" coreapi "github.com/ipfs/go-ipfs/core/coreapi" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" options "github.com/ipfs/interface-go-ipfs-core/options" id "github.com/libp2p/go-libp2p/p2p/protocol/identify" @@ -87,12 +88,14 @@ func GatewayOption(writable bool, paths ...string) ServeOption { "X-Stream-Output", }, headers[ACEHeadersName]...)) - gateway := newGatewayHandler(GatewayConfig{ + var gateway http.Handler = newGatewayHandler(GatewayConfig{ Headers: headers, Writable: writable, PathPrefixes: cfg.Gateway.PathPrefixes, }, api) + gateway = otelhttp.NewHandler(gateway, "Gateway.Request") + for _, p := range paths { mux.Handle(p+"/", gateway) } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index f5ee54d8cfd1..d3ef3919b422 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -21,6 +21,7 @@ import ( "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" assets "github.com/ipfs/go-ipfs/assets" + "github.com/ipfs/go-ipfs/tracing" dag "github.com/ipfs/go-merkledag" mfs "github.com/ipfs/go-mfs" path "github.com/ipfs/go-path" @@ -29,6 +30,7 @@ import ( ipath "github.com/ipfs/interface-go-ipfs-core/path" routing "github.com/libp2p/go-libp2p-core/routing" prometheus "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel" ) const ( @@ -136,6 +138,7 @@ func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // the hour is a hard fallback, we don't expect it to happen, but just in case ctx, cancel := context.WithTimeout(r.Context(), time.Hour) defer cancel() + r = r.WithContext(ctx) defer func() { @@ -194,6 +197,9 @@ func (i *gatewayHandler) optionsHandler(w http.ResponseWriter, r *http.Request) } func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) { + ctx, span := tracing.Span(r.Context(), "Gateway", "GetOrHeadHandler") + defer span.End() + begin := time.Now() urlPath := r.URL.Path escapedURLPath := r.URL.EscapedPath() @@ -271,7 +277,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } // Resolve path to the final DAG node for the ETag - resolvedPath, err := i.api.ResolvePath(r.Context(), parsedPath) + resolvedPath, err := i.api.ResolvePath(ctx, parsedPath) switch err { case nil: case coreiface.ErrOffline: @@ -286,7 +292,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request return } - dr, err := i.api.Unixfs().Get(r.Context(), resolvedPath) + dr, err := i.api.Unixfs().Get(ctx, resolvedPath) if err != nil { webError(w, "ipfs cat "+escapedURLPath, err, http.StatusNotFound) return @@ -345,7 +351,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } else { name = getFilename(urlPath) } - i.serveFile(w, r, name, modtime, f) + i.serveFile(ctx, w, r, name, modtime, f) return } dir, ok := dr.(files.Directory) @@ -354,7 +360,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request return } - idx, err := i.api.Unixfs().Get(r.Context(), ipath.Join(resolvedPath, "index.html")) + idx, err := i.api.Unixfs().Get(ctx, ipath.Join(resolvedPath, "index.html")) switch err.(type) { case nil: dirwithoutslash := urlPath[len(urlPath)-1] != '/' @@ -377,7 +383,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } // write to request - i.serveFile(w, r, "index.html", modtime, f) + i.serveFile(ctx, w, r, "index.html", modtime, f) return case resolver.ErrNoLink: // no index.html; noop @@ -403,6 +409,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } // storage for directory listing + span.AddEvent("DirListingStart") var dirListing []directoryItem dirit := dir.Entries() for dirit.Next() { @@ -412,7 +419,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request size = humanize.Bytes(uint64(s)) } - resolved, err := i.api.ResolvePath(r.Context(), ipath.Join(resolvedPath, dirit.Name())) + resolved, err := i.api.ResolvePath(ctx, ipath.Join(resolvedPath, dirit.Name())) if err != nil { internalWebError(w, err) return @@ -429,6 +436,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } dirListing = append(dirListing, di) } + span.AddEvent("DirListingEnd") if dirit.Err() != nil { internalWebError(w, dirit.Err()) return @@ -470,7 +478,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request var gwURL string // Get gateway hostname and build gateway URL. - if h, ok := r.Context().Value("gw-hostname").(string); ok { + if h, ok := ctx.Value("gw-hostname").(string); ok { gwURL = "//" + h } else { gwURL = "" @@ -497,7 +505,11 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } } -func (i *gatewayHandler) serveFile(w http.ResponseWriter, req *http.Request, name string, modtime time.Time, file files.File) { +func (i *gatewayHandler) serveFile(ctx context.Context, w http.ResponseWriter, req *http.Request, name string, modtime time.Time, file files.File) { + ctx, span := tracing.Span(ctx, "Gateway", "ServeFile") + defer span.End() + req = req.WithContext(ctx) + size, err := file.Size() if err != nil { http.Error(w, "cannot serve files with unknown sizes", http.StatusBadGateway) @@ -577,7 +589,10 @@ func (i *gatewayHandler) servePretty404IfPresent(w http.ResponseWriter, r *http. } func (i *gatewayHandler) postHandler(w http.ResponseWriter, r *http.Request) { - p, err := i.api.Unixfs().Add(r.Context(), files.NewReaderFile(r.Body)) + ctx, span := tracing.Span(r.Context(), "Gateway", "PostHandler") + defer span.End() + + p, err := i.api.Unixfs().Add(ctx, files.NewReaderFile(r.Body)) if err != nil { internalWebError(w, err) return @@ -589,7 +604,9 @@ func (i *gatewayHandler) postHandler(w http.ResponseWriter, r *http.Request) { } func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() + ctx, span := tracing.Span(r.Context(), "Gateway", "PutHandler") + defer span.End() + ds := i.api.Dag() // Parse the path @@ -681,7 +698,8 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) { } func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() + ctx, span := otel.Tracer("").Start(r.Context(), "gatewayHandler-deleteHandler") + defer span.End() // parse the path diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 387a977784d8..532b1bfbaab7 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -24,8 +24,11 @@ import ( "github.com/ipfs/go-unixfs/importer/trickle" coreiface "github.com/ipfs/interface-go-ipfs-core" "github.com/ipfs/interface-go-ipfs-core/path" + "go.opentelemetry.io/otel" ) +const pkg = "go-ipfs/core/coreunix" + var log = logging.Logger("coreunix") // how many bytes of progress to wait before sending a progress update message @@ -158,20 +161,23 @@ func (adder *Adder) curRootNode() (ipld.Node, error) { // Recursively pins the root node of Adder and // writes the pin state to the backing datastore. -func (adder *Adder) PinRoot(root ipld.Node) error { +func (adder *Adder) PinRoot(ctx context.Context, root ipld.Node) error { + ctx, span := otel.Tracer(pkg).Start(ctx, "Adder.PinRoot") + defer span.End() + if !adder.Pin { return nil } rnk := root.Cid() - err := adder.dagService.Add(adder.ctx, root) + err := adder.dagService.Add(ctx, root) if err != nil { return err } if adder.tempRoot.Defined() { - err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true) + err := adder.pinning.Unpin(ctx, adder.tempRoot, true) if err != nil { return err } @@ -179,7 +185,7 @@ func (adder *Adder) PinRoot(root ipld.Node) error { } adder.pinning.PinWithMode(rnk, pin.Recursive) - return adder.pinning.Flush(adder.ctx) + return adder.pinning.Flush(ctx) } func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error { @@ -255,6 +261,9 @@ func (adder *Adder) addNode(node ipld.Node, path string) error { // AddAllAndPin adds the given request's files and pin them. func (adder *Adder) AddAllAndPin(ctx context.Context, file files.Node) (ipld.Node, error) { + ctx, span := otel.Tracer(pkg).Start(ctx, "Adder.AddAllAndPin") + defer span.End() + if adder.Pin { adder.unlocker = adder.gcLocker.PinLock(ctx) } @@ -330,10 +339,13 @@ func (adder *Adder) AddAllAndPin(ctx context.Context, file files.Node) (ipld.Nod if !adder.Pin { return nd, nil } - return nd, adder.PinRoot(nd) + return nd, adder.PinRoot(ctx, nd) } func (adder *Adder) addFileNode(ctx context.Context, path string, file files.Node, toplevel bool) error { + ctx, span := otel.Tracer(pkg).Start(ctx, "Adder.addFileNode") + defer span.End() + defer file.Close() err := adder.maybePauseForGC(ctx) @@ -436,13 +448,16 @@ func (adder *Adder) addDir(ctx context.Context, path string, dir files.Directory } func (adder *Adder) maybePauseForGC(ctx context.Context) error { + ctx, span := otel.Tracer(pkg).Start(ctx, "Adder.maybePauseForGC") + defer span.End() + if adder.unlocker != nil && adder.gcLocker.GCRequested(ctx) { rn, err := adder.curRootNode() if err != nil { return err } - err = adder.PinRoot(rn) + err = adder.PinRoot(ctx, rn) if err != nil { return err } diff --git a/go.mod b/go.mod index 0ea335814d6b..6a72f3ab110e 100644 --- a/go.mod +++ b/go.mod @@ -104,6 +104,14 @@ require ( github.com/whyrusleeping/go-sysinfo v0.0.0-20190219211824-4a357d4b90b1 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 go.opencensus.io v0.23.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.27.0 + go.opentelemetry.io/otel v1.2.0 + go.opentelemetry.io/otel/exporters/jaeger v1.2.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.2.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.2.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.2.0 + go.opentelemetry.io/otel/sdk v1.2.0 + go.opentelemetry.io/otel/trace v1.2.0 go.uber.org/fx v1.15.0 go.uber.org/zap v1.19.1 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 diff --git a/go.sum b/go.sum index 49a1addffe43..c3fbaa233031 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,8 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7 github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= +github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/ceramicnetwork/go-dag-jose v0.1.0 h1:yJ/HVlfKpnD3LdYP03AHyTvbm3BpPiz2oZiOeReJRdU= github.com/ceramicnetwork/go-dag-jose v0.1.0/go.mod h1:qYA1nYt0X8u4XoMAVoOV3upUVKtrxy/I670Dg5F0wjI= @@ -135,7 +137,11 @@ github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -194,12 +200,15 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 h1:BBso6MBKW8ncyZLv37o+KNyy0HrrHgfnOaGQC2qvN+A= github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= +github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o= +github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= @@ -344,6 +353,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= @@ -1328,6 +1338,7 @@ github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -1402,15 +1413,35 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.27.0 h1:0BgiNWjN7rUWO9HdjF4L12r8OW86QkVQcYmCjnayJLo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.27.0/go.mod h1:bdvm3YpMxWAgEfQhtTBaVR8ceXPRuRBSQrvOBnIlHxc= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= -go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8= +go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ= +go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I= +go.opentelemetry.io/otel/exporters/jaeger v1.2.0 h1:C/5Egj3MJBXRJi22cSl07suqPqtZLnLFmH//OxETUEc= +go.opentelemetry.io/otel/exporters/jaeger v1.2.0/go.mod h1:KJLFbEMKTNPIfOxcg/WikIozEoKcPgJRz3Ce1vLlM8E= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.2.0 h1:xzbcGykysUh776gzD1LUPsNNHKWN0kQWDnJhn1ddUuk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.2.0/go.mod h1:14T5gr+Y6s2AgHPqBMgnGwp04csUjQmYXFWPeiBoq5s= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.2.0 h1:VsgsSCDwOSuO8eMVh63Cd4nACMqgjpmAeJSIvVNneD0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.2.0/go.mod h1:9mLBBnPRf3sf+ASVH2p9xREXVBvwib02FxcKnavtExg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.2.0 h1:j/jXNzS6Dy0DFgO/oyCvin4H7vTQBg2Vdi6idIzWhCI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.2.0/go.mod h1:k5GnE4m4Jyy2DNh6UAzG6Nml51nuqQyszV7O1ksQAnE= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.2.0 h1:OiYdrCq1Ctwnovp6EofSPwlp5aGy4LgKNbkg7PtEUw8= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.2.0/go.mod h1:DUFCmFkXr0VtAHl5Zq2JRx24G6ze5CAq8YfdD36RdX8= +go.opentelemetry.io/otel/internal/metric v0.25.0 h1:w/7RXe16WdPylaIXDgcYM6t/q0K5lXgSdZOEbIEyliE= +go.opentelemetry.io/otel/internal/metric v0.25.0/go.mod h1:Nhuw26QSX7d6n4duoqAFi5KOQR4AuzyMcl5eXOgwxtc= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= -go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw= +go.opentelemetry.io/otel/metric v0.25.0 h1:7cXOnCADUsR3+EOqxPaSKwhEuNu0gz/56dRN1hpIdKw= +go.opentelemetry.io/otel/metric v0.25.0/go.mod h1:E884FSpQfnJOMMUaq+05IWlJ4rjZpk2s/F1Ju+TEEm8= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= -go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw= +go.opentelemetry.io/otel/sdk v1.2.0 h1:wKN260u4DesJYhyjxDa7LRFkuhH7ncEVKU37LWcyNIo= +go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uadoSafgHPx1U= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= +go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0= +go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.10.0 h1:n7brgtEbDvXEgGyKKo8SobKT1e9FewlDtXzkVP5djoE= +go.opentelemetry.io/proto/otlp v0.10.0/go.mod h1:zG20xCK0szZ1xdokeSOwEcmlXu+x9kkdRe6N1DhKcfU= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -1665,6 +1696,7 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1843,8 +1875,10 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= +google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A= +google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/tracing/tracing.go b/tracing/tracing.go new file mode 100644 index 000000000000..b9ff92035ce9 --- /dev/null +++ b/tracing/tracing.go @@ -0,0 +1,134 @@ +package tracing + +import ( + "context" + "fmt" + "os" + "strconv" + + version "github.com/ipfs/go-ipfs" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.7.0" + traceapi "go.opentelemetry.io/otel/trace" +) + +var exporterBuilders = map[string]func(context.Context, string) (trace.SpanExporter, error){ + "IPFS_TRACING_JAEGER": func(ctx context.Context, s string) (trace.SpanExporter, error) { + return jaeger.New(jaeger.WithCollectorEndpoint()) + }, + "IPFS_TRACING_FILE": func(ctx context.Context, s string) (trace.SpanExporter, error) { + return newFileExporter(s) + }, + "IPFS_TRACING_OTLP_HTTP": func(ctx context.Context, s string) (trace.SpanExporter, error) { + return otlptracehttp.New(ctx) + }, + "IPFS_TRACING_OTLP_GRPC": func(ctx context.Context, s string) (trace.SpanExporter, error) { + return otlptracegrpc.New(ctx) + }, +} + +// fileExporter wraps a file-writing exporter and closes the file when the exporter is shutdown. +type fileExporter struct { + file *os.File + writerExporter *stdouttrace.Exporter +} + +var _ trace.SpanExporter = &fileExporter{} + +func newFileExporter(file string) (*fileExporter, error) { + f, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, fmt.Errorf("opening %s: %w", file, err) + } + stdoutExporter, err := stdouttrace.New(stdouttrace.WithWriter(f)) + if err != nil { + return nil, err + } + return &fileExporter{ + writerExporter: stdoutExporter, + file: f, + }, nil +} + +func (e *fileExporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error { + return e.writerExporter.ExportSpans(ctx, spans) +} +func (e *fileExporter) Shutdown(ctx context.Context) error { + if err := e.writerExporter.Shutdown(ctx); err != nil { + return err + } + if err := e.file.Close(); err != nil { + return fmt.Errorf("closing trace file: %w", err) + } + return nil +} + +// noopShutdownTracerProvider wraps a TracerProvider with a no-op Shutdown method. +type noopShutdownTracerProvider struct { + tp traceapi.TracerProvider +} + +func (n *noopShutdownTracerProvider) Shutdown(ctx context.Context) error { + return nil +} +func (n *noopShutdownTracerProvider) Tracer(instrumentationName string, opts ...traceapi.TracerOption) traceapi.Tracer { + return n.tp.Tracer(instrumentationName, opts...) +} + +type ShutdownTracerProvider interface { + traceapi.TracerProvider + Shutdown(ctx context.Context) error +} + +func NewTracerProvider(ctx context.Context) (ShutdownTracerProvider, error) { + if os.Getenv("IPFS_TRACING") == "" { + return &noopShutdownTracerProvider{tp: traceapi.NewNoopTracerProvider()}, nil + } + + options := []trace.TracerProviderOption{} + + traceRatio := 1.0 + if envRatio := os.Getenv("IPFS_TRACING_RATIO"); envRatio != "" { + r, err := strconv.ParseFloat(envRatio, 64) + if err == nil { + traceRatio = r + } + } + options = append(options, trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(traceRatio)))) + + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("go-ipfs"), + semconv.ServiceVersionKey.String(version.CurrentVersionNumber), + ), + ) + if err != nil { + return nil, err + } + options = append(options, trace.WithResource(r)) + + for envVar, builder := range exporterBuilders { + if val := os.Getenv(envVar); val != "" { + exporter, err := builder(ctx, val) + if err != nil { + return nil, err + } + options = append(options, trace.WithBatcher(exporter)) + } + } + + return trace.NewTracerProvider(options...), nil +} + +// Span creates a span using the standard IPFS tracing conventions. +func Span(ctx context.Context, componentName string, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) { + return otel.Tracer("go-ipfs").Start(ctx, fmt.Sprintf("%s.%s", componentName, spanName), opts...) +}