diff --git a/pkg/sql/catalog/schematelemetry/BUILD.bazel b/pkg/sql/catalog/schematelemetry/BUILD.bazel index 09275272a093..c40b5af72002 100644 --- a/pkg/sql/catalog/schematelemetry/BUILD.bazel +++ b/pkg/sql/catalog/schematelemetry/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/sql/sem/builtins", "//pkg/sql/sem/builtins/builtinconstants", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sqltelemetry", "//pkg/util/hlc", "//pkg/util/log", @@ -46,8 +47,10 @@ go_test( ], args = ["-test.timeout=295s"], deps = [ + ":schematelemetry", "//pkg/base", "//pkg/jobs", + "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", "//pkg/scheduledjobs", "//pkg/security/securityassets", diff --git a/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go b/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go index 4b937c646950..209048ffb863 100644 --- a/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go +++ b/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go @@ -27,20 +27,20 @@ import ( ) type schemaTelemetryExecutor struct { - metrics schemaTelemetryMetrics + metrics schemaTelemetryJobMetrics } var _ jobs.ScheduledJobController = (*schemaTelemetryExecutor)(nil) var _ jobs.ScheduledJobExecutor = (*schemaTelemetryExecutor)(nil) -type schemaTelemetryMetrics struct { +type schemaTelemetryJobMetrics struct { *jobs.ExecutorMetrics } -var _ metric.Struct = &schemaTelemetryMetrics{} +var _ metric.Struct = &schemaTelemetryJobMetrics{} // MetricStruct is part of the metric.Struct interface. -func (m *schemaTelemetryMetrics) MetricStruct() {} +func (m *schemaTelemetryJobMetrics) MetricStruct() {} // OnDrop is part of the jobs.ScheduledJobController interface. func (s schemaTelemetryExecutor) OnDrop( @@ -120,7 +120,7 @@ func init() { func() (jobs.ScheduledJobExecutor, error) { m := jobs.MakeExecutorMetrics(tree.ScheduledSchemaTelemetryExecutor.InternalName()) return &schemaTelemetryExecutor{ - metrics: schemaTelemetryMetrics{ + metrics: schemaTelemetryJobMetrics{ ExecutorMetrics: &m, }, }, nil diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go index 4b59bad1bb10..33584a275174 100644 --- a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go @@ -18,12 +18,38 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) +type Metrics struct { + InvalidObjects *metric.Gauge +} + +func newMetrics() Metrics { + return Metrics{ + InvalidObjects: metric.NewGauge(metric.Metadata{ + Name: "sql.schema.invalid_objects", + Help: "Gauge of detected invalid objects within the system.descriptor table (measured by querying crdb_internal.invalid_objects)", + Measurement: "Objects", + Unit: metric.Unit_COUNT, + }), + } +} + +// MetricStruct implements the metric.Struct interface. +func (Metrics) MetricStruct() {} + type schemaTelemetryResumer struct { job *jobs.Job st *cluster.Settings @@ -53,8 +79,15 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{}) aostDuration = d } } - asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0) + const maxRecords = 10000 + asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0) + metrics := p.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(Metrics) + + if err := processInvalidObjects(ctx, p.ExecCfg(), asOf, &metrics, maxRecords); err != nil { + return err + } + events, err := CollectClusterSchemaForTelemetry(ctx, p.ExecCfg(), asOf, uuid.FastMakeV4(), maxRecords) if err != nil || len(events) == 0 { return err @@ -66,9 +99,64 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{}) sql.LogExternally, events..., ) + return nil } +func processInvalidObjects( + ctx context.Context, + cfg *sql.ExecutorConfig, + asOf hlc.Timestamp, + metrics *Metrics, + maxRecords int, +) error { + return sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) (retErr error) { + err := txn.KV().SetFixedTimestamp(ctx, asOf) + if err != nil { + return err + } + + rows, err := txn.QueryIteratorEx(ctx, "sql-telemetry-invalid-objects", txn.KV(), sessiondata.NodeUserSessionDataOverride, `SELECT id, error FROM "".crdb_internal.invalid_objects LIMIT $1`, maxRecords) + if err != nil { + return err + } + + defer func(it isql.Rows) { + retErr = errors.CombineErrors(retErr, it.Close()) + }(rows) + + count := int64(0) + for { + ok, err := rows.Next(ctx) + if err != nil { + return err + } + if !ok { + break + } + + count++ + row := rows.Cur() + + descID, ok := row[0].(*tree.DInt) + if !ok { + return errors.AssertionFailedf("expected id to be int (was %T)", row[0]) + } + + validationErr, ok := row[1].(*tree.DString) + if !ok { + return errors.AssertionFailedf("expected err to be string (was %T)", row[1]) + } + + log.Warningf(ctx, "found invalid object with ID %d: %q", descID, validationErr) + } + + metrics.InvalidObjects.Update(count) + + return nil + }) +} + // OnFailOrCancel is part of the jobs.Resumer interface. func (t schemaTelemetryResumer) OnFailOrCancel( ctx context.Context, execCtx interface{}, _ error, @@ -86,5 +174,6 @@ func init() { } }, jobs.DisablesTenantCostControl, + jobs.WithJobMetrics(newMetrics()), ) } diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go index 46d1b271c904..9e76afee3a27 100644 --- a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go @@ -13,14 +13,18 @@ package schematelemetry_test import ( "context" "fmt" + "math" + "regexp" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -106,7 +110,80 @@ func TestSchemaTelemetryJob(t *testing.T) { id := res[0][0] tdb.ExecSucceedsSoon(t, fmt.Sprintf("PAUSE SCHEDULE %s", id)) tdb.CheckQueryResults(t, qHasJob, [][]string{{"0"}}) + + // NB: The following block is copied directly from + // pkg/sql/crdb_internal_test.go. It may be worthwhile to add utilities for + // generating descriptor corruption in the future rather than copying the same codeblock around. + + // Create some tables that we can corrupt the descriptors of. + tdb.Exec(t, ` +CREATE DATABASE t; +CREATE TABLE t.test (k INT8); +CREATE TABLE fktbl (id INT8 PRIMARY KEY); +CREATE TABLE tbl ( + customer INT8 NOT NULL REFERENCES fktbl (id) +); +CREATE TABLE nojob (k INT8); + `) + + // Retrieve their IDs. + databaseID := int(sqlutils.QueryDatabaseID(t, db, "t")) + tableTID := int(sqlutils.QueryTableID(t, db, "t", "public", "test")) + tableFkTblID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "fktbl")) + tableNoJobID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "nojob")) + const fakeID = 12345 + + // Now introduce some inconsistencies. + tdb.Exec(t, fmt.Sprintf(` +INSERT INTO system.users VALUES ('node', NULL, true, 3); +GRANT node TO root; +DELETE FROM system.descriptor WHERE id = %d; +DELETE FROM system.descriptor WHERE id = %d; +SELECT + crdb_internal.unsafe_upsert_descriptor( + id, + crdb_internal.json_to_pb( + 'cockroach.sql.sqlbase.Descriptor', + json_set( + json_set( + crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor, false), + ARRAY['table', 'mutationJobs'], + jsonb_build_array(jsonb_build_object('job_id', 123456, 'mutation_id', 1)) + ), + ARRAY['table', 'mutations'], + jsonb_build_array(jsonb_build_object('mutation_id', 1)) + ) + ), + true + ) +FROM + system.descriptor +WHERE + id = %d; +UPDATE system.namespace SET id = %d WHERE id = %d; + `, databaseID, tableFkTblID, tableNoJobID, fakeID, tableTID)) + + // Grab a handle to the job's metrics struct. + metrics := s.JobRegistry().(*jobs.Registry).MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(schematelemetry.Metrics) + // Run a schema telemetry job and wait for it to succeed. tdb.Exec(t, qJob) tdb.CheckQueryResultsRetry(t, qHasJob, [][]string{{"1"}}) + + // Assert that the InvalidObjects gauge is set to the number of expected + // invalid object. Our above query should have generated 9 corruptions. See + // the pkg/sql/crdb_internal_test.go:TestInvalidObjects for the breakdown of + // what exactly was done. + require.Equal(t, int64(9), metrics.InvalidObjects.Value()) + + // Ensure that our logs are flushed to disk before asserting about log + // entries. + log.Flush() + + // Ensure that a log line is emitted for each invalid object, with a loose + // enforcement of the log structure. + errorRE := regexp.MustCompile(`found invalid object with ID \d+: ".+"`) + entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1000, errorRE, log.SelectEditMode(false, false)) + require.NoError(t, err) + require.Len(t, entries, 9) } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index b076c442a9c7..02e5e571903f 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -2291,6 +2291,10 @@ var charts = []sectionDescription{ }, AxisLabel: "Schema Change Executions", }, + { + Title: "Invalid Objects", + Metrics: []string{"sql.schema.invalid_objects"}, + }, }, }, {