From 928d2cc6ec65b4bd57e9f0648e347e383c2fcf8e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 28 Oct 2020 10:24:01 +0100 Subject: [PATCH 1/5] tracing: move recordings to new file Release note: None --- pkg/util/tracing/grpc_interceptor.go | 5 + pkg/util/tracing/recording.go | 463 +++++++++++++++++++++++++++ pkg/util/tracing/span.go | 363 --------------------- pkg/util/tracing/tracer.go | 87 ----- 4 files changed, 468 insertions(+), 450 deletions(-) create mode 100644 pkg/util/tracing/recording.go diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index bb76cc7bba67..31111a448d05 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -17,6 +17,7 @@ import ( "strings" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" @@ -425,3 +426,7 @@ func (cs *tracingClientStream) CloseSend() error { } return err } + +// Recording represents a group of RecordedSpans, as returned by GetRecording. +// Spans are sorted by StartTime. +type Recording []tracingpb.RecordedSpan diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go new file mode 100644 index 000000000000..a58672b46833 --- /dev/null +++ b/pkg/util/tracing/recording.go @@ -0,0 +1,463 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracing + +import ( + "encoding/json" + "fmt" + "regexp" + "sort" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/caller" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/errors" + jaegerjson "github.com/jaegertracing/jaeger/model/json" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" +) + +// RecordingType is the type of recording that a Span might be performing. +type RecordingType int + +const ( + // NoRecording means that the Span isn't recording. Child spans created from + // it similarly won't be recording by default. + NoRecording RecordingType = iota + // SnowballRecording means that the Span is recording and that derived + // spans will be as well, in the same mode (this includes remote spans, + // i.e. this mode crosses RPC boundaries). Derived spans will maintain + // their own recording, and this recording will be included in that of + // any local parent spans. + SnowballRecording + // SingleNodeRecording means that the Span is recording and that locally + // derived spans will as well (i.e. a remote Span typically won't be + // recording by default, in contrast to SnowballRecording). Similar to + // SnowballRecording, children have their own recording which is also + // included in that of their parents. + SingleNodeRecording +) + +type traceLogData struct { + opentracing.LogRecord + depth int + // timeSincePrev represents the duration since the previous log line (previous in the + // set of log lines that this is part of). This is always computed relative to a log line + // from the same Span, except for start of Span in which case the duration is computed relative + // to the last log in the parent occurring before this start. For example: + // start Span A + // log 1 // duration relative to "start Span A" + // start Span B // duration relative to "log 1" + // log 2 // duration relative to "start Span B" + // log 3 // duration relative to "log 1" + timeSincePrev time.Duration +} + +// String formats the given spans for human consumption, showing the +// relationship using nesting and times as both relative to the previous event +// and cumulative. +// +// Child spans are inserted into the parent at the point of the child's +// StartTime; see the diagram on generateSessionTraceVTable() for the ordering +// of messages. +// +// Each log line show the time since the beginning of the trace +// and since the previous log line. Span starts are shown with special "=== +// " lines. For a Span start, the time since the relative log line +// can be negative when the Span start follows a message from the parent that +// was generated after the child Span started (or even after the child +// finished). +// +// TODO(andrei): this should be unified with +// SessionTracing.generateSessionTraceVTable(). +func (r Recording) String() string { + if len(r) == 0 { + return "" + } + + var buf strings.Builder + start := r[0].StartTime + writeLogs := func(logs []traceLogData) { + for _, entry := range logs { + fmt.Fprintf(&buf, "% 10.3fms % 10.3fms%s", + 1000*entry.Timestamp.Sub(start).Seconds(), + 1000*entry.timeSincePrev.Seconds(), + strings.Repeat(" ", entry.depth+1)) + for i, f := range entry.Fields { + if i != 0 { + buf.WriteByte(' ') + } + fmt.Fprintf(&buf, "%s:%v", f.Key(), f.Value()) + } + buf.WriteByte('\n') + } + } + + logs := r.visitSpan(r[0], 0 /* depth */) + writeLogs(logs) + + // Check if there's any orphan spans (spans for which the parent is missing). + // This shouldn't happen, but we're protecting against incomplete traces. For + // example, ingesting of remote spans through DistSQL is complex. Orphan spans + // would not be reflected in the output string at all without this. + orphans := r.OrphanSpans() + if len(orphans) > 0 { + // This shouldn't happen. + buf.WriteString("orphan spans (trace is missing spans):\n") + for _, o := range orphans { + logs := r.visitSpan(o, 0 /* depth */) + writeLogs(logs) + } + } + return buf.String() +} + +// OrphanSpans returns the spans with parents missing from the recording. +func (r Recording) OrphanSpans() []tracingpb.RecordedSpan { + spanIDs := make(map[uint64]struct{}) + for _, sp := range r { + spanIDs[sp.SpanID] = struct{}{} + } + + var orphans []tracingpb.RecordedSpan + for i, sp := range r { + if i == 0 { + // The first Span can be a root Span. Note that any other root Span will + // be considered an orphan. + continue + } + if _, ok := spanIDs[sp.ParentSpanID]; !ok { + orphans = append(orphans, sp) + } + } + return orphans +} + +// FindLogMessage returns the first log message in the recording that matches +// the given regexp. The bool return value is true if such a message is found. +func (r Recording) FindLogMessage(pattern string) (string, bool) { + re := regexp.MustCompile(pattern) + for _, sp := range r { + for _, l := range sp.Logs { + msg := l.Msg() + if re.MatchString(msg) { + return msg, true + } + } + } + return "", false +} + +// FindSpan returns the Span with the given operation. The bool retval is false +// if the Span is not found. +func (r Recording) FindSpan(operation string) (tracingpb.RecordedSpan, bool) { + for _, sp := range r { + if sp.Operation == operation { + return sp, true + } + } + return tracingpb.RecordedSpan{}, false +} + +// visitSpan returns the log messages for sp, and all of sp's children. +// +// All messages from a Span are kept together. Sibling spans are ordered within +// the parent in their start order. +func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogData { + ownLogs := make([]traceLogData, 0, len(sp.Logs)+1) + + conv := func(l opentracing.LogRecord, ref time.Time) traceLogData { + var timeSincePrev time.Duration + if ref != (time.Time{}) { + timeSincePrev = l.Timestamp.Sub(ref) + } + return traceLogData{ + LogRecord: l, + depth: depth, + timeSincePrev: timeSincePrev, + } + } + + // Add a log line representing the start of the Span. + lr := opentracing.LogRecord{ + Timestamp: sp.StartTime, + Fields: []otlog.Field{otlog.String("=== operation", sp.Operation)}, + } + if len(sp.Tags) > 0 { + tags := make([]string, 0, len(sp.Tags)) + for k := range sp.Tags { + tags = append(tags, k) + } + sort.Strings(tags) + for _, k := range tags { + lr.Fields = append(lr.Fields, otlog.String(k, sp.Tags[k])) + } + } + ownLogs = append(ownLogs, conv( + lr, + // ref - this entries timeSincePrev will be computed when we merge it into the parent + time.Time{})) + + for _, l := range sp.Logs { + lr := opentracing.LogRecord{ + Timestamp: l.Time, + Fields: make([]otlog.Field, len(l.Fields)), + } + for i, f := range l.Fields { + lr.Fields[i] = otlog.String(f.Key, f.Value) + } + lastLog := ownLogs[len(ownLogs)-1] + ownLogs = append(ownLogs, conv(lr, lastLog.Timestamp)) + } + + childSpans := make([][]traceLogData, 0) + for _, osp := range r { + if osp.ParentSpanID != sp.SpanID { + continue + } + childSpans = append(childSpans, r.visitSpan(osp, depth+1)) + } + + // Merge ownLogs with childSpans. + mergedLogs := make([]traceLogData, 0, len(ownLogs)) + timeMax := time.Date(2200, 0, 0, 0, 0, 0, 0, time.UTC) + i, j := 0, 0 + var lastTimestamp time.Time + for i < len(ownLogs) || j < len(childSpans) { + if len(mergedLogs) > 0 { + lastTimestamp = mergedLogs[len(mergedLogs)-1].Timestamp + } + nextLog, nextChild := timeMax, timeMax + if i < len(ownLogs) { + nextLog = ownLogs[i].Timestamp + } + if j < len(childSpans) { + nextChild = childSpans[j][0].Timestamp + } + if nextLog.After(nextChild) { + // Fill in timeSincePrev for the first one of the child's entries. + if lastTimestamp != (time.Time{}) { + childSpans[j][0].timeSincePrev = childSpans[j][0].Timestamp.Sub(lastTimestamp) + } + mergedLogs = append(mergedLogs, childSpans[j]...) + lastTimestamp = childSpans[j][0].Timestamp + j++ + } else { + mergedLogs = append(mergedLogs, ownLogs[i]) + lastTimestamp = ownLogs[i].Timestamp + i++ + } + } + + return mergedLogs +} + +// ToJaegerJSON returns the trace as a JSON that can be imported into Jaeger for +// visualization. +// +// The format is described here: https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826 +// +// The statement is passed in so it can be included in the trace. +func (r Recording) ToJaegerJSON(stmt string) (string, error) { + if len(r) == 0 { + return "", nil + } + + cpy := make(Recording, len(r)) + copy(cpy, r) + r = cpy + tagsCopy := make(map[string]string) + for k, v := range r[0].Tags { + tagsCopy[k] = v + } + tagsCopy["statement"] = stmt + r[0].Tags = tagsCopy + + toJaegerSpanID := func(spanID uint64) jaegerjson.SpanID { + return jaegerjson.SpanID(strconv.FormatUint(spanID, 10)) + } + + // Each Span in Jaeger belongs to a "process" that generated it. Spans + // belonging to different colors are colored differently in Jaeger. We're + // going to map our different nodes to different processes. + processes := make(map[jaegerjson.ProcessID]jaegerjson.Process) + // getProcessID figures out what "process" a Span belongs to. It looks for an + // "node: " tag. The processes map is populated with an entry for every + // node present in the trace. + getProcessID := func(sp tracingpb.RecordedSpan) jaegerjson.ProcessID { + node := "unknown node" + for k, v := range sp.Tags { + if k == "node" { + node = fmt.Sprintf("node %s", v) + break + } + } + pid := jaegerjson.ProcessID(node) + if _, ok := processes[pid]; !ok { + processes[pid] = jaegerjson.Process{ + ServiceName: node, + Tags: nil, + } + } + return pid + } + + var t jaegerjson.Trace + t.TraceID = jaegerjson.TraceID(strconv.FormatUint(r[0].TraceID, 10)) + t.Processes = processes + + for _, sp := range r { + var s jaegerjson.Span + + s.TraceID = t.TraceID + s.Duration = uint64(sp.Duration.Microseconds()) + s.StartTime = uint64(sp.StartTime.UnixNano() / 1000) + s.SpanID = toJaegerSpanID(sp.SpanID) + s.OperationName = sp.Operation + s.ProcessID = getProcessID(sp) + + if sp.ParentSpanID != 0 { + s.References = []jaegerjson.Reference{{ + RefType: jaegerjson.ChildOf, + TraceID: s.TraceID, + SpanID: toJaegerSpanID(sp.ParentSpanID), + }} + } + + for k, v := range sp.Tags { + s.Tags = append(s.Tags, jaegerjson.KeyValue{ + Key: k, + Value: v, + Type: "STRING", + }) + } + for _, l := range sp.Logs { + jl := jaegerjson.Log{Timestamp: uint64(l.Time.UnixNano() / 1000)} + for _, field := range l.Fields { + jl.Fields = append(jl.Fields, jaegerjson.KeyValue{ + Key: field.Key, + Value: field.Value, + Type: "STRING", + }) + } + s.Logs = append(s.Logs, jl) + } + t.Spans = append(t.Spans, s) + } + + data := TraceCollection{ + Data: []jaegerjson.Trace{t}, + // Add a comment that will show-up at the top of the JSON file, is someone opens the file. + // NOTE: This comment is scarce on newlines because they appear as \n in the + // generated file doing more harm than good. + Comment: fmt.Sprintf(`This is a trace for SQL statement: %s +This trace can be imported into Jaeger for visualization. From the Jaeger Search screen, select JSON File. +Jaeger can be started using docker with: docker run -d --name jaeger -p 16686:16686 jaegertracing/all-in-one:1.17 +The UI can then be accessed at http://localhost:16686/search`, + stmt), + } + json, err := json.MarshalIndent(data, "" /* prefix */, "\t" /* indent */) + if err != nil { + return "", err + } + return string(json), nil +} + +// TraceCollection is the format accepted by the Jaegar upload feature, as per +// https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826 +type TraceCollection struct { + // Comment is a dummy field we use to put instructions on how to load the trace. + Comment string `json:"_comment"` + Data []jaegerjson.Trace `json:"data"` +} + +// TestingCheckRecordedSpans checks whether a recording looks like an expected +// one represented by a string with one line per expected Span and one line per +// expected event (i.e. log message). +// +// Use with something like: +// if err := TestingCheckRecordedSpans(tracing.GetRecording(Span), ` +// Span root: +// event: a +// event: c +// Span child: +// event: [ambient] b +// `); err != nil { +// t.Fatal(err) +// } +// +// The event lines can (and generally should) omit the file:line part that they +// might contain (depending on the level at which they were logged). +// +// Note: this test function is in this file because it needs to be used by +// both tests in the tracing package and tests outside of it, and the function +// itself depends on tracing. +func TestingCheckRecordedSpans(recSpans []tracingpb.RecordedSpan, expected string) error { + expected = strings.TrimSpace(expected) + var rows []string + row := func(format string, args ...interface{}) { + rows = append(rows, fmt.Sprintf(format, args...)) + } + + for _, rs := range recSpans { + row("Span %s:", rs.Operation) + if len(rs.Tags) > 0 { + var tags []string + for k, v := range rs.Tags { + tags = append(tags, fmt.Sprintf("%s=%v", k, v)) + } + sort.Strings(tags) + row(" tags: %s", strings.Join(tags, " ")) + } + for _, l := range rs.Logs { + msg := "" + for _, f := range l.Fields { + msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value) + } + row("%s", msg) + } + } + var expRows []string + if expected != "" { + expRows = strings.Split(expected, "\n") + } + match := false + if len(expRows) == len(rows) { + match = true + for i := range expRows { + e := strings.Trim(expRows[i], " \t") + r := strings.Trim(rows[i], " \t") + if e != r && !matchesWithoutFileLine(r, e) { + match = false + break + } + } + } + if !match { + file, line, _ := caller.Lookup(1) + return errors.Errorf( + "%s:%d expected:\n%s\ngot:\n%s", + file, line, expected, strings.Join(rows, "\n")) + } + return nil +} + +// matchesWithoutFileLine tries to match an event by stripping a file:line from +// it. For example: +// "event: util/log/trace_test.go:111 log" will match "event: log". +// +// Returns true if it matches. +func matchesWithoutFileLine(msg string, expected string) bool { + groups := regexp.MustCompile(`^(event: ).*:[0-9]* (.*)$`).FindStringSubmatch(msg) + return len(groups) == 3 && fmt.Sprintf("event: %s", groups[2]) == expected +} diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index 3e9375119264..89e3f734c37f 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -12,12 +12,8 @@ package tracing import ( "bytes" - "encoding/json" "fmt" - "regexp" "sort" - "strconv" - "strings" "sync/atomic" "time" @@ -28,7 +24,6 @@ import ( "github.com/cockroachdb/logtags" proto "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" - jaegerjson "github.com/jaegertracing/jaeger/model/json" opentracing "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "golang.org/x/net/trace" @@ -95,27 +90,6 @@ func (sc *SpanContext) ForeachBaggageItem(handler func(k, v string) bool) { } } -// RecordingType is the type of recording that a Span might be performing. -type RecordingType int - -const ( - // NoRecording means that the Span isn't recording. Child spans created from - // it similarly won't be recording by default. - NoRecording RecordingType = iota - // SnowballRecording means that the Span is recording and that derived - // spans will be as well, in the same mode (this includes remote spans, - // i.e. this mode crosses RPC boundaries). Derived spans will maintain - // their own recording, and this recording will be included in that of - // any local parent spans. - SnowballRecording - // SingleNodeRecording means that the Span is recording and that locally - // derived spans will as well (i.e. a remote Span typically won't be - // recording by default, in contrast to SnowballRecording). Similar to - // SnowballRecording, children have their own recording which is also - // included in that of their parents. - SingleNodeRecording -) - type crdbSpan struct { spanMeta @@ -330,10 +304,6 @@ func IsRecordable(sp *Span) bool { return !sp.isNoop() } -// Recording represents a group of RecordedSpans, as returned by GetRecording. -// Spans are sorted by StartTime. -type Recording []tracingpb.RecordedSpan - // GetRecording retrieves the current recording, if the Span has recording // enabled. This can be called while spans that are part of the recording are // still open; it can run concurrently with operations on those spans. @@ -368,339 +338,6 @@ func (s *crdbSpan) getRecording() Recording { return result } -type traceLogData struct { - opentracing.LogRecord - depth int - // timeSincePrev represents the duration since the previous log line (previous in the - // set of log lines that this is part of). This is always computed relative to a log line - // from the same Span, except for start of Span in which case the duration is computed relative - // to the last log in the parent occurring before this start. For example: - // start Span A - // log 1 // duration relative to "start Span A" - // start Span B // duration relative to "log 1" - // log 2 // duration relative to "start Span B" - // log 3 // duration relative to "log 1" - timeSincePrev time.Duration -} - -// String formats the given spans for human consumption, showing the -// relationship using nesting and times as both relative to the previous event -// and cumulative. -// -// Child spans are inserted into the parent at the point of the child's -// StartTime; see the diagram on generateSessionTraceVTable() for the ordering -// of messages. -// -// Each log line show the time since the beginning of the trace -// and since the previous log line. Span starts are shown with special "=== -// " lines. For a Span start, the time since the relative log line -// can be negative when the Span start follows a message from the parent that -// was generated after the child Span started (or even after the child -// finished). -// -// TODO(andrei): this should be unified with -// SessionTracing.generateSessionTraceVTable(). -func (r Recording) String() string { - if len(r) == 0 { - return "" - } - - var buf strings.Builder - start := r[0].StartTime - writeLogs := func(logs []traceLogData) { - for _, entry := range logs { - fmt.Fprintf(&buf, "% 10.3fms % 10.3fms%s", - 1000*entry.Timestamp.Sub(start).Seconds(), - 1000*entry.timeSincePrev.Seconds(), - strings.Repeat(" ", entry.depth+1)) - for i, f := range entry.Fields { - if i != 0 { - buf.WriteByte(' ') - } - fmt.Fprintf(&buf, "%s:%v", f.Key(), f.Value()) - } - buf.WriteByte('\n') - } - } - - logs := r.visitSpan(r[0], 0 /* depth */) - writeLogs(logs) - - // Check if there's any orphan spans (spans for which the parent is missing). - // This shouldn't happen, but we're protecting against incomplete traces. For - // example, ingesting of remote spans through DistSQL is complex. Orphan spans - // would not be reflected in the output string at all without this. - orphans := r.OrphanSpans() - if len(orphans) > 0 { - // This shouldn't happen. - buf.WriteString("orphan spans (trace is missing spans):\n") - for _, o := range orphans { - logs := r.visitSpan(o, 0 /* depth */) - writeLogs(logs) - } - } - return buf.String() -} - -// OrphanSpans returns the spans with parents missing from the recording. -func (r Recording) OrphanSpans() []tracingpb.RecordedSpan { - spanIDs := make(map[uint64]struct{}) - for _, sp := range r { - spanIDs[sp.SpanID] = struct{}{} - } - - var orphans []tracingpb.RecordedSpan - for i, sp := range r { - if i == 0 { - // The first Span can be a root Span. Note that any other root Span will - // be considered an orphan. - continue - } - if _, ok := spanIDs[sp.ParentSpanID]; !ok { - orphans = append(orphans, sp) - } - } - return orphans -} - -// FindLogMessage returns the first log message in the recording that matches -// the given regexp. The bool return value is true if such a message is found. -func (r Recording) FindLogMessage(pattern string) (string, bool) { - re := regexp.MustCompile(pattern) - for _, sp := range r { - for _, l := range sp.Logs { - msg := l.Msg() - if re.MatchString(msg) { - return msg, true - } - } - } - return "", false -} - -// FindSpan returns the Span with the given operation. The bool retval is false -// if the Span is not found. -func (r Recording) FindSpan(operation string) (tracingpb.RecordedSpan, bool) { - for _, sp := range r { - if sp.Operation == operation { - return sp, true - } - } - return tracingpb.RecordedSpan{}, false -} - -// visitSpan returns the log messages for sp, and all of sp's children. -// -// All messages from a Span are kept together. Sibling spans are ordered within -// the parent in their start order. -func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogData { - ownLogs := make([]traceLogData, 0, len(sp.Logs)+1) - - conv := func(l opentracing.LogRecord, ref time.Time) traceLogData { - var timeSincePrev time.Duration - if ref != (time.Time{}) { - timeSincePrev = l.Timestamp.Sub(ref) - } - return traceLogData{ - LogRecord: l, - depth: depth, - timeSincePrev: timeSincePrev, - } - } - - // Add a log line representing the start of the Span. - lr := opentracing.LogRecord{ - Timestamp: sp.StartTime, - Fields: []otlog.Field{otlog.String("=== operation", sp.Operation)}, - } - if len(sp.Tags) > 0 { - tags := make([]string, 0, len(sp.Tags)) - for k := range sp.Tags { - tags = append(tags, k) - } - sort.Strings(tags) - for _, k := range tags { - lr.Fields = append(lr.Fields, otlog.String(k, sp.Tags[k])) - } - } - ownLogs = append(ownLogs, conv( - lr, - // ref - this entries timeSincePrev will be computed when we merge it into the parent - time.Time{})) - - for _, l := range sp.Logs { - lr := opentracing.LogRecord{ - Timestamp: l.Time, - Fields: make([]otlog.Field, len(l.Fields)), - } - for i, f := range l.Fields { - lr.Fields[i] = otlog.String(f.Key, f.Value) - } - lastLog := ownLogs[len(ownLogs)-1] - ownLogs = append(ownLogs, conv(lr, lastLog.Timestamp)) - } - - childSpans := make([][]traceLogData, 0) - for _, osp := range r { - if osp.ParentSpanID != sp.SpanID { - continue - } - childSpans = append(childSpans, r.visitSpan(osp, depth+1)) - } - - // Merge ownLogs with childSpans. - mergedLogs := make([]traceLogData, 0, len(ownLogs)) - timeMax := time.Date(2200, 0, 0, 0, 0, 0, 0, time.UTC) - i, j := 0, 0 - var lastTimestamp time.Time - for i < len(ownLogs) || j < len(childSpans) { - if len(mergedLogs) > 0 { - lastTimestamp = mergedLogs[len(mergedLogs)-1].Timestamp - } - nextLog, nextChild := timeMax, timeMax - if i < len(ownLogs) { - nextLog = ownLogs[i].Timestamp - } - if j < len(childSpans) { - nextChild = childSpans[j][0].Timestamp - } - if nextLog.After(nextChild) { - // Fill in timeSincePrev for the first one of the child's entries. - if lastTimestamp != (time.Time{}) { - childSpans[j][0].timeSincePrev = childSpans[j][0].Timestamp.Sub(lastTimestamp) - } - mergedLogs = append(mergedLogs, childSpans[j]...) - lastTimestamp = childSpans[j][0].Timestamp - j++ - } else { - mergedLogs = append(mergedLogs, ownLogs[i]) - lastTimestamp = ownLogs[i].Timestamp - i++ - } - } - - return mergedLogs -} - -// ToJaegerJSON returns the trace as a JSON that can be imported into Jaeger for -// visualization. -// -// The format is described here: https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826 -// -// The statement is passed in so it can be included in the trace. -func (r Recording) ToJaegerJSON(stmt string) (string, error) { - if len(r) == 0 { - return "", nil - } - - cpy := make(Recording, len(r)) - copy(cpy, r) - r = cpy - tagsCopy := make(map[string]string) - for k, v := range r[0].Tags { - tagsCopy[k] = v - } - tagsCopy["statement"] = stmt - r[0].Tags = tagsCopy - - toJaegerSpanID := func(spanID uint64) jaegerjson.SpanID { - return jaegerjson.SpanID(strconv.FormatUint(spanID, 10)) - } - - // Each Span in Jaeger belongs to a "process" that generated it. Spans - // belonging to different colors are colored differently in Jaeger. We're - // going to map our different nodes to different processes. - processes := make(map[jaegerjson.ProcessID]jaegerjson.Process) - // getProcessID figures out what "process" a Span belongs to. It looks for an - // "node: " tag. The processes map is populated with an entry for every - // node present in the trace. - getProcessID := func(sp tracingpb.RecordedSpan) jaegerjson.ProcessID { - node := "unknown node" - for k, v := range sp.Tags { - if k == "node" { - node = fmt.Sprintf("node %s", v) - break - } - } - pid := jaegerjson.ProcessID(node) - if _, ok := processes[pid]; !ok { - processes[pid] = jaegerjson.Process{ - ServiceName: node, - Tags: nil, - } - } - return pid - } - - var t jaegerjson.Trace - t.TraceID = jaegerjson.TraceID(strconv.FormatUint(r[0].TraceID, 10)) - t.Processes = processes - - for _, sp := range r { - var s jaegerjson.Span - - s.TraceID = t.TraceID - s.Duration = uint64(sp.Duration.Microseconds()) - s.StartTime = uint64(sp.StartTime.UnixNano() / 1000) - s.SpanID = toJaegerSpanID(sp.SpanID) - s.OperationName = sp.Operation - s.ProcessID = getProcessID(sp) - - if sp.ParentSpanID != 0 { - s.References = []jaegerjson.Reference{{ - RefType: jaegerjson.ChildOf, - TraceID: s.TraceID, - SpanID: toJaegerSpanID(sp.ParentSpanID), - }} - } - - for k, v := range sp.Tags { - s.Tags = append(s.Tags, jaegerjson.KeyValue{ - Key: k, - Value: v, - Type: "STRING", - }) - } - for _, l := range sp.Logs { - jl := jaegerjson.Log{Timestamp: uint64(l.Time.UnixNano() / 1000)} - for _, field := range l.Fields { - jl.Fields = append(jl.Fields, jaegerjson.KeyValue{ - Key: field.Key, - Value: field.Value, - Type: "STRING", - }) - } - s.Logs = append(s.Logs, jl) - } - t.Spans = append(t.Spans, s) - } - - data := TraceCollection{ - Data: []jaegerjson.Trace{t}, - // Add a comment that will show-up at the top of the JSON file, is someone opens the file. - // NOTE: This comment is scarce on newlines because they appear as \n in the - // generated file doing more harm than good. - Comment: fmt.Sprintf(`This is a trace for SQL statement: %s -This trace can be imported into Jaeger for visualization. From the Jaeger Search screen, select JSON File. -Jaeger can be started using docker with: docker run -d --name jaeger -p 16686:16686 jaegertracing/all-in-one:1.17 -The UI can then be accessed at http://localhost:16686/search`, - stmt), - } - json, err := json.MarshalIndent(data, "" /* prefix */, "\t" /* indent */) - if err != nil { - return "", err - } - return string(json), nil -} - -// TraceCollection is the format accepted by the Jaegar upload feature, as per -// https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826 -type TraceCollection struct { - // Comment is a dummy field we use to put instructions on how to load the trace. - Comment string `json:"_comment"` - Data []jaegerjson.Trace `json:"data"` -} - // ImportRemoteSpans adds RecordedSpan data to the recording of the given Span; // these spans will be part of the result of GetRecording. Used to import // recorded traces from other nodes. diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 384d47014c29..f5a7418f3dd8 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -12,10 +12,7 @@ package tracing import ( "context" - "fmt" "math/rand" - "regexp" - "sort" "strconv" "strings" "sync/atomic" @@ -23,10 +20,7 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/util/caller" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" - "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/trace" @@ -689,87 +683,6 @@ func StartSnowballTrace( return ContextWithSpan(ctx, span), span } -// TestingCheckRecordedSpans checks whether a recording looks like an expected -// one represented by a string with one line per expected Span and one line per -// expected event (i.e. log message). -// -// Use with something like: -// if err := TestingCheckRecordedSpans(tracing.GetRecording(Span), ` -// Span root: -// event: a -// event: c -// Span child: -// event: [ambient] b -// `); err != nil { -// t.Fatal(err) -// } -// -// The event lines can (and generally should) omit the file:line part that they -// might contain (depending on the level at which they were logged). -// -// Note: this test function is in this file because it needs to be used by -// both tests in the tracing package and tests outside of it, and the function -// itself depends on tracing. -func TestingCheckRecordedSpans(recSpans []tracingpb.RecordedSpan, expected string) error { - expected = strings.TrimSpace(expected) - var rows []string - row := func(format string, args ...interface{}) { - rows = append(rows, fmt.Sprintf(format, args...)) - } - - for _, rs := range recSpans { - row("Span %s:", rs.Operation) - if len(rs.Tags) > 0 { - var tags []string - for k, v := range rs.Tags { - tags = append(tags, fmt.Sprintf("%s=%v", k, v)) - } - sort.Strings(tags) - row(" tags: %s", strings.Join(tags, " ")) - } - for _, l := range rs.Logs { - msg := "" - for _, f := range l.Fields { - msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value) - } - row("%s", msg) - } - } - var expRows []string - if expected != "" { - expRows = strings.Split(expected, "\n") - } - match := false - if len(expRows) == len(rows) { - match = true - for i := range expRows { - e := strings.Trim(expRows[i], " \t") - r := strings.Trim(rows[i], " \t") - if e != r && !matchesWithoutFileLine(r, e) { - match = false - break - } - } - } - if !match { - file, line, _ := caller.Lookup(1) - return errors.Errorf( - "%s:%d expected:\n%s\ngot:\n%s", - file, line, expected, strings.Join(rows, "\n")) - } - return nil -} - -// matchesWithoutFileLine tries to match an event by stripping a file:line from -// it. For example: -// "event: util/log/trace_test.go:111 log" will match "event: log". -// -// Returns true if it matches. -func matchesWithoutFileLine(msg string, expected string) bool { - groups := regexp.MustCompile(`^(event: ).*:[0-9]* (.*)$`).FindStringSubmatch(msg) - return len(groups) == 3 && fmt.Sprintf("event: %s", groups[2]) == expected -} - // ContextWithRecordingSpan returns a context with an embedded trace Span which // returns its contents when getRecording is called and must be stopped by // calling the cancel method when done with the context (getRecording() needs to From 310492dd28ebfdd32499af649ec8e695598e3c17 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 28 Oct 2020 10:56:04 +0100 Subject: [PATCH 2/5] tracing: use receivers for everything When `Span` and `Tracer` were exposed only via the `opentracing` interfaces, we weren't able to freely add methods to them, which led to a number of methods being added to the tracing package that took an `opentracing.Span` or `opentracing.Tracer` and internally unwrapped the underlying type. Now that we're not using these interfaces any more, we can be more idiomatic and put all methods on their proper receivers. This is done on this commit as a prelude to being more opinionated about our APIs. Release note: None --- pkg/bench/ddl_analysis/ddl_analysis_bench.go | 2 +- .../kvfollowerreadsccl/followerreads_test.go | 2 +- pkg/kv/kvclient/kvcoord/transport.go | 4 +- pkg/kv/kvclient/kvcoord/transport_test.go | 4 +- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 3 +- .../kvcoord/txn_coord_sender_server_test.go | 4 +- pkg/kv/kvserver/replica_proposal.go | 2 +- pkg/kv/kvserver/replica_test.go | 2 +- pkg/kv/txn_test.go | 2 +- pkg/server/node.go | 2 +- pkg/sql/colexec/stats.go | 2 +- pkg/sql/colflow/vectorized_flow.go | 4 +- pkg/sql/conn_executor_exec.go | 2 +- pkg/sql/distsql_running.go | 2 +- pkg/sql/exec_util.go | 16 ++--- pkg/sql/execinfra/base.go | 2 +- pkg/sql/execinfra/processorsbase.go | 2 +- pkg/sql/explain_distsql.go | 6 +- pkg/sql/flowinfra/outbox.go | 4 +- pkg/sql/rowexec/aggregator.go | 5 +- pkg/sql/rowexec/countrows.go | 6 +- pkg/sql/rowexec/distinct.go | 6 +- pkg/sql/rowexec/hashjoiner.go | 5 +- pkg/sql/rowexec/inverted_filterer.go | 5 +- pkg/sql/rowexec/inverted_joiner.go | 5 +- pkg/sql/rowexec/joinreader.go | 4 +- pkg/sql/rowexec/mergejoiner.go | 5 +- pkg/sql/rowexec/ordinality.go | 6 +- pkg/sql/rowexec/sample_aggregator.go | 2 +- pkg/sql/rowexec/sorter.go | 5 +- pkg/sql/rowexec/tablereader.go | 4 +- pkg/sql/rowexec/tablereader_test.go | 6 +- pkg/sql/rowexec/windower.go | 5 +- pkg/sql/rowflow/routers.go | 4 +- pkg/sql/rowflow/routers_test.go | 2 +- pkg/sql/trace_test.go | 2 +- pkg/sql/txn_state.go | 8 +-- pkg/util/log/ambient_context_test.go | 8 +-- pkg/util/log/log.go | 2 +- pkg/util/log/trace.go | 2 +- pkg/util/log/trace_test.go | 12 ++-- pkg/util/tracing/grpc_interceptor.go | 4 +- pkg/util/tracing/recording.go | 2 +- pkg/util/tracing/span.go | 61 +++++++------------ pkg/util/tracing/span_test.go | 16 ++--- pkg/util/tracing/tags_test.go | 12 ++-- pkg/util/tracing/tracer.go | 21 +++---- pkg/util/tracing/tracer_test.go | 46 +++++++------- 48 files changed, 152 insertions(+), 186 deletions(-) diff --git a/pkg/bench/ddl_analysis/ddl_analysis_bench.go b/pkg/bench/ddl_analysis/ddl_analysis_bench.go index 7fecd64d9a3e..60e74b569030 100644 --- a/pkg/bench/ddl_analysis/ddl_analysis_bench.go +++ b/pkg/bench/ddl_analysis/ddl_analysis_bench.go @@ -47,7 +47,7 @@ func RunRoundTripBenchmark(b *testing.B, tests []RoundTripBenchTestCase) { beforePlan := func(sp *tracing.Span, stmt string) { if _, ok := stmtToKvBatchRequests.Load(stmt); ok { sp.Finish() - trace := tracing.GetRecording(sp) + trace := sp.GetRecording() count := countKvBatchRequestsInRecording(trace) stmtToKvBatchRequests.Store(stmt, count) } diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index a020beb955ea..602c1dc8ee58 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -229,7 +229,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { SQLExecutor: &sql.ExecutorTestingKnobs{ WithStatementTrace: func(sp *tracing.Span, stmt string) { if stmt == historicalQuery { - recCh <- tracing.GetRecording(sp) + recCh <- sp.GetRecording() } }, }, diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 158aa6e54142..9deb3852d4af 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -169,7 +169,7 @@ func (gt *grpcTransport) sendBatch( return nil, errors.Errorf( "trying to ingest remote spans but there is no recording span set up") } - if err := tracing.ImportRemoteSpans(span, reply.CollectedSpans); err != nil { + if err := span.ImportRemoteSpans(reply.CollectedSpans); err != nil { return nil, errors.Wrap(err, "error ingesting remote spans") } } @@ -305,7 +305,7 @@ func (s *senderTransport) SendNext( if span == nil { panic("trying to ingest remote spans but there is no recording span set up") } - if err := tracing.ImportRemoteSpans(span, br.CollectedSpans); err != nil { + if err := span.ImportRemoteSpans(br.CollectedSpans); err != nil { panic(err) } } diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 6ce953ca65c4..b7d5c3b01252 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -146,13 +146,13 @@ func (m *mockInternalClient) Batch( ) (*roachpb.BatchResponse, error) { sp := m.tr.StartRootSpan("mock", nil /* logTags */, tracing.RecordableSpan) defer sp.Finish() - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx = tracing.ContextWithSpan(ctx, sp) log.Eventf(ctx, "mockInternalClient processing batch") br := &roachpb.BatchResponse{} br.Error = m.pErr - if rec := tracing.GetRecording(sp); rec != nil { + if rec := sp.GetRecording(); rec != nil { br.CollectedSpans = append(br.CollectedSpans, rec...) } return br, nil diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index ec65b7f4d70b..9d3cd6b920c8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" @@ -472,7 +471,7 @@ func (tc *TxnCoordSender) Send( if tc.mu.txn.ID == (uuid.UUID{}) { log.Fatalf(ctx, "cannot send transactional request through unbound TxnCoordSender") } - if !tracing.IsBlackHoleSpan(sp) { + if !sp.IsBlackHole() { sp.SetBaggageItem("txnID", tc.mu.txn.ID.String()) } ctx = logtags.AddTag(ctx, "txn", uuid.ShortStringer(tc.mu.txn.ID)) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go index 2f6505a66150..e9d538cf5460 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go @@ -152,7 +152,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) { tracer := tracing.NewTracer() sp := tracer.StartSpan("test", tracing.Recordable) - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) txnCtx := tracing.ContextWithSpan(context.Background(), sp) push := func(ctx context.Context, key roachpb.Key) error { @@ -179,7 +179,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) { t.Fatalf("expected 2 attempts, got: %d", attempts) } sp.Finish() - recording := tracing.GetRecording(sp) + recording := sp.GetRecording() var foundHeartbeatLoop bool for _, sp := range recording { if strings.Contains(sp.Operation, "heartbeat loop") { diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 714343449c2b..2cdb56fcb6f6 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -894,7 +894,7 @@ func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier { if sp == nil { return nil } - if tracing.IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { return nil } traceData := opentracing.TextMapCarrier{} diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 214cc72d136f..b1be7047146f 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -12560,7 +12560,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) { tracedCtx := tracing.ContextWithSpan(ctx, sp) // Go out of our way to enable recording so that expensive logging is enabled // for this context. - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease) if pErr != nil { t.Fatal(pErr) diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index 465d1029725c..2b574fb35f98 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -56,7 +56,7 @@ func TestTxnSnowballTrace(t *testing.T) { } log.Event(ctx, "txn complete") sp.Finish() - collectedSpans := tracing.GetRecording(sp) + collectedSpans := sp.GetRecording() dump := collectedSpans.String() // dump: // 0.105ms 0.000ms event:inside txn diff --git a/pkg/server/node.go b/pkg/server/node.go index 30bc0407122e..41a6bfa90c5b 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1000,7 +1000,7 @@ func (n *Node) setupSpanForIncomingRPC( // spans in the BatchResponse at the end of the request. // We don't want to do this if the operation is on the same host, in which // case everything is already part of the same recording. - if rec := tracing.GetRecording(grpcSpan); rec != nil { + if rec := grpcSpan.GetRecording(); rec != nil { br.CollectedSpans = append(br.CollectedSpans, rec...) } } diff --git a/pkg/sql/colexec/stats.go b/pkg/sql/colexec/stats.go index 64f4cfaff78d..d2d50af719b1 100644 --- a/pkg/sql/colexec/stats.go +++ b/pkg/sql/colexec/stats.go @@ -164,6 +164,6 @@ func (vsc *VectorizedStatsCollector) OutputStats( vsc.NumBatches = 0 vsc.BytesRead = 0 } - tracing.SetSpanStats(span, &vsc.VectorizedStats) + span.SetSpanStats(&vsc.VectorizedStats) span.Finish() } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index bc2ffcbb56f3..242426894a0a 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -176,7 +176,7 @@ func (f *vectorizedFlow) Setup( } log.VEventf(ctx, 1, "setting up vectorize flow %s", f.ID.Short()) recordingStats := false - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { recordingStats = true } helper := &vectorizedFlowCreatorHelper{f: f.FlowBase} @@ -885,7 +885,7 @@ func (s *vectorizedFlowCreator) setupOutput( finishVectorizedStatsCollectors( ctx, flowCtx.ID, flowCtx.Cfg.TestingKnobs.DeterministicStats, vscs, ) - return []execinfrapb.ProducerMetadata{{TraceData: tracing.GetRecording(span)}} + return []execinfrapb.ProducerMetadata{{TraceData: span.GetRecording()}} }, }, ) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 4b332459d35d..8046a257df66 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState( // Record the statement information that we've collected. // Note that in case of implicit transactions, the trace contains the auto-commit too. sp.Finish() - trace := tracing.GetRecording(sp) + trace := sp.GetRecording() ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor) placeholders := p.extendedEvalCtx.Placeholders if finishCollectionDiagnostics != nil { diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index d7d9bb804678..0b407557f76f 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -685,7 +685,7 @@ func (r *DistSQLReceiver) Push( if span == nil { r.resultWriter.SetError( errors.New("trying to ingest remote spans but there is no recording span set up")) - } else if err := tracing.ImportRemoteSpans(span, meta.TraceData); err != nil { + } else if err := span.ImportRemoteSpans(meta.TraceData); err != nil { r.resultWriter.SetError(errors.Errorf("error ingesting remote spans: %s", err)) } } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index bd3e15cbba92..52aeb0c78ce9 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1440,9 +1440,9 @@ func (st *SessionTracing) getSessionTrace() ([]traceRow, error) { func (st *SessionTracing) getRecording() []tracingpb.RecordedSpan { var spans []tracingpb.RecordedSpan if st.firstTxnSpan != nil { - spans = append(spans, tracing.GetRecording(st.firstTxnSpan)...) + spans = append(spans, st.firstTxnSpan.GetRecording()...) } - return append(spans, tracing.GetRecording(st.connSpan)...) + return append(spans, st.connSpan.GetRecording()...) } // StartTracing starts "session tracing". From this moment on, everything @@ -1503,7 +1503,7 @@ func (st *SessionTracing) StartTracing( if sp == nil { return errors.Errorf("no txn span for SessionTracing") } - tracing.StartRecording(sp, recType) + sp.StartRecording(recType) st.firstTxnSpan = sp } @@ -1535,7 +1535,7 @@ func (st *SessionTracing) StartTracing( tracing.LogTagsFromCtx(connCtx), ) } - tracing.StartRecording(sp, recType) + sp.StartRecording(recType) st.connSpan = sp // Hijack the connections context. @@ -1560,15 +1560,15 @@ func (st *SessionTracing) StopTracing() error { var spans []tracingpb.RecordedSpan if st.firstTxnSpan != nil { - spans = append(spans, tracing.GetRecording(st.firstTxnSpan)...) - tracing.StopRecording(st.firstTxnSpan) + spans = append(spans, st.firstTxnSpan.GetRecording()...) + st.firstTxnSpan.StopRecording() } st.connSpan.Finish() - spans = append(spans, tracing.GetRecording(st.connSpan)...) + spans = append(spans, st.connSpan.GetRecording()...) // NOTE: We're stopping recording on the connection's ctx only; the stopping // is not inherited by children. If we are inside of a txn, that span will // continue recording, even though nobody will collect its recording again. - tracing.StopRecording(st.connSpan) + st.connSpan.StopRecording() st.ex.ctxHolder.unhijack() var err error diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go index b871724f85d0..ee12bec0f990 100644 --- a/pkg/sql/execinfra/base.go +++ b/pkg/sql/execinfra/base.go @@ -236,7 +236,7 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver // GetTraceData returns the trace data. func GetTraceData(ctx context.Context) []tracingpb.RecordedSpan { if sp := tracing.SpanFromContext(ctx); sp != nil { - return tracing.GetRecording(sp) + return sp.GetRecording() } return nil } diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 2786be690ebe..536bf2029a8e 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -857,7 +857,7 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. func (pb *ProcessorBase) StartInternal(ctx context.Context, name string) context.Context { pb.origCtx = ctx pb.Ctx, pb.span = ProcessorSpan(ctx, name) - if pb.span != nil && tracing.IsRecording(pb.span) { + if pb.span != nil && pb.span.IsRecording() { pb.span.SetTag(execinfrapb.FlowIDTagKey, pb.FlowCtx.ID.String()) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, pb.processorID) } diff --git a/pkg/sql/explain_distsql.go b/pkg/sql/explain_distsql.go index 6bbe636a5abb..5b76ca9a88d7 100644 --- a/pkg/sql/explain_distsql.go +++ b/pkg/sql/explain_distsql.go @@ -165,7 +165,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error { // don't get a noopSpan. var sp *tracing.Span if parentSp := tracing.SpanFromContext(params.ctx); parentSp != nil && - !tracing.IsRecording(parentSp) { + !parentSp.IsRecording() { tracer := parentSp.Tracer() sp = tracer.StartSpan( "explain-distsql", tracing.Recordable, @@ -177,7 +177,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error { "explain-distsql", tracing.Recordable, tracing.LogTagsFromCtx(params.ctx)) } - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx := tracing.ContextWithSpan(params.ctx, sp) planCtx.ctx = ctx // Make a copy of the evalContext with the recording span in it; we can't @@ -218,7 +218,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error { n.run.executedStatement = true sp.Finish() - spans := tracing.GetRecording(sp) + spans := sp.GetRecording() if err := rw.Err(); err != nil { return err diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 7138719bf22f..4a896a265b84 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -202,7 +202,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { var span *tracing.Span ctx, span = execinfra.ProcessorSpan(ctx, "outbox") - if span != nil && tracing.IsRecording(span) { + if span != nil && span.IsRecording() { m.statsCollectionEnabled = true span.SetTag(execinfrapb.FlowIDTagKey, m.flowCtx.ID.String()) span.SetTag(execinfrapb.StreamIDTagKey, m.streamID) @@ -285,7 +285,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { if m.flowCtx.Cfg.TestingKnobs.DeterministicStats { m.stats.BytesSent = 0 } - tracing.SetSpanStats(span, &m.stats) + span.SetSpanStats(&m.stats) tracing.FinishSpan(span) spanFinished = true if trace := execinfra.GetTraceData(ctx); trace != nil { diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index 4512745f0333..b02ee41ef843 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -95,7 +95,7 @@ func (ag *aggregatorBase) init( ) error { ctx := flowCtx.EvalCtx.Ctx() memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "aggregator-mem") - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { input = newInputStatCollector(input) ag.FinishTrace = ag.outputStatsToTrace } @@ -182,8 +182,7 @@ func (ag *aggregatorBase) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ag.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &AggregatorStats{ InputStats: is, MaxAllocatedMem: ag.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index 3f1eb1b43dc9..18f7ba2896f7 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -47,7 +47,7 @@ func newCountAggregator( ag := &countAggregator{} ag.input = input - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { ag.input = newInputStatCollector(input) ag.FinishTrace = ag.outputStatsToTrace } @@ -115,8 +115,8 @@ func (ag *countAggregator) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ag.Ctx); sp != nil { - tracing.SetSpanStats( - sp, &AggregatorStats{InputStats: is}, + sp.SetSpanStats( + &AggregatorStats{InputStats: is}, ) } } diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 9402466ff7fc..32c7c9568ac7 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -134,7 +134,7 @@ func newDistinct( // So we have to set up the account here. d.arena = stringarena.Make(&d.memAcc) - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { d.input = newInputStatCollector(d.input) d.FinishTrace = d.outputStatsToTrace } @@ -376,8 +376,8 @@ func (d *distinct) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(d.Ctx); sp != nil { - tracing.SetSpanStats( - sp, &DistinctStats{InputStats: is, MaxAllocatedMem: d.MemMonitor.MaximumBytes()}, + sp.SetSpanStats( + &DistinctStats{InputStats: is, MaxAllocatedMem: d.MemMonitor.MaximumBytes()}, ) } } diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index 823e98585efd..a79e7894f93c 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -191,7 +191,7 @@ func newHashJoiner( } // If the trace is recording, instrument the hashJoiner to collect stats. - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { h.leftSource = newInputStatCollector(h.leftSource) h.rightSource = newInputStatCollector(h.rightSource) h.FinishTrace = h.outputStatsToTrace @@ -804,8 +804,7 @@ func (h *hashJoiner) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(h.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &HashJoinerStats{ LeftInputStats: lis, RightInputStats: ris, diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 18fd51d58011..3d5037b1332e 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -125,7 +125,7 @@ func newInvertedFilterer( ifr.diskMonitor, ) - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { ifr.input = newInputStatCollector(ifr.input) ifr.FinishTrace = ifr.outputStatsToTrace } @@ -321,8 +321,7 @@ func (ifr *invertedFilterer) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ifr.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &InvertedFiltererStats{ InputStats: is, MaxAllocatedMem: ifr.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index f8f15e869354..49f94d8984d9 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -285,7 +285,7 @@ func newInvertedJoiner( } collectingStats := false - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { collectingStats = true } if collectingStats { @@ -738,8 +738,7 @@ func (ij *invertedJoiner) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ij.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &InvertedJoinerStats{ InputStats: is, IndexScanStats: fis, diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index ad55a480e784..7d738fd7ee44 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -253,7 +253,7 @@ func newJoinReader( } collectingStats := false - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { collectingStats = true } @@ -690,7 +690,7 @@ func (jr *joinReader) outputStatsToTrace() { IndexLookupStats: ils, } if sp := tracing.SpanFromContext(jr.Ctx); sp != nil { - tracing.SetSpanStats(sp, jrs) + sp.SetSpanStats(jrs) } } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index a7f0e9284c9b..57bf911fcb55 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -74,7 +74,7 @@ func newMergeJoiner( rightSource: rightSource, } - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { m.leftSource = newInputStatCollector(m.leftSource) m.rightSource = newInputStatCollector(m.rightSource) m.FinishTrace = m.outputStatsToTrace @@ -295,8 +295,7 @@ func (m *mergeJoiner) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(m.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &MergeJoinerStats{ LeftInputStats: lis, RightInputStats: ris, diff --git a/pkg/sql/rowexec/ordinality.go b/pkg/sql/rowexec/ordinality.go index 1eee1af48045..5d7075c7085b 100644 --- a/pkg/sql/rowexec/ordinality.go +++ b/pkg/sql/rowexec/ordinality.go @@ -67,7 +67,7 @@ func newOrdinalityProcessor( return nil, err } - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { o.input = newInputStatCollector(o.input) o.FinishTrace = o.outputStatsToTrace } @@ -134,8 +134,6 @@ func (o *ordinalityProcessor) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(o.Ctx); sp != nil { - tracing.SetSpanStats( - sp, &OrdinalityStats{InputStats: is}, - ) + sp.SetSpanStats(&OrdinalityStats{InputStats: is}) } } diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index 6359c7e7d746..ef19f11b4e0b 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -382,7 +382,7 @@ func (s *sampleAggregator) sampleRow( func (s *sampleAggregator) writeResults(ctx context.Context) error { // Turn off tracing so these writes don't affect the results of EXPLAIN // ANALYZE. - if span := tracing.SpanFromContext(ctx); span != nil && tracing.IsRecording(span) { + if span := tracing.SpanFromContext(ctx); span != nil && span.IsRecording() { // TODO(rytaft): this also hides writes in this function from SQL session // traces. ctx = tracing.ContextWithSpan(ctx, nil) diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 9d37ecde4c0e..1f98122b9815 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -54,7 +54,7 @@ func (s *sorterBase) init( opts execinfra.ProcStateOpts, ) error { ctx := flowCtx.EvalCtx.Ctx() - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { input = newInputStatCollector(input) s.FinishTrace = s.outputStatsToTrace } @@ -163,8 +163,7 @@ func (s *sorterBase) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(s.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &SorterStats{ InputStats: is, MaxAllocatedMem: s.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 176f3127651a..01f95627a076 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -159,7 +159,7 @@ func newTableReader( tr.spans[i] = s.Span } - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { tr.fetcher = newRowFetcherStatCollector(&fetcher) tr.FinishTrace = tr.outputStatsToTrace } else { @@ -299,7 +299,7 @@ func (tr *tableReader) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(tr.Ctx); sp != nil { - tracing.SetSpanStats(sp, &TableReaderStats{ + sp.SetSpanStats(&TableReaderStats{ InputStats: is, BytesRead: tr.GetBytesRead(), }) diff --git a/pkg/sql/rowexec/tablereader_test.go b/pkg/sql/rowexec/tablereader_test.go index eea4c11714f5..19c10ef31859 100644 --- a/pkg/sql/rowexec/tablereader_test.go +++ b/pkg/sql/rowexec/tablereader_test.go @@ -393,7 +393,7 @@ func TestLimitScans(t *testing.T) { // Now we're going to run the tableReader and trace it. tracer := tracing.NewTracer() sp := tracer.StartSpan("root", tracing.Recordable) - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx = tracing.ContextWithSpan(ctx, sp) flowCtx.EvalCtx.Context = ctx @@ -412,7 +412,7 @@ func TestLimitScans(t *testing.T) { // Simulate what the DistSQLReceiver does and ingest the trace. if meta != nil && len(meta.TraceData) > 0 { - if err := tracing.ImportRemoteSpans(sp, meta.TraceData); err != nil { + if err := sp.ImportRemoteSpans(meta.TraceData); err != nil { t.Fatal(err) } } @@ -433,7 +433,7 @@ func TestLimitScans(t *testing.T) { // scans from the same key as the DistSender retries scans when it detects // splits. re := regexp.MustCompile(fmt.Sprintf(`querying next range at /Table/%d/1(\S.*)?`, tableDesc.ID)) - spans := tracing.GetRecording(sp) + spans := sp.GetRecording() ranges := make(map[string]struct{}) for _, span := range spans { if span.Operation == tableReaderProcName { diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index f0d230976dc6..5b35efd2aeea 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -206,7 +206,7 @@ func newWindower( // them to reuse the same shared memory account with the windower. evalCtx.SingleDatumAggMemAccount = &w.acc - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { w.input = newInputStatCollector(w.input) w.FinishTrace = w.outputStatsToTrace } @@ -878,8 +878,7 @@ func (w *windower) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(w.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &WindowerStats{ InputStats: is, MaxAllocatedMem: w.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index 068a45315ab4..e2910a90a577 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -260,7 +260,7 @@ func (rb *routerBase) setupStreams( // init must be called after setupStreams but before Start. func (rb *routerBase) init(ctx context.Context, flowCtx *execinfra.FlowCtx, types []*types.T) { // Check if we're recording stats. - if s := tracing.SpanFromContext(ctx); s != nil && tracing.IsRecording(s) { + if s := tracing.SpanFromContext(ctx); s != nil && s.IsRecording() { rb.statsCollectionEnabled = true } @@ -371,7 +371,7 @@ func (rb *routerBase) Start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c if rb.statsCollectionEnabled { ro.stats.MaxAllocatedMem = ro.memoryMonitor.MaximumBytes() ro.stats.MaxAllocatedDisk = ro.diskMonitor.MaximumBytes() - tracing.SetSpanStats(span, &ro.stats) + span.SetSpanStats(&ro.stats) tracing.FinishSpan(span) if trace := execinfra.GetTraceData(ctx); trace != nil { ro.mu.Unlock() diff --git a/pkg/sql/rowflow/routers_test.go b/pkg/sql/rowflow/routers_test.go index 97ca352cd5a2..00c1c53e019e 100644 --- a/pkg/sql/rowflow/routers_test.go +++ b/pkg/sql/rowflow/routers_test.go @@ -753,7 +753,7 @@ func TestRouterDiskSpill(t *testing.T) { // Enable stats recording. tracer := tracing.NewTracer() sp := tracer.StartSpan("root", tracing.Recordable) - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx := tracing.ContextWithSpan(context.Background(), sp) st := cluster.MakeTestingClusterSettings() diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index c1f43a14387c..648671afd2f8 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -589,7 +589,7 @@ func TestTraceDistSQL(t *testing.T) { SQLExecutor: &sql.ExecutorTestingKnobs{ WithStatementTrace: func(sp *tracing.Span, stmt string) { if stmt == countStmt { - recCh <- tracing.GetRecording(sp) + recCh <- sp.GetRecording() } }, }, diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index be8040fa0ac9..ffcb8c42d2ee 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -189,7 +189,7 @@ func (ts *txnState) resetForNewSQLTxn( alreadyRecording := tranCtx.sessionTracing.Enabled() duration := traceTxnThreshold.Get(&tranCtx.settings.SV) if !alreadyRecording && (duration > 0) { - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ts.recordingThreshold = duration ts.recordingStart = timeutil.Now() } @@ -197,10 +197,6 @@ func (ts *txnState) resetForNewSQLTxn( // Put the new span in the context. txnCtx := tracing.ContextWithSpan(connCtx, sp) - if !tracing.IsRecordable(sp) { - log.Fatalf(connCtx, "non-recordable transaction span of type: %T", sp) - } - ts.sp = sp ts.Ctx, ts.cancel = contextutil.WithCancel(txnCtx) @@ -243,7 +239,7 @@ func (ts *txnState) finishSQLTxn() { } if ts.recordingThreshold > 0 { - if r := tracing.GetRecording(ts.sp); r != nil { + if r := ts.sp.GetRecording(); r != nil { if elapsed := timeutil.Since(ts.recordingStart); elapsed >= ts.recordingThreshold { dump := r.String() if len(dump) > 0 { diff --git a/pkg/util/log/ambient_context_test.go b/pkg/util/log/ambient_context_test.go index 25c397fdaa4f..42f62466ef68 100644 --- a/pkg/util/log/ambient_context_test.go +++ b/pkg/util/log/ambient_context_test.go @@ -48,7 +48,7 @@ func TestAnnotateCtxSpan(t *testing.T) { // Annotate a context that has an open span. sp1 := tracer.StartSpan("root") - tracing.StartRecording(sp1, tracing.SingleNodeRecording) + sp1.StartRecording(tracing.SingleNodeRecording) ctx1 := tracing.ContextWithSpan(context.Background(), sp1) Event(ctx1, "a") @@ -59,7 +59,7 @@ func TestAnnotateCtxSpan(t *testing.T) { sp2.Finish() sp1.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp1), ` + if err := tracing.TestingCheckRecordedSpans(sp1.GetRecording(), ` Span root: event: a event: c @@ -74,10 +74,10 @@ func TestAnnotateCtxSpan(t *testing.T) { ac.Tracer = tracer ctx, sp := ac.AnnotateCtxWithSpan(context.Background(), "s") - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) Event(ctx, "a") sp.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: tags: ambient= event: [ambient] a diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go index f7e921bcb0d1..831c01979c50 100644 --- a/pkg/util/log/log.go +++ b/pkg/util/log/log.go @@ -215,7 +215,7 @@ func V(level Level) bool { // func ExpensiveLogEnabled(ctx context.Context, level Level) bool { if sp := tracing.SpanFromContext(ctx); sp != nil { - if tracing.IsRecording(sp) { + if sp.IsRecording() { return true } } diff --git a/pkg/util/log/trace.go b/pkg/util/log/trace.go index d0c67f9fe93a..2ca2f70bf407 100644 --- a/pkg/util/log/trace.go +++ b/pkg/util/log/trace.go @@ -91,7 +91,7 @@ func FinishEventLog(ctx context.Context) { // false. func getSpanOrEventLog(ctx context.Context) (*tracing.Span, *ctxEventLog, bool) { if sp := tracing.SpanFromContext(ctx); sp != nil { - if tracing.IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { return nil, nil, false } return sp, nil, true diff --git a/pkg/util/log/trace_test.go b/pkg/util/log/trace_test.go index ee7ddd6c1e21..7c804d56a1aa 100644 --- a/pkg/util/log/trace_test.go +++ b/pkg/util/log/trace_test.go @@ -67,7 +67,7 @@ func TestTrace(t *testing.T) { tracer := tracing.NewTracer() tracer.SetForceRealSpans(true) sp := tracer.StartSpan("s") - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) ctxWithSpan := tracing.ContextWithSpan(ctx, sp) Event(ctxWithSpan, "test1") VEvent(ctxWithSpan, noLogV(), "test2") @@ -79,7 +79,7 @@ func TestTrace(t *testing.T) { sp.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: event: test1 event: test2 @@ -98,7 +98,7 @@ func TestTraceWithTags(t *testing.T) { tracer.SetForceRealSpans(true) sp := tracer.StartSpan("s") ctxWithSpan := tracing.ContextWithSpan(ctx, sp) - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) Event(ctxWithSpan, "test1") VEvent(ctxWithSpan, noLogV(), "test2") @@ -106,7 +106,7 @@ func TestTraceWithTags(t *testing.T) { Info(ctxWithSpan, "log") sp.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: event: [tag=1] test1 event: [tag=1] test2 @@ -184,7 +184,7 @@ func TestEventLogAndTrace(t *testing.T) { tracer := tracing.NewTracer() tracer.SetForceRealSpans(true) sp := tracer.StartSpan("s") - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) ctxWithBoth := tracing.ContextWithSpan(ctxWithEventLog, sp) // Events should only go to the trace. Event(ctxWithBoth, "test3") @@ -197,7 +197,7 @@ func TestEventLogAndTrace(t *testing.T) { sp.Finish() el.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: event: test3 event: test4 diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index 31111a448d05..7edb1add5972 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -71,7 +71,7 @@ func extractSpanContext(ctx context.Context, tracer *Tracer) (*SpanContext, erro // create a span. func spanInclusionFuncForServer(t *Tracer, parentSpanCtx *SpanContext) bool { // Is client tracing? - return (parentSpanCtx != nil && !IsNoopContext(parentSpanCtx)) || + return (parentSpanCtx != nil && !parentSpanCtx.IsNoop()) || // Should we trace regardless of the client? This is useful for calls coming // through the HTTP->RPC gateway (i.e. the AdminUI), where client is never // tracing. @@ -207,7 +207,7 @@ func (ss *tracingServerStream) Context() context.Context { // // See #17177. func spanInclusionFuncForClient(parentSpanCtx *SpanContext) bool { - return parentSpanCtx != nil && !IsNoopContext(parentSpanCtx) + return parentSpanCtx != nil && !parentSpanCtx.IsNoop() } func injectSpanContext(ctx context.Context, tracer *Tracer, clientSpan *Span) context.Context { diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go index a58672b46833..9942959c2fbc 100644 --- a/pkg/util/tracing/recording.go +++ b/pkg/util/tracing/recording.go @@ -386,7 +386,7 @@ type TraceCollection struct { // expected event (i.e. log message). // // Use with something like: -// if err := TestingCheckRecordedSpans(tracing.GetRecording(Span), ` +// if err := TestingCheckRecordedSpans(Span.GetRecording(), ` // Span root: // event: a // event: c diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index 89e3f734c37f..a51394f94d58 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -204,8 +204,8 @@ func (s *Span) isNoop() bool { } // IsRecording returns true if the Span is recording its events. -func IsRecording(sp *Span) bool { - return sp.crdb.isRecording() +func (s *Span) IsRecording() bool { + return s.crdb.isRecording() } // enableRecording start recording on the Span. From now on, log events and child spans @@ -248,18 +248,18 @@ func (s *crdbSpan) enableRecording( // // Children spans created from the Span while it is *not* recording will not // necessarily be recordable. -func StartRecording(sp *Span, recType RecordingType) { +func (s *Span) StartRecording(recType RecordingType) { if recType == NoRecording { panic("StartRecording called with NoRecording") } - if sp.isNoop() { + if s.isNoop() { panic("StartRecording called on NoopSpan; use the Recordable option for StartSpan") } // If we're already recording (perhaps because the parent was recording when // this Span was created), there's nothing to do. - if !sp.crdb.isRecording() { - sp.crdb.enableRecording(nil /* parent */, recType, false /* separateRecording */) + if !s.crdb.isRecording() { + s.crdb.enableRecording(nil /* parent */, recType, false /* separateRecording */) } } @@ -270,11 +270,7 @@ func StartRecording(sp *Span, recType RecordingType) { // when all the spans finish. // // StopRecording() can be called on a Finish()ed Span. -func StopRecording(sp *Span) { - sp.disableRecording() -} - -func (s *Span) disableRecording() { +func (s *Span) StopRecording() { if s.isNoop() { panic("can't disable recording a noop Span") } @@ -295,20 +291,11 @@ func (s *crdbSpan) disableRecording() { } } -// IsRecordable returns true if {Start,Stop}Recording() can be called on this -// Span. -// -// In other words, this tests if the Span is our custom type, and not a noopSpan -// or anything else. -func IsRecordable(sp *Span) bool { - return !sp.isNoop() -} - // GetRecording retrieves the current recording, if the Span has recording // enabled. This can be called while spans that are part of the recording are // still open; it can run concurrently with operations on those spans. -func GetRecording(sp *Span) Recording { - return sp.crdb.getRecording() +func (s *Span) GetRecording() Recording { + return s.crdb.getRecording() } func (s *crdbSpan) getRecording() Recording { @@ -341,8 +328,8 @@ func (s *crdbSpan) getRecording() Recording { // ImportRemoteSpans adds RecordedSpan data to the recording of the given Span; // these spans will be part of the result of GetRecording. Used to import // recorded traces from other nodes. -func ImportRemoteSpans(sp *Span, remoteSpans []tracingpb.RecordedSpan) error { - return sp.crdb.ImportRemoteSpans(remoteSpans) +func (s *Span) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error { + return s.crdb.ImportRemoteSpans(remoteSpans) } func (s *crdbSpan) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error { @@ -360,7 +347,7 @@ func (s *crdbSpan) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error return nil } -// IsBlackHoleSpan returns true if events for this Span are just dropped. This +// IsBlackHole returns true if events for this Span are just dropped. This // is the case when the Span is not recording and no external tracer is configured. // Tracing clients can use this method to figure out if they can short-circuit some // tracing-related work that would be discarded anyway. @@ -368,35 +355,31 @@ func (s *crdbSpan) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error // The child of a blackhole Span is a non-recordable blackhole Span[*]. These incur // only minimal overhead. It is therefore not worth it to call this method to avoid // starting spans. -func IsBlackHoleSpan(sp *Span) bool { - return sp.isBlackHole() +func (s *Span) IsBlackHole() bool { + return s.isBlackHole() } -// IsNoopContext returns true if the Span context is from a "no-op" Span. If +// IsNoop returns true if the Span context is from a "no-op" Span. If // this is true, any Span derived from this context will be a "black hole Span". // // You should never need to care about this method. It is exported for technical // reasons. -func IsNoopContext(sc *SpanContext) bool { - return sc.isNoop() -} - -func (sc *SpanContext) isNoop() bool { +func (sc *SpanContext) IsNoop() bool { return sc.recordingType == NoRecording && sc.shadowTr == nil } // SetSpanStats sets the stats on a Span. stats.Stats() will also be added to // the Span tags. -func SetSpanStats(sp *Span, stats SpanStats) { - if sp.isNoop() { +func (s *Span) SetSpanStats(stats SpanStats) { + if s.isNoop() { return } - sp.crdb.mu.Lock() - sp.crdb.mu.stats = stats + s.crdb.mu.Lock() + s.crdb.mu.stats = stats for name, value := range stats.Stats() { - sp.setTagInner(StatTagPrefix+name, value, true /* locked */) + s.setTagInner(StatTagPrefix+name, value, true /* locked */) } - sp.crdb.mu.Unlock() + s.crdb.mu.Unlock() } // Finish is part of the opentracing.Span interface. diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index ceaa76a82a52..9b4fbff73d4d 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -28,7 +28,7 @@ func TestRecordingString(t *testing.T) { tr2 := NewTracer() root := tr.StartSpan("root", Recordable) - StartRecording(root, SnowballRecording) + root.StartRecording(SnowballRecording) root.LogFields(otlog.String(tracingpb.LogMessageField, "root 1")) // Hackily fix the timing on the first log message, so that we can check it later. root.crdb.mu.recording.recordedLogs[0].Timestamp = root.crdb.startTime.Add(time.Millisecond) @@ -45,8 +45,8 @@ func TestRecordingString(t *testing.T) { remoteChild.LogFields(otlog.String(tracingpb.LogMessageField, "remote child 1")) require.NoError(t, err) remoteChild.Finish() - remoteRec := GetRecording(remoteChild) - err = ImportRemoteSpans(root, remoteRec) + remoteRec := remoteChild.GetRecording() + err = root.ImportRemoteSpans(remoteRec) require.NoError(t, err) root.Finish() @@ -60,7 +60,7 @@ func TestRecordingString(t *testing.T) { root.LogFields(otlog.String(tracingpb.LogMessageField, "root 5")) root.Finish() - rec := GetRecording(root) + rec := root.GetRecording() // Sanity check that the recording looks like we want. Note that this is not // its String() representation; this just list all the spans in order. err = TestingCheckRecordedSpans(rec, ` @@ -146,15 +146,15 @@ func TestRecordingInRecording(t *testing.T) { tr := NewTracer() root := tr.StartSpan("root", Recordable) - StartRecording(root, SnowballRecording) + root.StartRecording(SnowballRecording) child := tr.StartSpan("child", opentracing.ChildOf(root.Context()), Recordable) - StartRecording(child, SnowballRecording) + child.StartRecording(SnowballRecording) grandChild := tr.StartSpan("grandchild", opentracing.ChildOf(child.Context())) grandChild.Finish() child.Finish() root.Finish() - rootRec := GetRecording(root) + rootRec := root.GetRecording() require.NoError(t, TestingCheckRecordedSpans(rootRec, ` Span root: tags: sb=1 @@ -164,7 +164,7 @@ Span grandchild: tags: sb=1 `)) - childRec := GetRecording(child) + childRec := child.GetRecording() require.NoError(t, TestingCheckRecordedSpans(childRec, ` Span child: tags: sb=1 diff --git a/pkg/util/tracing/tags_test.go b/pkg/util/tracing/tags_test.go index 7f90825ce0a2..592a3a675050 100644 --- a/pkg/util/tracing/tags_test.go +++ b/pkg/util/tracing/tags_test.go @@ -25,9 +25,9 @@ func TestLogTags(t *testing.T) { l := logtags.SingleTagBuffer("tag1", "val1") l = l.Add("tag2", "val2") sp1 := tr.StartSpan("foo", Recordable, LogTags(l)) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp1.Finish() - require.NoError(t, TestingCheckRecordedSpans(GetRecording(sp1), ` + require.NoError(t, TestingCheckRecordedSpans(sp1.GetRecording(), ` Span foo: tags: tag1=val1 tag2=val2 `)) @@ -38,9 +38,9 @@ func TestLogTags(t *testing.T) { RegisterTagRemapping("tag2", "two") sp2 := tr.StartSpan("bar", Recordable, LogTags(l)) - StartRecording(sp2, SingleNodeRecording) + sp2.StartRecording(SingleNodeRecording) sp2.Finish() - require.NoError(t, TestingCheckRecordedSpans(GetRecording(sp2), ` + require.NoError(t, TestingCheckRecordedSpans(sp2.GetRecording(), ` Span bar: tags: one=val1 two=val2 `)) @@ -48,9 +48,9 @@ func TestLogTags(t *testing.T) { shadowTracer.clear() sp3 := tr.StartRootSpan("baz", l, RecordableSpan) - StartRecording(sp3, SingleNodeRecording) + sp3.StartRecording(SingleNodeRecording) sp3.Finish() - require.NoError(t, TestingCheckRecordedSpans(GetRecording(sp3), ` + require.NoError(t, TestingCheckRecordedSpans(sp3.GetRecording(), ` Span baz: tags: one=val1 two=val2 `)) diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index f5a7418f3dd8..1da2b2af4cf1 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -196,7 +196,7 @@ func (t *Tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOp // case) with a noop context, return a noop Span now. if len(opts) == 1 { if o, ok := opts[0].(opentracing.SpanReference); ok { - if IsNoopContext(o.ReferencedContext.(*SpanContext)) { + if o.ReferencedContext.(*SpanContext).IsNoop() { return t.noopSpan } } @@ -237,7 +237,7 @@ func (t *Tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOp if r.ReferencedContext == nil { continue } - if IsNoopContext(r.ReferencedContext.(*SpanContext)) { + if r.ReferencedContext.(*SpanContext).IsNoop() { continue } parentType = r.Type @@ -422,7 +422,7 @@ func (fn textMapWriterFn) Set(key, val string) { func (t *Tracer) Inject( osc opentracing.SpanContext, format interface{}, carrier interface{}, ) error { - if IsNoopContext(osc.(*SpanContext)) { + if osc.(*SpanContext).IsNoop() { // Fast path when tracing is disabled. Extract will accept an empty map as a // noop context. return nil @@ -575,7 +575,7 @@ func ForkCtxSpan(ctx context.Context, opName string) (context.Context, *Span) { return ctx, sp } tr := sp.Tracer() - if IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { ns := tr.noopSpan return ContextWithSpan(ctx, ns), ns } @@ -610,7 +610,7 @@ func childSpan( return ctx, sp } tr := sp.Tracer() - if IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { ns := tr.noopSpan return ContextWithSpan(ctx, ns), ns } @@ -679,7 +679,7 @@ func StartSnowballTrace( } else { span = tracer.StartSpan(opName, Recordable, LogTagsFromCtx(ctx)) } - StartRecording(span, SnowballRecording) + span.StartRecording(SnowballRecording) return ContextWithSpan(ctx, span), span } @@ -695,18 +695,15 @@ func ContextWithRecordingSpan( ) (retCtx context.Context, getRecording func() Recording, cancel func()) { tr := NewTracer() sp := tr.StartSpan(opName, Recordable, LogTagsFromCtx(ctx)) - StartRecording(sp, SnowballRecording) + sp.StartRecording(SnowballRecording) ctx, cancelCtx := context.WithCancel(ctx) ctx = ContextWithSpan(ctx, sp) - getRecording = func() Recording { - return GetRecording(sp) - } cancel = func() { cancelCtx() - StopRecording(sp) + sp.StopRecording() sp.Finish() tr.Close() } - return ctx, getRecording, cancel + return ctx, sp.GetRecording, cancel } diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index 6afa4a1c96d7..26c4473f9476 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -38,7 +38,7 @@ func TestTracerRecording(t *testing.T) { if s1.isNoop() { t.Error("Recordable (but not recording) Span should not be noop") } - if !IsBlackHoleSpan(s1) { + if !s1.IsBlackHole() { t.Error("Recordable Span should be black hole") } @@ -50,15 +50,15 @@ func TestTracerRecording(t *testing.T) { noop3.Finish() s1.LogKV("x", 1) - StartRecording(s1, SingleNodeRecording) + s1.StartRecording(SingleNodeRecording) s1.LogKV("x", 2) s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context())) - if IsBlackHoleSpan(s2) { + if s2.IsBlackHole() { t.Error("recording Span should not be black hole") } s2.LogKV("x", 3) - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: unfinished= x: 2 @@ -69,7 +69,7 @@ func TestTracerRecording(t *testing.T) { t.Fatal(err) } - if err := TestingCheckRecordedSpans(GetRecording(s2), ` + if err := TestingCheckRecordedSpans(s2.GetRecording(), ` Span b: tags: unfinished= x: 3 @@ -83,7 +83,7 @@ func TestTracerRecording(t *testing.T) { s2.Finish() - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: unfinished= x: 2 @@ -96,7 +96,7 @@ func TestTracerRecording(t *testing.T) { t.Fatal(err) } s3.Finish() - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: unfinished= x: 2 @@ -108,15 +108,15 @@ func TestTracerRecording(t *testing.T) { `); err != nil { t.Fatal(err) } - StopRecording(s1) + s1.StopRecording() s1.LogKV("x", 100) - if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil { + if err := TestingCheckRecordedSpans(s1.GetRecording(), ``); err != nil { t.Fatal(err) } // The child Span is still recording. s3.LogKV("x", 5) - if err := TestingCheckRecordedSpans(GetRecording(s3), ` + if err := TestingCheckRecordedSpans(s3.GetRecording(), ` Span c: tags: tag=val x: 4 @@ -130,11 +130,11 @@ func TestTracerRecording(t *testing.T) { func TestStartChildSpan(t *testing.T) { tr := NewTracer() sp1 := tr.StartSpan("parent", Recordable) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp2 := tr.StartChildSpan("child", sp1.SpanContext(), nil /* logTags */, false /* recordable */, false /*separateRecording*/) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(sp1), ` + if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` Span parent: Span child: `); err != nil { @@ -142,29 +142,29 @@ func TestStartChildSpan(t *testing.T) { } sp1 = tr.StartSpan("parent", Recordable) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp2 = tr.StartChildSpan("child", sp1.SpanContext(), nil /* logTags */, false /* recordable */, true /*separateRecording*/) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(sp1), ` + if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` Span parent: `); err != nil { t.Fatal(err) } - if err := TestingCheckRecordedSpans(GetRecording(sp2), ` + if err := TestingCheckRecordedSpans(sp2.GetRecording(), ` Span child: `); err != nil { t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp2 = tr.StartChildSpan( "child", sp1.SpanContext(), logtags.SingleTagBuffer("key", "val"), false /* recordable */, false, /*separateRecording*/ ) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(sp1), ` + if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` Span parent: Span child: tags: key=val @@ -195,7 +195,7 @@ func TestTracerInjectExtract(t *testing.T) { if err != nil { t.Fatal(err) } - if !wireContext.isNoop() { + if !wireContext.IsNoop() { t.Errorf("expected noop context: %v", wireContext) } noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) @@ -209,7 +209,7 @@ func TestTracerInjectExtract(t *testing.T) { // remote side. s1 := tr.StartSpan("a", Recordable) - StartRecording(s1, SnowballRecording) + s1.StartRecording(SnowballRecording) carrier = make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil { @@ -232,7 +232,7 @@ func TestTracerInjectExtract(t *testing.T) { s2.Finish() // Verify that recording was started automatically. - rec := GetRecording(s2) + rec := s2.GetRecording() if err := TestingCheckRecordedSpans(rec, ` Span remote op: tags: sb=1 @@ -241,19 +241,19 @@ func TestTracerInjectExtract(t *testing.T) { t.Fatal(err) } - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: sb=1 unfinished= `); err != nil { t.Fatal(err) } - if err := ImportRemoteSpans(s1, rec); err != nil { + if err := s1.ImportRemoteSpans(rec); err != nil { t.Fatal(err) } s1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: sb=1 Span remote op: From a76969f3f4ad6897dd3ea78aadfecc05a00545c8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 28 Oct 2020 11:02:00 +0100 Subject: [PATCH 3/5] tracing: allow Finish() on nil *Span This is one of the immediate useful outcomes of moving off the `opentracing.Span` interface: we now get to decide what `(nil).Finish()` does, removing the need for workarounds. Release note: None --- pkg/ccl/backupccl/backup_planning.go | 2 +- pkg/ccl/backupccl/backup_processor.go | 2 +- pkg/ccl/backupccl/restore_job.go | 4 ++-- pkg/ccl/backupccl/restore_planning.go | 2 +- pkg/ccl/backupccl/show.go | 4 ++-- pkg/ccl/backupccl/split_and_scatter_processor.go | 2 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 2 +- pkg/ccl/importccl/exportcsv.go | 2 +- pkg/ccl/importccl/import_processor.go | 4 ++-- pkg/ccl/importccl/import_stmt.go | 2 +- pkg/ccl/importccl/read_import_base.go | 4 ++-- pkg/ccl/storageccl/export.go | 2 +- pkg/ccl/storageccl/writebatch.go | 2 +- pkg/kv/kvclient/kvcoord/range_cache.go | 2 +- pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 2 +- pkg/kv/kvserver/replica_application_cmd.go | 2 +- pkg/kv/kvserver/replica_proposal.go | 2 +- pkg/sql/distsql/server.go | 4 ++-- pkg/sql/execinfra/processorsbase.go | 2 +- pkg/sql/flowinfra/outbox.go | 4 ++-- pkg/sql/grant_role.go | 2 +- pkg/sql/revoke_role.go | 2 +- pkg/sql/rowexec/backfiller.go | 4 ++-- pkg/sql/rowexec/indexbackfiller.go | 2 +- pkg/sql/rowflow/routers.go | 2 +- pkg/sql/stats/stats_cache.go | 2 +- pkg/util/limit/limiter.go | 2 +- pkg/util/stop/stopper.go | 4 ++-- pkg/util/tracing/span.go | 15 +++++---------- pkg/util/tracing/tracer.go | 8 -------- 30 files changed, 41 insertions(+), 54 deletions(-) diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 7289b1efb589..efe8b9d370f1 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -560,7 +560,7 @@ func backupPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() if !(p.ExtendedEvalContext().TxnImplicit || backupStmt.Options.Detached) { return errors.Errorf("BACKUP cannot be used inside a transaction without DETACHED option") diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index ff84afa76948..ff67d54fc012 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -88,7 +88,7 @@ func newBackupDataProcessor( func (cp *backupDataProcessor) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "backupDataProcessor") - defer tracing.FinishSpan(span) + defer span.Finish() defer cp.output.ProducerDone() progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index b5732d653e5e..8d310f913b25 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -294,7 +294,7 @@ func WriteDescriptors( extra []roachpb.KeyValue, ) error { ctx, span := tracing.ChildSpan(ctx, "WriteDescriptors") - defer tracing.FinishSpan(span) + defer span.Finish() err := func() error { b := txn.NewBatch() wroteDBs := make(map[descpb.ID]catalog.DatabaseDescriptor) @@ -580,7 +580,7 @@ func restore( requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block g.GoCtx(func(ctx context.Context) error { ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log") - defer tracing.FinishSpan(progressSpan) + defer progressSpan.Finish() return progressLogger.Loop(ctx, requestFinishedCh) }) diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index add5516452f8..fa05427452c5 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -1225,7 +1225,7 @@ func restorePlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() if !(p.ExtendedEvalContext().TxnImplicit || restoreStmt.Options.Detached) { return errors.Errorf("RESTORE cannot be used inside a transaction without DETACHED option") diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index d8aaca756340..c1804033066a 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -95,7 +95,7 @@ func showBackupPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() str, err := toFn() if err != nil { @@ -487,7 +487,7 @@ func showBackupsInCollectionPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { ctx, span := tracing.ChildSpan(ctx, backup.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() collection, err := collectionFn() if err != nil { diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index f234aee46780..1a35b8d09852 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -158,7 +158,7 @@ type entryNode struct { // Run implements the execinfra.Processor interface. func (ssp *splitAndScatterProcessor) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "splitAndScatterProcessor") - defer tracing.FinishSpan(span) + defer span.Finish() defer ssp.output.ProducerDone() numEntries := 0 diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a6b2f878293a..c4c4553debc0 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -108,7 +108,7 @@ func changefeedPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) if err != nil { diff --git a/pkg/ccl/importccl/exportcsv.go b/pkg/ccl/importccl/exportcsv.go index e1fdfa63a1cd..c01d5d20b2da 100644 --- a/pkg/ccl/importccl/exportcsv.go +++ b/pkg/ccl/importccl/exportcsv.go @@ -167,7 +167,7 @@ func (sp *csvWriter) OutputTypes() []*types.T { func (sp *csvWriter) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "csvWriter") - defer tracing.FinishSpan(span) + defer span.Finish() err := func() error { typs := sp.input.OutputTypes() diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 2b4cc08dfe5e..1072c775c83f 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -77,7 +77,7 @@ func newReadImportDataProcessor( func (cp *readImportDataProcessor) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor") - defer tracing.FinishSpan(span) + defer span.Finish() defer cp.output.ProducerDone() progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) @@ -209,7 +209,7 @@ func ingestKvs( kvCh <-chan row.KVBatch, ) (*roachpb.BulkOpSummary, error) { ctx, span := tracing.ChildSpan(ctx, "ingestKVs") - defer tracing.FinishSpan(span) + defer span.Finish() writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos} diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 41175d01b47d..f527ce39fd94 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -282,7 +282,7 @@ func importPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, importStmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() walltime := p.ExecCfg().Clock.Now().WallTime diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index da7087ce9307..4d379443a861 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -85,7 +85,7 @@ func runImport( group.GoCtx(func(ctx context.Context) error { defer close(kvCh) ctx, span := tracing.ChildSpan(ctx, "readImportFiles") - defer tracing.FinishSpan(span) + defer span.Finish() var inputs map[int32]string if spec.ResumePos != nil { // Filter out files that were completely processed. @@ -540,7 +540,7 @@ func runParallelImport( minEmited := make([]int64, parallelism) group.GoCtx(func(ctx context.Context) error { ctx, span := tracing.ChildSpan(ctx, "inputconverter") - defer tracing.FinishSpan(span) + defer span.Finish() return ctxgroup.GroupWorkers(ctx, parallelism, func(ctx context.Context, id int) error { return importer.importWorker(ctx, id, consumer, importCtx, fileCtx, minEmited) }) diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 97af3bf04916..1ab8645f13a6 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -76,7 +76,7 @@ func evalExport( reply := resp.(*roachpb.ExportResponse) ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) - defer tracing.FinishSpan(span) + defer span.Finish() // For MVCC_All backups with no start time, they'll only be capturing the // *revisions* since the gc threshold, so noting that in the reply allows the diff --git a/pkg/ccl/storageccl/writebatch.go b/pkg/ccl/storageccl/writebatch.go index 411fe4410b84..4e87b4840c6b 100644 --- a/pkg/ccl/storageccl/writebatch.go +++ b/pkg/ccl/storageccl/writebatch.go @@ -41,7 +41,7 @@ func evalWriteBatch( ms := cArgs.Stats _, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey)) - defer tracing.FinishSpan(span) + defer span.Finish() if log.V(1) { log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey) } diff --git a/pkg/kv/kvclient/kvcoord/range_cache.go b/pkg/kv/kvclient/kvcoord/range_cache.go index f488f6c62959..3e126247cc26 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache.go +++ b/pkg/kv/kvclient/kvcoord/range_cache.go @@ -655,7 +655,7 @@ func (rdc *RangeDescriptorCache) tryLookup( var lookupRes EvictionToken if err := rdc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error { ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup") - defer tracing.FinishSpan(reqSpan) + defer reqSpan.Finish() // Clear the context's cancelation. This request services potentially many // callers waiting for its result, and using the flight's leader's // cancelation doesn't make sense. diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index f824a1eefed0..39f80ed4478f 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -42,7 +42,7 @@ func EvalAddSSTable( // TODO(tschottdorf): restore the below in some form (gets in the way of testing). // _, span := tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey)) - // defer tracing.FinishSpan(span) + // defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key) // IMPORT INTO should not proceed if any KVs from the SST shadow existing data diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index d348950c0605..7e4c51d9903f 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -181,7 +181,7 @@ func (c *replicatedCmd) FinishNonLocal(ctx context.Context) { } func (c *replicatedCmd) finishTracingSpan() { - tracing.FinishSpan(c.sp) + c.sp.Finish() c.ctx, c.sp = nil, nil } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 2cdb56fcb6f6..61b6891b25f9 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -138,7 +138,7 @@ func (proposal *ProposalData) finishApplication(ctx context.Context, pr proposal proposal.ec.done(ctx, proposal.Request, pr.Reply, pr.Err) proposal.signalProposalResult(pr) if proposal.sp != nil { - tracing.FinishSpan(proposal.sp) + proposal.sp.Finish() proposal.sp = nil } } diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index e833b0418c41..3e95b2cd7f08 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -263,7 +263,7 @@ func (ds *ServerImpl) setupFlow( sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData) if err != nil { - tracing.FinishSpan(sp) + sp.Finish() return ctx, nil, err } ie := &lazyInternalExecutor{ @@ -329,7 +329,7 @@ func (ds *ServerImpl) setupFlow( // Flow.Cleanup will not be called, so we have to close the memory monitor // and finish the span manually. monitor.Stop(ctx) - tracing.FinishSpan(sp) + sp.Finish() ctx = tracing.ContextWithSpan(ctx, nil) return ctx, nil, err } diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 536bf2029a8e..b8b6e0c67bc8 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -885,7 +885,7 @@ func (pb *ProcessorBase) InternalClose() bool { } pb.Closed = true - tracing.FinishSpan(pb.span) + pb.span.Finish() pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 4a896a265b84..7b6812ab6e24 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -213,7 +213,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { spanFinished := false defer func() { if !spanFinished { - tracing.FinishSpan(span) + span.Finish() } }() @@ -286,7 +286,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { m.stats.BytesSent = 0 } span.SetSpanStats(&m.stats) - tracing.FinishSpan(span) + span.Finish() spanFinished = true if trace := execinfra.GetTraceData(ctx); trace != nil { err := m.addRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace}) diff --git a/pkg/sql/grant_role.go b/pkg/sql/grant_role.go index 5dd58335e0e0..ac1fc3027d64 100644 --- a/pkg/sql/grant_role.go +++ b/pkg/sql/grant_role.go @@ -48,7 +48,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR sqltelemetry.IncIAMGrantCounter(n.AdminOption) ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() hasAdminRole, err := p.HasAdminRole(ctx) if err != nil { diff --git a/pkg/sql/revoke_role.go b/pkg/sql/revoke_role.go index 2ac4e4e5d098..998dab33d884 100644 --- a/pkg/sql/revoke_role.go +++ b/pkg/sql/revoke_role.go @@ -45,7 +45,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo sqltelemetry.IncIAMRevokeCounter(n.AdminOption) ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() hasAdminRole, err := p.HasAdminRole(ctx) if err != nil { diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go index 1a52d7a030bc..4edcaa344d01 100644 --- a/pkg/sql/rowexec/backfiller.go +++ b/pkg/sql/rowexec/backfiller.go @@ -119,7 +119,7 @@ func (b *backfiller) Run(ctx context.Context) { opName := fmt.Sprintf("%sBackfiller", b.name) ctx = logtags.AddTag(ctx, opName, int(b.spec.Table.ID)) ctx, span := execinfra.ProcessorSpan(ctx, opName) - defer tracing.FinishSpan(span) + defer span.Finish() meta := b.doRun(ctx) execinfra.SendTraceData(ctx, b.output) if emitHelper(ctx, &b.out, nil /* row */, meta, func(ctx context.Context) {}) { @@ -332,7 +332,7 @@ func WriteResumeSpan( jobsRegistry *jobs.Registry, ) error { ctx, traceSpan := tracing.ChildSpan(ctx, "checkpoint") - defer tracing.FinishSpan(traceSpan) + defer traceSpan.Finish() return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { resumeSpans, job, mutationIdx, error := GetResumeSpans( diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index fbbdeeb9fec3..19dbf2b980cc 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -161,7 +161,7 @@ func (ib *indexBackfiller) runChunk( } ctx, traceSpan := tracing.ChildSpan(tctx, "chunk") - defer tracing.FinishSpan(traceSpan) + defer traceSpan.Finish() var key roachpb.Key diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index e2910a90a577..5e04594d5f04 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -372,7 +372,7 @@ func (rb *routerBase) Start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c ro.stats.MaxAllocatedMem = ro.memoryMonitor.MaximumBytes() ro.stats.MaxAllocatedDisk = ro.diskMonitor.MaximumBytes() span.SetSpanStats(&ro.stats) - tracing.FinishSpan(span) + span.Finish() if trace := execinfra.GetTraceData(ctx); trace != nil { ro.mu.Unlock() rb.semaphore <- struct{}{} diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 379e95f08654..08ff89504b7b 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -318,7 +318,7 @@ func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID d ctx, span := tracing.ForkCtxSpan(ctx, "refresh-table-stats") // Perform an asynchronous refresh of the cache. go func() { - defer tracing.FinishSpan(span) + defer span.Finish() sc.refreshCacheEntry(ctx, tableID) }() } diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go index 6a0acc88780d..962a1003d9d8 100644 --- a/pkg/util/limit/limiter.go +++ b/pkg/util/limit/limiter.go @@ -44,7 +44,7 @@ func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) error { } // If not, start a span and begin waiting. ctx, span := tracing.ChildSpan(ctx, l.spanName) - defer tracing.FinishSpan(span) + defer span.Finish() return l.sem.Acquire(ctx, 1) } diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index f79ce3b2fc4f..f2f8bfb460fe 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -341,7 +341,7 @@ func (s *Stopper) RunAsyncTask( go func() { defer s.Recover(ctx) defer s.runPostlude(taskName) - defer tracing.FinishSpan(span) + defer span.Finish() f(ctx) }() @@ -400,7 +400,7 @@ func (s *Stopper) RunLimitedAsyncTask( defer s.Recover(ctx) defer s.runPostlude(taskName) defer alloc.Release() - defer tracing.FinishSpan(span) + defer span.Finish() f(ctx) }() diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index a51394f94d58..aad1cc2595d0 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -382,20 +382,15 @@ func (s *Span) SetSpanStats(stats SpanStats) { s.crdb.mu.Unlock() } -// Finish is part of the opentracing.Span interface. +// Finish marks the Span as completed. Finishing a nil *Span is a noop. func (s *Span) Finish() { - s.FinishWithOptions(opentracing.FinishOptions{}) -} - -// FinishWithOptions is part of the opentracing.Span interface. -func (s *Span) FinishWithOptions(opts opentracing.FinishOptions) { - if s.isNoop() { + if s == nil { return } - finishTime := opts.FinishTime - if finishTime.IsZero() { - finishTime = time.Now() + if s.isNoop() { + return } + finishTime := time.Now() s.crdb.mu.Lock() s.crdb.mu.duration = finishTime.Sub(s.crdb.startTime) s.crdb.mu.Unlock() diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 1da2b2af4cf1..b2520f53af3b 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -552,14 +552,6 @@ func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanContext, return &sc, nil } -// FinishSpan closes the given Span (if not nil). It is a convenience wrapper -// for Span.Finish() which tolerates nil spans. -func FinishSpan(span *Span) { - if span != nil { - span.Finish() - } -} - // ForkCtxSpan checks if ctx has a Span open; if it does, it creates a new Span // that "follows from" the original Span. This allows the resulting context to be // used in an async task that might outlive the original operation. From e79dbcfc2108997ab2856c25ee01ee526138ce12 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 28 Oct 2020 11:28:36 +0100 Subject: [PATCH 4/5] tracing: remove (*Tracer).SetForceRealSpans It was a testing crutch that is no longer necessary now that we've moved off the opentracing interfaces. Release note: None --- pkg/util/log/ambient_context_test.go | 21 ++++++++------------- pkg/util/log/trace_test.go | 9 +++------ pkg/util/tracing/tracer.go | 16 +--------------- 3 files changed, 12 insertions(+), 34 deletions(-) diff --git a/pkg/util/log/ambient_context_test.go b/pkg/util/log/ambient_context_test.go index 42f62466ef68..bbae949066df 100644 --- a/pkg/util/log/ambient_context_test.go +++ b/pkg/util/log/ambient_context_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/logtags" + "github.com/stretchr/testify/require" ) func TestAnnotateCtxTags(t *testing.T) { @@ -40,14 +41,13 @@ func TestAnnotateCtxTags(t *testing.T) { func TestAnnotateCtxSpan(t *testing.T) { tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) ac := AmbientContext{Tracer: tracer} ac.AddLogTag("ambient", nil) // Annotate a context that has an open span. - sp1 := tracer.StartSpan("root") + sp1 := tracer.StartRootSpan("root", nil /* logTags */, tracing.RecordableSpan) sp1.StartRecording(tracing.SingleNodeRecording) ctx1 := tracing.ContextWithSpan(context.Background(), sp1) Event(ctx1, "a") @@ -70,20 +70,15 @@ func TestAnnotateCtxSpan(t *testing.T) { t.Fatal(err) } - // Annotate a context that has no span. + // Annotate a context that has no span. The tracer will create a non-recordable + // span. We just check here that AnnotateCtxWithSpan properly returns it to the + // caller. ac.Tracer = tracer ctx, sp := ac.AnnotateCtxWithSpan(context.Background(), "s") - sp.StartRecording(tracing.SingleNodeRecording) - Event(ctx, "a") - sp.Finish() - if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` - Span s: - tags: ambient= - event: [ambient] a - `); err != nil { - t.Fatal(err) - } + require.Equal(t, sp, tracing.SpanFromContext(ctx)) + require.NotNil(t, sp) + require.True(t, sp.IsBlackHole()) } func TestAnnotateCtxNodeStoreReplica(t *testing.T) { diff --git a/pkg/util/log/trace_test.go b/pkg/util/log/trace_test.go index 7c804d56a1aa..d4d2de9b096c 100644 --- a/pkg/util/log/trace_test.go +++ b/pkg/util/log/trace_test.go @@ -65,8 +65,7 @@ func TestTrace(t *testing.T) { Event(ctx, "should-not-show-up") tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) - sp := tracer.StartSpan("s") + sp := tracer.StartRootSpan("s", nil /* logTags */, tracing.RecordableSpan) sp.StartRecording(tracing.SingleNodeRecording) ctxWithSpan := tracing.ContextWithSpan(ctx, sp) Event(ctxWithSpan, "test1") @@ -95,8 +94,7 @@ func TestTraceWithTags(t *testing.T) { ctx = logtags.AddTag(ctx, "tag", 1) tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) - sp := tracer.StartSpan("s") + sp := tracer.StartRootSpan("s", nil /* logTags */, tracing.RecordableSpan) ctxWithSpan := tracing.ContextWithSpan(ctx, sp) sp.StartRecording(tracing.SingleNodeRecording) @@ -182,8 +180,7 @@ func TestEventLogAndTrace(t *testing.T) { VErrEvent(ctxWithEventLog, noLogV(), "testerr") tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) - sp := tracer.StartSpan("s") + sp := tracer.StartRootSpan("s", nil /* logTags */, tracing.RecordableSpan) sp.StartRecording(tracing.SingleNodeRecording) ctxWithBoth := tracing.ContextWithSpan(ctxWithEventLog, sp) // Events should only go to the trace. diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index b2520f53af3b..07f6c3f79254 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -88,12 +88,6 @@ type Tracer struct { // x/net/trace or lightstep and we are not recording. noopSpan *Span - // If forceRealSpans is set, this Tracer will always create real spans (never - // noopSpans), regardless of the recording or lightstep configuration. Used - // by tests for situations when they need to indirectly create spans and don't - // have the option of passing the Recordable option to their constructor. - forceRealSpans bool - // True if tracing to the debug/requests endpoint. Accessed via t.useNetTrace(). _useNetTrace int32 // updated atomically @@ -145,14 +139,6 @@ func (t *Tracer) Close() { t.setShadowTracer(nil, nil) } -// SetForceRealSpans sets forceRealSpans option to v and returns the previous -// value. -func (t *Tracer) SetForceRealSpans(v bool) bool { - prevVal := t.forceRealSpans - t.forceRealSpans = v - return prevVal -} - func (t *Tracer) setShadowTracer(manager shadowTracerManager, tr opentracing.Tracer) { var shadow *shadowTracer if manager != nil { @@ -270,7 +256,7 @@ const ( // context. func (t *Tracer) AlwaysTrace() bool { shadowTracer := t.getShadowTracer() - return t.useNetTrace() || shadowTracer != nil || t.forceRealSpans + return t.useNetTrace() || shadowTracer != nil } // StartRootSpan creates a root Span. This is functionally equivalent to: From b25946e59cdaa54a01781cca2553a60a581773ef Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 28 Oct 2020 13:06:50 +0100 Subject: [PATCH 5/5] tracing: improve comment on otSpan Release note: None --- pkg/util/log/BUILD.bazel | 1 + pkg/util/tracing/BUILD.bazel | 1 + pkg/util/tracing/span.go | 11 ++++++++--- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index c835d17566da..3fa097003eae 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -145,6 +145,7 @@ go_test( "//vendor/github.com/kr/pretty", "//vendor/github.com/pmezard/go-difflib/difflib", "//vendor/github.com/stretchr/testify/assert", + "//vendor/github.com/stretchr/testify/require", "//vendor/golang.org/x/net/trace", ] + select({ "@io_bazel_rules_go//go/platform:aix": [ diff --git a/pkg/util/tracing/BUILD.bazel b/pkg/util/tracing/BUILD.bazel index 1e8fca620df2..c181e61562b2 100644 --- a/pkg/util/tracing/BUILD.bazel +++ b/pkg/util/tracing/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "annotate.go", "annotate_nocgo.go", "grpc_interceptor.go", + "recording.go", "shadow.go", "span.go", "tags.go", diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index aad1cc2595d0..760b299112f9 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -145,10 +145,15 @@ func (s *crdbSpan) isRecording() bool { return s != nil && atomic.LoadInt32(&s.recording) != 0 } +// otSpan is a span for an external opentracing compatible tracer +// such as lightstep, zipkin, jaeger, etc. type otSpan struct { - // TODO(tbg): see if we can lose the shadowTr here and rely on shadowSpan.Tracer(). - // Probably not - but worth checking. - // TODO(tbg): consider renaming 'shadow' -> 'ot' or 'external'. + // shadowTr is the shadowTracer this span was created from. We need + // to hold on to it separately because shadowSpan.Tracer() returns + // the wrapper tracer and we lose the ability to find out + // what tracer it is. This is important when deriving children from + // this span, as we want to avoid mixing different tracers, which + // would otherwise be the result of cluster settings changed. shadowTr *shadowTracer shadowSpan opentracing.Span }