From a0d6c19053deca18646c8a5af30130596402a28e Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 14 Feb 2023 10:27:29 -0500 Subject: [PATCH] jobs: Extend job metric poller to collect PTS stats Prior PR https://github.com/cockroachdb/cockroach/pull/89752 added a metrics poller job which produces per job type stats on the number of paused jobs. This PR extends the metrics poller to also collect stats related to protected timestamps created by jobs. Namely, two new metrics, per job type, are added: * `jobs.<job_type>.protected_record_count` -- keeps track of the number of protected timestamp records held by the jobs. * `jobs.<job_type>.protected_age_sec` -- keeps track of the age of the oldest protected timestamp held by those jobs. The metrics improve observability into the protected timestamp system, and allow operators to alert when protected timestamp records are too old since that prevents garbage collection from occurring (and if GC is not performed for too long, the cluster performance would degrade). Follow on work will also make this functionality available for schedules. Epic: CRDB-21953 Fixes #78354 Release note (enterprise change): Jobs that utilize the protected timestamp system (BACKUP, CHANGEFEED, IMPORT, etc) now produce metrics that can be monitored to detect cases when a job leaves a stale protected timestamp, preventing garbage collection from occurring. 
--- pkg/BUILD.bazel | 2 + pkg/jobs/BUILD.bazel | 2 + pkg/jobs/config.go | 2 +- pkg/jobs/jobs_test.go | 65 +++++++- pkg/jobs/jobsprotectedts/jobs_protected_ts.go | 7 +- pkg/jobs/metrics.go | 25 +++ pkg/jobs/metricspoller/BUILD.bazel | 33 ++++ pkg/jobs/metricspoller/job_statistics.go | 146 ++++++++++++++++++ pkg/jobs/metricspoller/poller.go | 110 +++++++++++++ pkg/jobs/registry.go | 77 --------- pkg/sql/BUILD.bazel | 1 - pkg/sql/job_statistics.go | 50 ------ pkg/ts/catalog/chart_catalog.go | 96 ++++-------- pkg/upgrade/upgrades/BUILD.bazel | 1 + .../create_jobs_metrics_polling_job.go | 1 + 15 files changed, 413 insertions(+), 205 deletions(-) create mode 100644 pkg/jobs/metricspoller/BUILD.bazel create mode 100644 pkg/jobs/metricspoller/job_statistics.go create mode 100644 pkg/jobs/metricspoller/poller.go delete mode 100644 pkg/sql/job_statistics.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 972889f84109..c05384bd4ec3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1145,6 +1145,7 @@ GO_TARGETS = [ "//pkg/jobs/jobsprotectedts:jobsprotectedts", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs/jobstest:jobstest", + "//pkg/jobs/metricspoller:metricspoller", "//pkg/jobs:jobs", "//pkg/jobs:jobs_test", "//pkg/keys:keys", @@ -2597,6 +2598,7 @@ GET_X_DATA_TARGETS = [ "//pkg/jobs/jobspb:get_x_data", "//pkg/jobs/jobsprotectedts:get_x_data", "//pkg/jobs/jobstest:get_x_data", + "//pkg/jobs/metricspoller:get_x_data", "//pkg/keys:get_x_data", "//pkg/keysbase:get_x_data", "//pkg/keyvisualizer:get_x_data", diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 266f30446c9e..745af871c14f 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -104,12 +104,14 @@ go_test( "//pkg/base", "//pkg/clusterversion", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprotectedts", "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/keyvisualizer", "//pkg/kv", "//pkg/kv/kvpb", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", 
"//pkg/scheduledjobs", "//pkg/security/securityassets", diff --git a/pkg/jobs/config.go b/pkg/jobs/config.go index d178f0abcbec..a4a5bbb42636 100644 --- a/pkg/jobs/config.go +++ b/pkg/jobs/config.go @@ -74,7 +74,7 @@ const ( // defaultPollForMetricsInterval is the default interval to poll the jobs // table for metrics. - defaultPollForMetricsInterval = 10 * time.Second + defaultPollForMetricsInterval = 30 * time.Second ) var ( diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 0672b68f4bdc..f645c5d3eed7 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -32,9 +32,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/keyvisualizer" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -61,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/gogo/protobuf/types" @@ -3495,28 +3498,33 @@ func TestPausepoints(t *testing.T) { } } -func TestPausedMetrics(t *testing.T) { +func TestJobTypeMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) skip.UnderShort(t) ctx := context.Background() - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + // Make sure we set polling interval before we start the server. 
Otherwise, we + // might pick up the default value (30 second), which would make this test + // slow. + args := base.TestServerArgs{ Knobs: base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), }, - }) + Settings: cluster.MakeTestingClusterSettings(), + } + jobs.PollJobsMetricsInterval.Override(ctx, &args.Settings.SV, 10*time.Millisecond) + s, sqlDB, _ := serverutils.StartServer(t, args) defer s.Stopper().Stop(ctx) - jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond) runner := sqlutils.MakeSQLRunner(sqlDB) reg := s.JobRegistry().(*jobs.Registry) waitForPausedCount := func(typ jobspb.Type, numPaused int64) { testutils.SucceedsSoon(t, func() error { currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() - if reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() != numPaused { + if currentlyPaused != numPaused { return fmt.Errorf( "expected (%+v) paused jobs of type (%+v), found (%+v)", numPaused, @@ -3545,6 +3553,36 @@ func TestPausedMetrics(t *testing.T) { Username: username.TestUserName(), }, } + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + writePTSRecord := func(jobID jobspb.JobID) (uuid.UUID, error) { + id := uuid.MakeV4() + record := jobsprotectedts.MakeRecord( + id, int64(jobID), s.Clock().Now(), nil, + jobsprotectedts.Jobs, ptpb.MakeClusterTarget(), + ) + return id, + execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error { + return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(context.Background(), record) + }) + } + relesePTSRecord := func(id uuid.UUID) error { + return execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error { + return execCfg.ProtectedTimestampProvider.WithTxn(txn).Release(context.Background(), id) + }) + } + + checkPTSCounts := func(typ jobspb.Type, count int64) { + testutils.SucceedsSoon(t, func() error { + m := reg.MetricsStruct().JobMetrics[typ] + if 
m.NumJobsWithPTS.Value() == count && (count == 0 || m.ProtectedAge.Value() > 0) { + return nil + } + return errors.Newf("still waiting for PTS count to reach %d: c=%d age=%d", + count, m.NumJobsWithPTS.Value(), m.ProtectedAge.Value()) + }) + } + for typ := range typeToRecord { jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ @@ -3571,6 +3609,17 @@ func TestPausedMetrics(t *testing.T) { importJob := makeJob(context.Background(), jobspb.TypeImport) scJob := makeJob(context.Background(), jobspb.TypeSchemaChange) + // Write few PTS records + cfJobPTSID, err := writePTSRecord(cfJob.ID()) + require.NoError(t, err) + _, err = writePTSRecord(cfJob.ID()) + require.NoError(t, err) + importJobPTSID, err := writePTSRecord(importJob.ID()) + require.NoError(t, err) + + checkPTSCounts(jobspb.TypeChangefeed, 2) + checkPTSCounts(jobspb.TypeImport, 1) + // Pause all job types. runner.Exec(t, "PAUSE JOB $1", cfJob.ID()) waitForPausedCount(jobspb.TypeChangefeed, 1) @@ -3581,6 +3630,12 @@ func TestPausedMetrics(t *testing.T) { runner.Exec(t, "PAUSE JOB $1", scJob.ID()) waitForPausedCount(jobspb.TypeSchemaChange, 1) + // Release some of the pts records. + require.NoError(t, relesePTSRecord(cfJobPTSID)) + require.NoError(t, relesePTSRecord(importJobPTSID)) + checkPTSCounts(jobspb.TypeChangefeed, 1) + checkPTSCounts(jobspb.TypeImport, 0) + // Resume / cancel jobs. 
runner.Exec(t, "RESUME JOB $1", cfJob.ID()) waitForPausedCount(jobspb.TypeChangefeed, 1) diff --git a/pkg/jobs/jobsprotectedts/jobs_protected_ts.go b/pkg/jobs/jobsprotectedts/jobs_protected_ts.go index 5192b18a35f5..dd84cabe9046 100644 --- a/pkg/jobs/jobsprotectedts/jobs_protected_ts.go +++ b/pkg/jobs/jobsprotectedts/jobs_protected_ts.go @@ -55,7 +55,7 @@ func MakeStatusFunc(jr *jobs.Registry, metaType MetaType) ptreconcile.StatusFunc switch metaType { case Jobs: return func(ctx context.Context, txn isql.Txn, meta []byte) (shouldRemove bool, _ error) { - jobID, err := decodeID(meta) + jobID, err := DecodeID(meta) if err != nil { return false, err } @@ -71,7 +71,7 @@ func MakeStatusFunc(jr *jobs.Registry, metaType MetaType) ptreconcile.StatusFunc } case Schedules: return func(ctx context.Context, txn isql.Txn, meta []byte) (shouldRemove bool, _ error) { - scheduleID, err := decodeID(meta) + scheduleID, err := DecodeID(meta) if err != nil { return false, err } @@ -114,7 +114,8 @@ func encodeID(id int64) []byte { return []byte(strconv.FormatInt(id, 10)) } -func decodeID(meta []byte) (id int64, err error) { +// DecodeID decodes ID stored in the PTS record. +func DecodeID(meta []byte) (id int64, err error) { id, err = strconv.ParseInt(string(meta), 10, 64) if err != nil { return 0, errors.Wrapf(err, "failed to interpret meta %q as bytes", meta) diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 3a6aa1c0e217..dcb0e1fa2c74 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -67,6 +67,9 @@ type JobTypeMetrics struct { // TODO (sajjad): FailOrCancelFailed metric is not updated after the modification // of retrying all reverting jobs. Remove this metric in v22.1. FailOrCancelFailed *metric.Counter + + NumJobsWithPTS *metric.Gauge + ProtectedAge *metric.Gauge } // MetricStruct implements the metric.Struct interface. 
@@ -174,6 +177,26 @@ func makeMetaFailOrCancelFailed(typeStr string) metric.Metadata { } } +func makeMetaProtectedCount(typeStr string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("jobs.%s.protected_record_count", typeStr), + Help: fmt.Sprintf("Number of protected timestamp records held by %s jobs", typeStr), + Measurement: "bytes", + Unit: metric.Unit_BYTES, + MetricType: io_prometheus_client.MetricType_GAUGE, + } +} + +func makeMetaProtectedAge(typeStr string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("jobs.%s.protected_age_sec", typeStr), + Help: fmt.Sprintf("The age of the oldest PTS record protected by %s jobs", typeStr), + Measurement: "seconds", + Unit: metric.Unit_SECONDS, + MetricType: io_prometheus_client.MetricType_GAUGE, + } +} + var ( metaAdoptIterations = metric.Metadata{ Name: "jobs.adopt_iterations", @@ -244,6 +267,8 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) { FailOrCancelCompleted: metric.NewCounter(makeMetaFailOrCancelCompeted(typeStr)), FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(typeStr)), FailOrCancelFailed: metric.NewCounter(makeMetaFailOrCancelFailed(typeStr)), + NumJobsWithPTS: metric.NewGauge(makeMetaProtectedCount(typeStr)), + ProtectedAge: metric.NewGauge(makeMetaProtectedAge(typeStr)), } if opts, ok := options[jt]; ok && opts.metrics != nil { m.JobSpecificMetrics[jt] = opts.metrics diff --git a/pkg/jobs/metricspoller/BUILD.bazel b/pkg/jobs/metricspoller/BUILD.bazel new file mode 100644 index 000000000000..f8f42658ae96 --- /dev/null +++ b/pkg/jobs/metricspoller/BUILD.bazel @@ -0,0 +1,33 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "metricspoller", + srcs = [ + "job_statistics.go", + "poller.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller", + visibility = ["//visibility:public"], + deps = [ + "//pkg/jobs", + 
"//pkg/jobs/jobspb", + "//pkg/jobs/jobsprotectedts", + "//pkg/roachpb", + "//pkg/security/username", + "//pkg/settings/cluster", + "//pkg/sql", + "//pkg/sql/isql", + "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", + "@com_github_prometheus_client_model//go", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go new file mode 100644 index 000000000000..f912606a14ad --- /dev/null +++ b/pkg/jobs/metricspoller/job_statistics.go @@ -0,0 +1,146 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package metricspoller + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +const pausedJobsCountQuery = string(` + SELECT job_type, count(*) + FROM system.jobs + WHERE status = '` + jobs.StatusPaused + `' + GROUP BY job_type`) + +// updatePausedMetrics counts the number of paused jobs per job type. 
+func updatePausedMetrics(ctx context.Context, execCtx sql.JobExecContext) error { + var metricUpdates map[jobspb.Type]int + err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // In case of transaction retries, reset this map here. + metricUpdates = make(map[jobspb.Type]int) + + // Run transaction at low priority to ensure that it does not + // contend with foreground reads. + if err := txn.KV().SetUserPriority(roachpb.MinUserPriority); err != nil { + return err + } + rows, err := txn.QueryBufferedEx( + ctx, "poll-jobs-metrics-job", txn.KV(), sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + pausedJobsCountQuery, + ) + if err != nil { + return errors.Wrap(err, "could not query jobs table") + } + + for _, row := range rows { + typeString := *row[0].(*tree.DString) + count := *row[1].(*tree.DInt) + typ, err := jobspb.TypeFromString(string(typeString)) + if err != nil { + return err + } + metricUpdates[typ] = int(count) + } + + return nil + }) + if err != nil { + return err + } + + metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct() + for _, v := range jobspb.Type_value { + if metrics.JobMetrics[v] != nil { + metrics.JobMetrics[v].CurrentlyPaused.Update(int64(metricUpdates[jobspb.Type(v)])) + } + } + return nil +} + +// updatePTSStats update protected timestamp statistics per job type. 
+func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error { + type ptsStat struct { + numRecords int64 + oldest hlc.Timestamp + } + var ptsStats map[jobspb.Type]*ptsStat + + execCfg := execCtx.ExecCfg() + if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + ptsStats = make(map[jobspb.Type]*ptsStat) + ptsState, err := execCfg.ProtectedTimestampProvider.WithTxn(txn).GetState(ctx) + if err != nil { + return err + } + for _, rec := range ptsState.Records { + if rec.MetaType != jobsprotectedts.GetMetaType(jobsprotectedts.Jobs) { + continue + } + id, err := jobsprotectedts.DecodeID(rec.Meta) + if err != nil { + return err + } + j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(id), txn) + if err != nil { + continue + } + p := j.Payload() + stats := ptsStats[p.Type()] + if stats == nil { + stats = &ptsStat{} + ptsStats[p.Type()] = stats + } + stats.numRecords++ + if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) { + stats.oldest = rec.Timestamp + } + } + + return nil + }); err != nil { + return err + } + + jobMetrics := execCtx.ExecCfg().JobRegistry.MetricsStruct() + for typ := 0; typ < jobspb.NumJobTypes; typ++ { + if jobspb.Type(typ) == jobspb.TypeUnspecified { // do not track TypeUnspecified + continue + } + m := jobMetrics.JobMetrics[typ] + stats, found := ptsStats[jobspb.Type(typ)] + if found { + m.NumJobsWithPTS.Update(stats.numRecords) + if stats.oldest.WallTime > 0 { + m.ProtectedAge.Update((execCfg.Clock.Now().WallTime - stats.oldest.WallTime) / 1e9) + } else { + m.ProtectedAge.Update(0) + } + } else { + // If we haven't found PTS records for a job type, then reset stats. 
+ m.NumJobsWithPTS.Update(0) + m.ProtectedAge.Update(0) + } + } + + return nil +} diff --git a/pkg/jobs/metricspoller/poller.go b/pkg/jobs/metricspoller/poller.go new file mode 100644 index 000000000000..186687f58607 --- /dev/null +++ b/pkg/jobs/metricspoller/poller.go @@ -0,0 +1,110 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package metricspoller + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/logtags" + io_prometheus_client "github.com/prometheus/client_model/go" +) + +// metricsPoller is a singleton job whose purpose is to poll various metrics +// periodically. These metrics are meant to be cluster wide metrics -- for +// example, number of jobs currently paused in the cluster. While such metrics +// could be implemented locally by each node, doing so would result in the +// metric being inflated by the number of nodes. That's not ideal, and that's +// what the purpose of this job is: namely, to provide a convenient way to query +// various aspects of cluster state, and make that state available via correctly +// counted metrics. + +type metricsPoller struct { + job *jobs.Job +} + +var _ jobs.Resumer = &metricsPoller{} + +// OnFailOrCancel is a part of the Resumer interface. 
+func (mp *metricsPoller) OnFailOrCancel( + ctx context.Context, execCtx interface{}, jobErr error, +) error { + return nil +} + +// Resume is part of the Resumer interface. +func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error { + // The metrics polling job is a forever running background job. It's always + // safe to wind the SQL pod down whenever it's running, something we + // indicate through the job's idle status. + mp.job.MarkIdle(true) + + exec := execCtx.(sql.JobExecContext) + metrics := exec.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypePollJobsStats].(pollerMetrics) + + t := timeutil.NewTimer() + defer t.Stop() + + runTask := func(name string, task func(ctx context.Context, execCtx sql.JobExecContext) error) error { + ctx = logtags.AddTag(ctx, "task", name) + return task(ctx, exec) + } + + for { + t.Reset(jobs.PollJobsMetricsInterval.Get(&exec.ExecCfg().Settings.SV)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + t.Read = true + if err := runTask("paused-jobs", updatePausedMetrics); err != nil { + log.Errorf(ctx, "Periodic stats collector task paused-jobs completed with error %s", err) + metrics.numErrors.Inc(1) + } + if err := runTask("pts-stats", updatePTSStats); err != nil { + log.Errorf(ctx, "Periodic stats collector task pts-stats completed with error %s", err) + metrics.numErrors.Inc(1) + } + } + } +} + +type pollerMetrics struct { + numErrors *metric.Counter +} + +func (m pollerMetrics) MetricStruct() {} + +func newPollerMetrics() metric.Struct { + return pollerMetrics{ + numErrors: metric.NewCounter(metric.Metadata{ + Name: "jobs.metrics.task_failed", + Help: "Number of metrics poller tasks that failed", + Measurement: "errors", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_COUNTER, + }), + } +} + +func init() { + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + return &metricsPoller{job: job} + } + 
jobs.RegisterConstructor(jobspb.TypePollJobsStats, createResumerFn, + jobs.DisablesTenantCostControl, jobs.WithJobMetrics(newPollerMetrics())) +} diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 52eea1881eed..3062077873ac 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1148,83 +1148,6 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }) } -// PollMetricsTask polls the jobs table for certain metrics at an interval. -func (r *Registry) PollMetricsTask(ctx context.Context) error { - var err error - t := timeutil.NewTimer() - defer t.Stop() - updateMetrics := func(ctx context.Context, s sqlliveness.Session) { - for { - t.Reset(PollJobsMetricsInterval.Get(&r.settings.SV)) - select { - case <-ctx.Done(): - err = ctx.Err() - return - case <-t.C: - t.Read = true - if err = r.updatePausedMetrics(ctx, s); err != nil { - log.Errorf(ctx, "failed to update paused metrics: %v", err) - return - } - } - } - } - r.withSession(ctx, updateMetrics) - return err -} - -const pausedJobsCountQuery = string(` - SELECT job_type, count(*) - FROM system.jobs - WHERE status = '` + StatusPaused + `' - GROUP BY job_type`) - -func (r *Registry) updatePausedMetrics(ctx context.Context, s sqlliveness.Session) error { - var metricUpdates map[jobspb.Type]int - err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - // In case of transaction retries, reset this map here. - metricUpdates = make(map[jobspb.Type]int) - - // Run the claim transaction at low priority to ensure that it does not - // contend with foreground reads. 
- if err := txn.KV().SetUserPriority(roachpb.MinUserPriority); err != nil { - return err - } - rows, err := txn.QueryBufferedEx( - ctx, "poll-jobs-metrics-job", txn.KV(), sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, - pausedJobsCountQuery, - ) - if err != nil { - return errors.Wrap(err, "could not query jobs table") - } - - for _, row := range rows { - typeString := *row[0].(*tree.DString) - count := *row[1].(*tree.DInt) - typ, err := jobspb.TypeFromString(string(typeString)) - if err != nil { - return err - } - metricUpdates[typ] = int(count) - } - - return nil - }) - if err == nil { - for _, v := range jobspb.Type_value { - if r.metrics.JobMetrics[v] != nil { - if _, ok := metricUpdates[jobspb.Type(v)]; ok { - r.metrics.JobMetrics[v].CurrentlyPaused.Update(int64(metricUpdates[jobspb.Type(v)])) - } else { - r.metrics.JobMetrics[v].CurrentlyPaused.Update(0) - } - } - } - } - - return err -} - func (r *Registry) maybeCancelJobs(ctx context.Context, s sqlliveness.Session) { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 489a4dfb6c95..e09527891b7a 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -138,7 +138,6 @@ go_library( "inverted_join.go", "job_exec_context.go", "job_exec_context_test_util.go", - "job_statistics.go", "jobs_collection.go", "join.go", "join_predicate.go", diff --git a/pkg/sql/job_statistics.go b/pkg/sql/job_statistics.go deleted file mode 100644 index 436e92c4a9bd..000000000000 --- a/pkg/sql/job_statistics.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2023 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package sql - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" -) - -type metricsPoller struct { - job *jobs.Job -} - -var _ jobs.Resumer = &metricsPoller{} - -// OnFailOrCancel is a part of the Resumer interface. -func (mp *metricsPoller) OnFailOrCancel( - ctx context.Context, execCtx interface{}, jobErr error, -) error { - return nil -} - -// Resume is part of the Resumer interface. -func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error { - // The metrics polling job is a forever running background job. It's always - // safe to wind the SQL pod down whenever it's running, something we - // indicate through the job's idle status. - mp.job.MarkIdle(true) - - exec := execCtx.(JobExecContext) - return exec.ExecCfg().JobRegistry.PollMetricsTask(ctx) -} - -func init() { - createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { - return &metricsPoller{job: job} - } - jobs.RegisterConstructor(jobspb.TypePollJobsStats, createResumerFn, jobs.UsesTenantCostControl) -} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 67dfa14e08d9..ceab4da6dcfe 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -10,6 +10,13 @@ package catalog +import ( + "fmt" + "strings" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" +) + // chart_catalog.go represents a catalog of pre-defined DB Console charts // to aid users in debugging CockroachDB clusters. 
This file represents // a simplified structure of the catalog, meant to make it easier for @@ -3383,74 +3390,11 @@ var charts = []sectionDescription{ "jobs.running_non_idle", }, }, - { - Title: "Currently Running", - Metrics: []string{ - "jobs.auto_create_stats.currently_running", - "jobs.backup.currently_running", - "jobs.changefeed.currently_running", - "jobs.create_stats.currently_running", - "jobs.import.currently_running", - "jobs.restore.currently_running", - "jobs.schema_change.currently_running", - "jobs.new_schema_change.currently_running", - "jobs.schema_change_gc.currently_running", - "jobs.typedesc_schema_change.currently_running", - "jobs.stream_ingestion.currently_running", - "jobs.migration.currently_running", - "jobs.auto_span_config_reconciliation.currently_running", - "jobs.auto_sql_stats_compaction.currently_running", - "jobs.stream_replication.currently_running", - "jobs.key_visualizer.currently_running", - "jobs.poll_jobs_stats.currently_running", - }, - }, - { - Title: "Currently Idle", - Metrics: []string{ - "jobs.auto_create_stats.currently_idle", - "jobs.auto_span_config_reconciliation.currently_idle", - "jobs.auto_sql_stats_compaction.currently_idle", - "jobs.backup.currently_idle", - "jobs.changefeed.currently_idle", - "jobs.create_stats.currently_idle", - "jobs.import.currently_idle", - "jobs.migration.currently_idle", - "jobs.new_schema_change.currently_idle", - "jobs.restore.currently_idle", - "jobs.schema_change.currently_idle", - "jobs.schema_change_gc.currently_idle", - "jobs.stream_ingestion.currently_idle", - "jobs.stream_replication.currently_idle", - "jobs.typedesc_schema_change.currently_idle", - "jobs.key_visualizer.currently_idle", - "jobs.poll_jobs_stats.currently_idle", - }, - }, - { - Title: "Currently Paused", - Metrics: []string{ - "jobs.auto_create_stats.currently_paused", - "jobs.auto_span_config_reconciliation.currently_paused", - "jobs.auto_sql_stats_compaction.currently_paused", - "jobs.backup.currently_paused", - 
"jobs.changefeed.currently_paused", - "jobs.create_stats.currently_paused", - "jobs.import.currently_paused", - "jobs.migration.currently_paused", - "jobs.new_schema_change.currently_paused", - "jobs.restore.currently_paused", - "jobs.schema_change.currently_paused", - "jobs.schema_change_gc.currently_paused", - "jobs.stream_ingestion.currently_paused", - "jobs.stream_replication.currently_paused", - "jobs.typedesc_schema_change.currently_paused", - "jobs.auto_schema_telemetry.currently_paused", - "jobs.row_level_ttl.currently_paused", - "jobs.poll_jobs_stats.currently_paused", - "jobs.key_visualizer.currently_paused", - }, - }, + jobTypeCharts("Currently Running", "currently_running"), + jobTypeCharts("Currently Idle", "currently_idle"), + jobTypeCharts("Currently Paused", "currently_paused"), + jobTypeCharts("PTS Age", "protected_age_sec"), + jobTypeCharts("PTS Record Count", "protected_record_count"), { Title: "Auto Create Stats", Metrics: []string{ @@ -3941,3 +3885,19 @@ var charts = []sectionDescription{ }, }, } + +func jobTypeCharts(title string, varName string) chartDescription { + var metrics []string + for i := 0; i < jobspb.NumJobTypes; i++ { + jt := jobspb.Type(i) + if jt == jobspb.TypeUnspecified { + continue + } + metrics = append(metrics, + fmt.Sprintf("jobs.%s.%s", strings.ToLower(jobspb.Type_name[int32(i)]), varName)) + } + return chartDescription{ + Title: title, + Metrics: metrics, + } +} diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index f39274e13dc2..442805783691 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/metricspoller", "//pkg/keys", "//pkg/keyvisualizer/keyvisjob", "//pkg/kv", diff --git a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go index 332b795fd602..83d5cc2ecca8 100644 --- 
a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go +++ b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + _ "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller" // Ensure job implementation is linked. "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"