Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic gateway tracing #8595

Merged
merged 13 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions cmd/ipfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
loader "github.com/ipfs/go-ipfs/plugin/loader"
repo "github.com/ipfs/go-ipfs/repo"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/ipfs/go-ipfs/tracing"
"go.opentelemetry.io/otel"

cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs-cmds/cli"
Expand Down Expand Up @@ -70,21 +72,30 @@ func main() {
os.Exit(mainRet())
}

func mainRet() int {
func printErr(err error) int {
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
return 1
}

func mainRet() (exitCode int) {
rand.Seed(time.Now().UnixNano())
ctx := logging.ContextWithLoggable(context.Background(), loggables.Uuid("session"))
var err error

// we'll call this local helper to output errors.
// this is so we control how to print errors in one place.
printErr := func(err error) {
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
tp, err := tracing.NewTracerProvider(ctx)
if err != nil {
return printErr(err)
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
exitCode = printErr(err)
}
}()
otel.SetTracerProvider(tp)

stopFunc, err := profileIfEnabled()
if err != nil {
printErr(err)
return 1
return printErr(err)
}
defer stopFunc() // to be executed as late as possible

Expand Down
14 changes: 14 additions & 0 deletions core/coreapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

util "github.com/ipfs/go-ipfs/blocks/blockstoreutil"
"github.com/ipfs/go-ipfs/tracing"
)

type BlockAPI CoreAPI
Expand All @@ -25,6 +28,9 @@ type BlockStat struct {
}

func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.BlockPutOption) (coreiface.BlockStat, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Put")
defer span.End()

settings, pref, err := caopts.BlockPutOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -65,6 +71,8 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc
}

func (api *BlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Get", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
Expand All @@ -79,6 +87,9 @@ func (api *BlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) {
}

func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRmOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Rm", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()

rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return err
Expand Down Expand Up @@ -119,6 +130,9 @@ func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRm
}

func (api *BlockAPI) Stat(ctx context.Context, p path.Path) (coreiface.BlockStat, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.BlockAPI", "Stat", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()

rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
Expand Down
7 changes: 7 additions & 0 deletions core/coreapi/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (

cid "github.com/ipfs/go-cid"
pin "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs/tracing"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type dagAPI struct {
Expand All @@ -18,6 +21,8 @@ type dagAPI struct {
type pinningAdder CoreAPI

func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "Add", trace.WithAttributes(attribute.String("node", nd.String())))
defer span.End()
defer adder.blockstore.PinLock(ctx).Unlock(ctx)

if err := adder.dag.Add(ctx, nd); err != nil {
Expand All @@ -30,6 +35,8 @@ func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {
}

func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "AddMany", trace.WithAttributes(attribute.Int("nodes.count", len(nds))))
defer span.End()
defer adder.blockstore.PinLock(ctx).Unlock(ctx)

if err := adder.dag.AddMany(ctx, nds); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ import (
cidutil "github.com/ipfs/go-cidutil"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-ipfs/tracing"
dag "github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
peer "github.com/libp2p/go-libp2p-core/peer"
routing "github.com/libp2p/go-libp2p-core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type DhtAPI CoreAPI

func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindPeer", trace.WithAttributes(attribute.String("peer", p.String())))
defer span.End()
err := api.checkOnline(false)
if err != nil {
return peer.AddrInfo{}, err
Expand All @@ -34,10 +39,14 @@ func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, erro
}

func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindProviders", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()

settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Int("numproviders", settings.NumProviders))

err = api.checkOnline(false)
if err != nil {
Expand All @@ -59,10 +68,14 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopt
}

func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.DhtProvideOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "Provide", trace.WithAttributes(attribute.String("path", path.String())))
defer span.End()

settings, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("recursive", settings.Recursive))

err = api.checkOnline(false)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions core/coreapi/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"fmt"
"sort"

"github.com/ipfs/go-ipfs/tracing"
ipfspath "github.com/ipfs/go-path"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
crypto "github.com/libp2p/go-libp2p-core/crypto"
peer "github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type KeyAPI CoreAPI
Expand Down Expand Up @@ -40,6 +43,9 @@ func (k *key) ID() peer.ID {
// Generate generates new key, stores it in the keystore under the specified
// name and returns a base58 encoded multihash of its public key.
func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.KeyGenerateOption) (coreiface.Key, error) {
_, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "Generate", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

options, err := caopts.KeyGenerateOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -97,6 +103,9 @@ func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.Key

// List returns a list keys stored in keystore.
func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) {
_, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "List")
defer span.End()

keys, err := api.repo.Keystore().List()
if err != nil {
return nil, err
Expand Down Expand Up @@ -128,10 +137,14 @@ func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) {
// Rename renames `oldName` to `newName`. Returns the key and whether another
// key was overwritten, or an error.
func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, opts ...caopts.KeyRenameOption) (coreiface.Key, bool, error) {
_, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "Rename", trace.WithAttributes(attribute.String("oldname", oldName), attribute.String("newname", newName)))
defer span.End()

options, err := caopts.KeyRenameOptions(opts...)
if err != nil {
return nil, false, err
}
span.SetAttributes(attribute.Bool("force", options.Force))

ks := api.repo.Keystore()

Expand Down Expand Up @@ -187,6 +200,9 @@ func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, o

// Remove removes keys from keystore. Returns ipns path of the removed key.
func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, error) {
_, span := tracing.Span(ctx, "CoreAPI.KeyAPI", "Remove", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

ks := api.repo.Keystore()

if name == "self" {
Expand Down
24 changes: 23 additions & 1 deletion core/coreapi/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"strings"
"time"

"github.com/ipfs/go-ipfs-keystore"
keystore "github.com/ipfs/go-ipfs-keystore"
"github.com/ipfs/go-ipfs/tracing"
"github.com/ipfs/go-namesys"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

ipath "github.com/ipfs/go-path"
coreiface "github.com/ipfs/interface-go-ipfs-core"
Expand Down Expand Up @@ -36,6 +39,9 @@ func (e *ipnsEntry) Value() path.Path {

// Publish announces new IPNS name and returns the new IPNS entry.
func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.NamePublishOption) (coreiface.IpnsEntry, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.NameAPI", "Publish", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()

if err := api.checkPublishAllowed(); err != nil {
return nil, err
}
Expand All @@ -44,6 +50,14 @@ func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.Nam
if err != nil {
return nil, err
}
span.SetAttributes(
attribute.Bool("allowoffline", options.AllowOffline),
attribute.String("key", options.Key),
attribute.Float64("validtime", options.ValidTime.Seconds()),
)
if options.TTL != nil {
span.SetAttributes(attribute.Float64("ttl", options.TTL.Seconds()))
}

err = api.checkOnline(options.AllowOffline)
if err != nil {
Expand Down Expand Up @@ -82,11 +96,16 @@ func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.Nam
}

func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan coreiface.IpnsResult, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.NameAPI", "Search", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

options, err := caopts.NameResolveOptions(opts...)
if err != nil {
return nil, err
}

span.SetAttributes(attribute.Bool("cache", options.Cache))

err = api.checkOnline(true)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,6 +143,9 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name
// Resolve attempts to resolve the newest version of the specified name and
// returns its path.
func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (path.Path, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.NameAPI", "Resolve", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
Loading