Skip to content

Commit

Permalink
Merge #107114
Browse files Browse the repository at this point in the history
107114: sql: collect cluster-wide traces as part of job execution details r=stevendanna a=adityamaru

This change includes the collection of cluster-wide, active tracing
spans to a job's execution details. The traces are stored in a zip
with a text and jaegar json file per node, that contain the active
tracing spans associated with the current execution of the job.

Once #106879
and #107210
merge a trace.zip will be generated everytime a user requests
new execution details from the job details page. This zip will
be downloadable from the job details page itself.

Informs: #102794
Release note: None

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Jul 25, 2023
2 parents 3b2c035 + 8fc74f2 commit 451d761
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 6 deletions.
26 changes: 26 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/jsonpb"
Expand Down Expand Up @@ -1121,3 +1122,28 @@ func FormatRetriableExecutionErrorLogToStringArray(
}
return arr
}

// GetJobTraceID returns the current trace ID of the job from the job progress.
func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracingpb.TraceID, error) {
var traceID tracingpb.TraceID
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
jobInfo := InfoStorageForJob(txn, jobID)
progressBytes, exists, err := jobInfo.GetLegacyProgress(ctx)
if err != nil {
return err
}
if !exists {
return errors.New("progress not found")
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
return errors.Wrap(err, "failed to unmarshal progress bytes")
}
traceID = progress.TraceID
return nil
}); err != nil {
return 0, errors.Wrapf(err, "failed to fetch trace ID for job %d", jobID)
}

return traceID, nil
}
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ go_library(
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/zipper",
"//pkg/util/tsearch",
"//pkg/util/uint128",
"//pkg/util/uuid",
Expand Down Expand Up @@ -904,6 +905,7 @@ go_test(
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgtype//:pgtype",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_klauspost_compress//zip",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_petermattis_goid//:goid",
Expand Down
32 changes: 28 additions & 4 deletions pkg/sql/jobs_profiler_execution_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/zipper"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -196,6 +197,7 @@ func (p *planner) RequestExecutionDetailFiles(ctx context.Context, jobID jobspb.
// parallelize the collection of the various pieces.
e.addDistSQLDiagram(ctx)
e.addLabelledGoroutines(ctx)
e.addClusterWideTraces(ctx)

return nil
}
Expand Down Expand Up @@ -230,12 +232,12 @@ func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
}
resp, err := e.srv.Profile(ctx, &profileRequest)
if err != nil {
log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err.Error())
log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
return
}
filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
if err := jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, e.db, e.jobID); err != nil {
log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err.Error())
log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error())
}
}

Expand All @@ -245,7 +247,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */
sessiondata.NoSessionDataOverride, query, e.jobID)
if err != nil {
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error())
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
return
}
if row != nil && row[0] != tree.DNull {
Expand All @@ -254,7 +256,29 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
if err := jobs.WriteExecutionDetailFile(ctx, filename,
[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
e.db, e.jobID); err != nil {
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error())
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
}
}
}

// addClusterWideTraces generates and persists a `trace.<timestamp>.zip` file
// that captures the active tracing spans of a job on all nodes in the cluster.
func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor())

traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID)
if err != nil {
log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error())
return
}
zippedTrace, err := z.Zip(ctx, int64(traceID))
if err != nil {
log.Errorf(ctx, "failed to collect cluster wide traces for job %d: %v", e.jobID, err.Error())
return
}

filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
if err := jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, e.db, e.jobID); err != nil {
log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error())
}
}
84 changes: 82 additions & 2 deletions pkg/sql/jobs_profiler_execution_details_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
"github.com/klauspost/compress/zip"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -224,6 +227,80 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
t.Run("execution details for invalid job ID", func(t *testing.T) {
runner.ExpectErr(t, `job -123 not found; cannot request execution details`, `SELECT crdb_internal.request_job_execution_details(-123)`)
})

t.Run("read/write terminal trace", func(t *testing.T) {
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return fakeExecResumer{
OnResume: func(ctx context.Context) error {
sp := tracing.SpanFromContext(ctx)
require.NotNil(t, sp)
sp.RecordStructured(&types.StringValue{Value: "should see this"})
return nil
},
}
}, jobs.UsesTenantCostControl)
var importJobID int
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
trace := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "resumer-trace")
require.Contains(t, string(trace), "should see this")
})

t.Run("read/write active trace", func(t *testing.T) {
blockCh := make(chan struct{})
continueCh := make(chan struct{})
defer close(blockCh)
defer close(continueCh)
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return fakeExecResumer{
OnResume: func(ctx context.Context) error {
_, childSp := tracing.ChildSpan(ctx, "child")
defer childSp.Finish()
blockCh <- struct{}{}
<-continueCh
return nil
},
}
}, jobs.UsesTenantCostControl)
var importJobID int
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
<-blockCh
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
activeTraces := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "trace")
continueCh <- struct{}{}
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
unzip, err := zip.NewReader(bytes.NewReader(activeTraces), int64(len(activeTraces)))
require.NoError(t, err)

// Make sure the bundle contains the expected list of files.
var files []string
for _, f := range unzip.File {
if f.UncompressedSize64 == 0 {
t.Fatalf("file %s is empty", f.Name)
}
files = append(files, f.Name)

r, err := f.Open()
if err != nil {
t.Fatal(err)
}
defer r.Close()
bytes, err := io.ReadAll(r)
if err != nil {
t.Fatal(err)
}
contents := string(bytes)

// Verify some contents in the active traces.
if strings.Contains(f.Name, ".txt") {
require.Regexp(t, "[child: {count: 1, duration.*, unfinished}]", contents)
} else if strings.Contains(f.Name, ".json") {
require.True(t, strings.Contains(contents, "\"operationName\": \"child\""))
}
}
require.Equal(t, []string{"node1-trace.txt", "node1-jaeger.json"}, files)
})
}

func TestListProfilerExecutionDetails(t *testing.T) {
Expand Down Expand Up @@ -272,10 +349,11 @@ func TestListProfilerExecutionDetails(t *testing.T) {

runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
require.Len(t, files, 3)
require.Len(t, files, 4)
require.Regexp(t, "distsql\\..*\\.html", files[0])
require.Regexp(t, "goroutines\\..*\\.txt", files[1])
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2])
require.Regexp(t, "trace\\..*\\.zip", files[3])

// Resume the job, so it can write another DistSQL diagram and goroutine
// snapshot.
Expand All @@ -285,13 +363,15 @@ func TestListProfilerExecutionDetails(t *testing.T) {
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
require.Len(t, files, 6)
require.Len(t, files, 8)
require.Regexp(t, "distsql\\..*\\.html", files[0])
require.Regexp(t, "distsql\\..*\\.html", files[1])
require.Regexp(t, "goroutines\\..*\\.txt", files[2])
require.Regexp(t, "goroutines\\..*\\.txt", files[3])
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4])
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5])
require.Regexp(t, "trace\\..*\\.zip", files[6])
require.Regexp(t, "trace\\..*\\.zip", files[7])
})
}

Expand Down

0 comments on commit 451d761

Please sign in to comment.