Skip to content

Commit

Permalink
feat: add basic gateway tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Dec 8, 2021
1 parent 72656ea commit 66b4fe3
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 31 deletions.
28 changes: 21 additions & 7 deletions cmd/ipfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
loader "github.com/ipfs/go-ipfs/plugin/loader"
repo "github.com/ipfs/go-ipfs/repo"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/ipfs/go-ipfs/tracing"
"go.opentelemetry.io/otel"

cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs-cmds/cli"
Expand Down Expand Up @@ -71,21 +73,33 @@ func main() {
os.Exit(mainRet())
}

func mainRet() int {
func printErr(err error) int {
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
return 1
}

func mainRet() (exitCode int) {
rand.Seed(time.Now().UnixNano())
ctx := logging.ContextWithLoggable(context.Background(), loggables.Uuid("session"))
var err error

// we'll call this local helper to output errors.
// this is so we control how to print errors in one place.
printErr := func(err error) {
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
tp, err := tracing.NewTracerProvider(ctx)
if err != nil {
return printErr(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
exitCode = printErr(err)
}
}()
otel.SetTracerProvider(tp)

ctx, span := otel.Tracer("").Start(ctx, "Run")
defer span.End()

stopFunc, err := profileIfEnabled()
if err != nil {
printErr(err)
return 1
return printErr(err)
}
defer stopFunc() // to be executed as late as possible

Expand Down
15 changes: 15 additions & 0 deletions core/coreapi/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
gopath "path"

"github.com/ipfs/go-namesys/resolve"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-fetcher"
Expand All @@ -19,6 +22,12 @@ import (
// ResolveNode resolves the path `p` using Unixfs resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, error) {
ctx, span := otel.Tracer("").Start(
ctx,
"CoreAPI-ResolveNode",
trace.WithAttributes(attribute.String("Path", p.String())),
)
defer span.End()
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return nil, err
Expand All @@ -34,6 +43,12 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, er
// ResolvePath resolves the path `p` using Unixfs resolver, returns the
// resolved path.
func (api *CoreAPI) ResolvePath(ctx context.Context, p path.Path) (path.Resolved, error) {
ctx, span := otel.Tracer("").Start(
ctx,
"CoreAPI-ResolvePath",
trace.WithAttributes(attribute.String("Path", p.String())),
)
defer span.End()
if _, ok := p.(path.Resolved); ok {
return p.(path.Resolved), nil
}
Expand Down
6 changes: 6 additions & 0 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/ipfs/go-ipfs/core"
"go.opentelemetry.io/otel"

"github.com/ipfs/go-ipfs/core/coreunix"

Expand Down Expand Up @@ -55,6 +56,9 @@ func getOrCreateNilNode() (*core.IpfsNode, error) {
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options.UnixfsAddOption) (path.Resolved, error) {
ctx, span := otel.Tracer("").Start(ctx, "UnixfsAPI-Add")
defer span.End()

settings, prefix, err := options.UnixfsAddOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,6 +183,8 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
}

func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) {
ctx, span := otel.Tracer("").Start(ctx, "UnixFSGet")
defer span.End()
ses := api.core().getSession(ctx)

nd, err := ses.ResolveNode(ctx, p)
Expand Down
5 changes: 4 additions & 1 deletion core/corehttp/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
version "github.com/ipfs/go-ipfs"
core "github.com/ipfs/go-ipfs/core"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

options "github.com/ipfs/interface-go-ipfs-core/options"
id "github.com/libp2p/go-libp2p/p2p/protocol/identify"
Expand Down Expand Up @@ -87,12 +88,14 @@ func GatewayOption(writable bool, paths ...string) ServeOption {
"X-Stream-Output",
}, headers[ACEHeadersName]...))

gateway := newGatewayHandler(GatewayConfig{
var gateway http.Handler = newGatewayHandler(GatewayConfig{
Headers: headers,
Writable: writable,
PathPrefixes: cfg.Gateway.PathPrefixes,
}, api)

gateway = otelhttp.NewHandler(gateway, "GatewayRequest")

for _, p := range paths {
mux.Handle(p+"/", gateway)
}
Expand Down
39 changes: 28 additions & 11 deletions core/corehttp/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ipath "github.com/ipfs/interface-go-ipfs-core/path"
routing "github.com/libp2p/go-libp2p-core/routing"
prometheus "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
)

const (
Expand Down Expand Up @@ -136,6 +137,7 @@ func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// the hour is a hard fallback, we don't expect it to happen, but just in case
ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
defer cancel()

r = r.WithContext(ctx)

defer func() {
Expand Down Expand Up @@ -194,6 +196,9 @@ func (i *gatewayHandler) optionsHandler(w http.ResponseWriter, r *http.Request)
}

func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Tracer("").Start(r.Context(), "gatewayHandler-getOrHeadHandler")
defer span.End()

begin := time.Now()
urlPath := r.URL.Path
escapedURLPath := r.URL.EscapedPath()
Expand Down Expand Up @@ -271,7 +276,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}

// Resolve path to the final DAG node for the ETag
resolvedPath, err := i.api.ResolvePath(r.Context(), parsedPath)
resolvedPath, err := i.api.ResolvePath(ctx, parsedPath)
switch err {
case nil:
case coreiface.ErrOffline:
Expand All @@ -286,7 +291,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
return
}

dr, err := i.api.Unixfs().Get(r.Context(), resolvedPath)
dr, err := i.api.Unixfs().Get(ctx, resolvedPath)
if err != nil {
webError(w, "ipfs cat "+escapedURLPath, err, http.StatusNotFound)
return
Expand Down Expand Up @@ -345,7 +350,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
} else {
name = getFilename(urlPath)
}
i.serveFile(w, r, name, modtime, f)
i.serveFile(ctx, w, r, name, modtime, f)
return
}
dir, ok := dr.(files.Directory)
Expand All @@ -354,7 +359,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
return
}

idx, err := i.api.Unixfs().Get(r.Context(), ipath.Join(resolvedPath, "index.html"))
idx, err := i.api.Unixfs().Get(ctx, ipath.Join(resolvedPath, "index.html"))
switch err.(type) {
case nil:
dirwithoutslash := urlPath[len(urlPath)-1] != '/'
Expand All @@ -377,7 +382,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}

// write to request
i.serveFile(w, r, "index.html", modtime, f)
i.serveFile(ctx, w, r, "index.html", modtime, f)
return
case resolver.ErrNoLink:
// no index.html; noop
Expand All @@ -403,6 +408,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}

// storage for directory listing
span.AddEvent("DirListingStart")
var dirListing []directoryItem
dirit := dir.Entries()
for dirit.Next() {
Expand All @@ -412,7 +418,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
size = humanize.Bytes(uint64(s))
}

resolved, err := i.api.ResolvePath(r.Context(), ipath.Join(resolvedPath, dirit.Name()))
resolved, err := i.api.ResolvePath(ctx, ipath.Join(resolvedPath, dirit.Name()))
if err != nil {
internalWebError(w, err)
return
Expand All @@ -429,6 +435,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}
dirListing = append(dirListing, di)
}
span.AddEvent("DirListingEnd")
if dirit.Err() != nil {
internalWebError(w, dirit.Err())
return
Expand Down Expand Up @@ -470,7 +477,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
var gwURL string

// Get gateway hostname and build gateway URL.
if h, ok := r.Context().Value("gw-hostname").(string); ok {
if h, ok := ctx.Value("gw-hostname").(string); ok {
gwURL = "//" + h
} else {
gwURL = ""
Expand All @@ -497,7 +504,11 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}
}

func (i *gatewayHandler) serveFile(w http.ResponseWriter, req *http.Request, name string, modtime time.Time, file files.File) {
func (i *gatewayHandler) serveFile(ctx context.Context, w http.ResponseWriter, req *http.Request, name string, modtime time.Time, file files.File) {
ctx, span := otel.Tracer("").Start(ctx, "gatewayHandler-serveFile")
defer span.End()
req = req.WithContext(ctx)

size, err := file.Size()
if err != nil {
http.Error(w, "cannot serve files with unknown sizes", http.StatusBadGateway)
Expand Down Expand Up @@ -577,7 +588,10 @@ func (i *gatewayHandler) servePretty404IfPresent(w http.ResponseWriter, r *http.
}

func (i *gatewayHandler) postHandler(w http.ResponseWriter, r *http.Request) {
p, err := i.api.Unixfs().Add(r.Context(), files.NewReaderFile(r.Body))
ctx, span := otel.Tracer("").Start(r.Context(), "gatewayHandler-postHandler")
defer span.End()

p, err := i.api.Unixfs().Add(ctx, files.NewReaderFile(r.Body))
if err != nil {
internalWebError(w, err)
return
Expand All @@ -589,7 +603,9 @@ func (i *gatewayHandler) postHandler(w http.ResponseWriter, r *http.Request) {
}

func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := otel.Tracer("").Start(r.Context(), "gatewayHandler-putHandler")
defer span.End()

ds := i.api.Dag()

// Parse the path
Expand Down Expand Up @@ -681,7 +697,8 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
}

func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := otel.Tracer("").Start(r.Context(), "gatewayHandler-deleteHandler")
defer span.End()

// parse the path

Expand Down
25 changes: 19 additions & 6 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ipfs/go-unixfs/importer/trickle"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/path"
"go.opentelemetry.io/otel"
)

var log = logging.Logger("coreunix")
Expand Down Expand Up @@ -158,28 +159,31 @@ func (adder *Adder) curRootNode() (ipld.Node, error) {

// Recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
func (adder *Adder) PinRoot(root ipld.Node) error {
func (adder *Adder) PinRoot(ctx context.Context, root ipld.Node) error {
ctx, span := otel.Tracer("").Start(ctx, "Adder-PinRoot")
defer span.End()

if !adder.Pin {
return nil
}

rnk := root.Cid()

err := adder.dagService.Add(adder.ctx, root)
err := adder.dagService.Add(ctx, root)
if err != nil {
return err
}

if adder.tempRoot.Defined() {
err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
err := adder.pinning.Unpin(ctx, adder.tempRoot, true)
if err != nil {
return err
}
adder.tempRoot = rnk
}

adder.pinning.PinWithMode(rnk, pin.Recursive)
return adder.pinning.Flush(adder.ctx)
return adder.pinning.Flush(ctx)
}

func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
Expand Down Expand Up @@ -255,6 +259,9 @@ func (adder *Adder) addNode(node ipld.Node, path string) error {

// AddAllAndPin adds the given request's files and pin them.
func (adder *Adder) AddAllAndPin(ctx context.Context, file files.Node) (ipld.Node, error) {
ctx, span := otel.Tracer("").Start(ctx, "Adder-AddAllAndPin")
defer span.End()

if adder.Pin {
adder.unlocker = adder.gcLocker.PinLock(ctx)
}
Expand Down Expand Up @@ -330,10 +337,13 @@ func (adder *Adder) AddAllAndPin(ctx context.Context, file files.Node) (ipld.Nod
if !adder.Pin {
return nd, nil
}
return nd, adder.PinRoot(nd)
return nd, adder.PinRoot(ctx, nd)
}

func (adder *Adder) addFileNode(ctx context.Context, path string, file files.Node, toplevel bool) error {
ctx, span := otel.Tracer("").Start(ctx, "Adder-addFileNode")
defer span.End()

defer file.Close()

err := adder.maybePauseForGC(ctx)
Expand Down Expand Up @@ -436,13 +446,16 @@ func (adder *Adder) addDir(ctx context.Context, path string, dir files.Directory
}

func (adder *Adder) maybePauseForGC(ctx context.Context) error {
ctx, span := otel.Tracer("").Start(ctx, "Adder-maybePauseForGC")
defer span.End()

if adder.unlocker != nil && adder.gcLocker.GCRequested(ctx) {
rn, err := adder.curRootNode()
if err != nil {
return err
}

err = adder.PinRoot(rn)
err = adder.PinRoot(ctx, rn)
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ require (
github.com/whyrusleeping/go-sysinfo v0.0.0-20190219211824-4a357d4b90b1
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
go.opencensus.io v0.23.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.27.0
go.opentelemetry.io/otel v1.2.0
go.opentelemetry.io/otel/exporters/jaeger v1.2.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.2.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.2.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0
go.opentelemetry.io/otel/trace v1.2.0
go.uber.org/fx v1.13.1
go.uber.org/zap v1.19.0
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
Expand Down
Loading

0 comments on commit 66b4fe3

Please sign in to comment.