From 886dc6447369ec49991a59416780517971600f94 Mon Sep 17 00:00:00 2001 From: Chris Seto Date: Mon, 14 Aug 2023 17:25:42 +0000 Subject: [PATCH] schematelemetry: emit metrics and logs about invalid objects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Short of continuously polling `crdb_internal.invalid_objects`, there was not a convenient way to monitor a cluster for descriptor corruption. Having such an indicator would allow customers to perform preflight checks ahead of upgrades to avoid being stuck in a mixed version state. It would also allow CRL to more easily monitor cloud clusters for corruptions in the wild. This commit updates the schematelemetry job to additionally update the `sql.schema.invalid_objects` gauge and emit logs for any encountered corruptions. Informs: #104266 Epic: CRDB-28665 Release note (ops change): Added a new sql.schema.invalid_objects gauge metric. This gauge is periodically updated based on the schedule set by the sql.schema.telemetry.recurrence cluster setting. When it is updated, it counts the number of schema objects (tables, types, schemas, databases, and functions) that are in an invalid state according to CockroachDB’s internal validation checks. This metric is expected to be zero in a healthy cluster, and if it is not, it indicates that there is a problem that must be repaired. 
--- pkg/sql/catalog/schematelemetry/BUILD.bazel | 3 + .../schematelemetry/scheduled_job_executor.go | 10 +- .../schematelemetry/schema_telemetry_job.go | 91 ++++++++++++++++++- .../schematelemetry/schema_telemetry_test.go | 77 ++++++++++++++++ pkg/ts/catalog/chart_catalog.go | 4 + 5 files changed, 179 insertions(+), 6 deletions(-) diff --git a/pkg/sql/catalog/schematelemetry/BUILD.bazel b/pkg/sql/catalog/schematelemetry/BUILD.bazel index 09275272a093..c40b5af72002 100644 --- a/pkg/sql/catalog/schematelemetry/BUILD.bazel +++ b/pkg/sql/catalog/schematelemetry/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/sql/sem/builtins", "//pkg/sql/sem/builtins/builtinconstants", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sqltelemetry", "//pkg/util/hlc", "//pkg/util/log", @@ -46,8 +47,10 @@ go_test( ], args = ["-test.timeout=295s"], deps = [ + ":schematelemetry", "//pkg/base", "//pkg/jobs", + "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", "//pkg/scheduledjobs", "//pkg/security/securityassets", diff --git a/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go b/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go index 4b937c646950..209048ffb863 100644 --- a/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go +++ b/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go @@ -27,20 +27,20 @@ import ( ) type schemaTelemetryExecutor struct { - metrics schemaTelemetryMetrics + metrics schemaTelemetryJobMetrics } var _ jobs.ScheduledJobController = (*schemaTelemetryExecutor)(nil) var _ jobs.ScheduledJobExecutor = (*schemaTelemetryExecutor)(nil) -type schemaTelemetryMetrics struct { +type schemaTelemetryJobMetrics struct { *jobs.ExecutorMetrics } -var _ metric.Struct = &schemaTelemetryMetrics{} +var _ metric.Struct = &schemaTelemetryJobMetrics{} // MetricStruct is part of the metric.Struct interface. 
-func (m *schemaTelemetryMetrics) MetricStruct() {} +func (m *schemaTelemetryJobMetrics) MetricStruct() {} // OnDrop is part of the jobs.ScheduledJobController interface. func (s schemaTelemetryExecutor) OnDrop( @@ -120,7 +120,7 @@ func init() { func() (jobs.ScheduledJobExecutor, error) { m := jobs.MakeExecutorMetrics(tree.ScheduledSchemaTelemetryExecutor.InternalName()) return &schemaTelemetryExecutor{ - metrics: schemaTelemetryMetrics{ + metrics: schemaTelemetryJobMetrics{ ExecutorMetrics: &m, }, }, nil diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go index 4b59bad1bb10..33584a275174 100644 --- a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go @@ -18,12 +18,38 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) +type Metrics struct { + InvalidObjects *metric.Gauge +} + +func newMetrics() Metrics { + return Metrics{ + InvalidObjects: metric.NewGauge(metric.Metadata{ + Name: "sql.schema.invalid_objects", + Help: "Gauge of detected invalid objects within the system.descriptor table (measured by querying crdb_internal.invalid_objects)", + Measurement: "Objects", + Unit: 
metric.Unit_COUNT, + }), + } +} + +// MetricStruct implements the metric.Struct interface. +func (Metrics) MetricStruct() {} + type schemaTelemetryResumer struct { job *jobs.Job st *cluster.Settings @@ -53,8 +79,15 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{}) aostDuration = d } } - asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0) + const maxRecords = 10000 + asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0) + metrics := p.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(Metrics) + + if err := processInvalidObjects(ctx, p.ExecCfg(), asOf, &metrics, maxRecords); err != nil { + return err + } + events, err := CollectClusterSchemaForTelemetry(ctx, p.ExecCfg(), asOf, uuid.FastMakeV4(), maxRecords) if err != nil || len(events) == 0 { return err @@ -66,9 +99,64 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{}) sql.LogExternally, events..., ) + return nil } +func processInvalidObjects( + ctx context.Context, + cfg *sql.ExecutorConfig, + asOf hlc.Timestamp, + metrics *Metrics, + maxRecords int, +) error { + return sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) (retErr error) { + err := txn.KV().SetFixedTimestamp(ctx, asOf) + if err != nil { + return err + } + + rows, err := txn.QueryIteratorEx(ctx, "sql-telemetry-invalid-objects", txn.KV(), sessiondata.NodeUserSessionDataOverride, `SELECT id, error FROM "".crdb_internal.invalid_objects LIMIT $1`, maxRecords) + if err != nil { + return err + } + + defer func(it isql.Rows) { + retErr = errors.CombineErrors(retErr, it.Close()) + }(rows) + + count := int64(0) + for { + ok, err := rows.Next(ctx) + if err != nil { + return err + } + if !ok { + break + } + + count++ + row := rows.Cur() + + descID, ok := row[0].(*tree.DInt) + if !ok { + return errors.AssertionFailedf("expected id to be int (was %T)", row[0]) + } + + validationErr, ok := 
row[1].(*tree.DString) + if !ok { + return errors.AssertionFailedf("expected err to be string (was %T)", row[1]) + } + + log.Warningf(ctx, "found invalid object with ID %d: %q", descID, validationErr) + } + + metrics.InvalidObjects.Update(count) + + return nil + }) +} + // OnFailOrCancel is part of the jobs.Resumer interface. func (t schemaTelemetryResumer) OnFailOrCancel( ctx context.Context, execCtx interface{}, _ error, @@ -86,5 +174,6 @@ func init() { } }, jobs.DisablesTenantCostControl, + jobs.WithJobMetrics(newMetrics()), ) } diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go index 46d1b271c904..9e76afee3a27 100644 --- a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go @@ -13,14 +13,18 @@ package schematelemetry_test import ( "context" "fmt" + "math" + "regexp" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -106,7 +110,80 @@ func TestSchemaTelemetryJob(t *testing.T) { id := res[0][0] tdb.ExecSucceedsSoon(t, fmt.Sprintf("PAUSE SCHEDULE %s", id)) tdb.CheckQueryResults(t, qHasJob, [][]string{{"0"}}) + + // NB: The following block is copied directly from + // pkg/sql/crdb_internal_test.go. It may be worthwhile to add utilities for + // generating descriptor corruption in the future rather than copying the same codeblock around. 
+ + // Create some tables that we can corrupt the descriptors of. + tdb.Exec(t, ` +CREATE DATABASE t; +CREATE TABLE t.test (k INT8); +CREATE TABLE fktbl (id INT8 PRIMARY KEY); +CREATE TABLE tbl ( + customer INT8 NOT NULL REFERENCES fktbl (id) +); +CREATE TABLE nojob (k INT8); + `) + + // Retrieve their IDs. + databaseID := int(sqlutils.QueryDatabaseID(t, db, "t")) + tableTID := int(sqlutils.QueryTableID(t, db, "t", "public", "test")) + tableFkTblID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "fktbl")) + tableNoJobID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "nojob")) + const fakeID = 12345 + + // Now introduce some inconsistencies. + tdb.Exec(t, fmt.Sprintf(` +INSERT INTO system.users VALUES ('node', NULL, true, 3); +GRANT node TO root; +DELETE FROM system.descriptor WHERE id = %d; +DELETE FROM system.descriptor WHERE id = %d; +SELECT + crdb_internal.unsafe_upsert_descriptor( + id, + crdb_internal.json_to_pb( + 'cockroach.sql.sqlbase.Descriptor', + json_set( + json_set( + crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor, false), + ARRAY['table', 'mutationJobs'], + jsonb_build_array(jsonb_build_object('job_id', 123456, 'mutation_id', 1)) + ), + ARRAY['table', 'mutations'], + jsonb_build_array(jsonb_build_object('mutation_id', 1)) + ) + ), + true + ) +FROM + system.descriptor +WHERE + id = %d; +UPDATE system.namespace SET id = %d WHERE id = %d; + `, databaseID, tableFkTblID, tableNoJobID, fakeID, tableTID)) + + // Grab a handle to the job's metrics struct. + metrics := s.JobRegistry().(*jobs.Registry).MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(schematelemetry.Metrics) + // Run a schema telemetry job and wait for it to succeed. tdb.Exec(t, qJob) tdb.CheckQueryResultsRetry(t, qHasJob, [][]string{{"1"}}) + + // Assert that the InvalidObjects gauge is set to the number of expected + // invalid objects. Our above query should have generated 9 corruptions. 
See + // the pkg/sql/crdb_internal_test.go:TestInvalidObjects for the breakdown of + // what exactly was done. + require.Equal(t, int64(9), metrics.InvalidObjects.Value()) + + // Ensure that our logs are flushed to disk before asserting about log + // entries. + log.Flush() + + // Ensure that a log line is emitted for each invalid object, with a loose + // enforcement of the log structure. + errorRE := regexp.MustCompile(`found invalid object with ID \d+: ".+"`) + entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1000, errorRE, log.SelectEditMode(false, false)) + require.NoError(t, err) + require.Len(t, entries, 9) } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index b076c442a9c7..02e5e571903f 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -2291,6 +2291,10 @@ var charts = []sectionDescription{ }, AxisLabel: "Schema Change Executions", }, + { + Title: "Invalid Objects", + Metrics: []string{"sql.schema.invalid_objects"}, + }, }, }, {