Skip to content

Commit

Permalink
schematelemetry: emit metrics and logs about invalid objects
Browse files Browse the repository at this point in the history
Short of continuously polling `crdb_internal.invalid_objects`, there was
not a convenient way to monitor a cluster for descriptor corruption.

Having such an indicator would allow customers to perform preflight
checks ahead of upgrades to avoid being stuck in a mixed version state.
It would also allow CRL to more easily monitor cloud clusters for
corruptions in the wild.

This commit updates the schematelemetry job to additionally update the
`sql.schema.invalid_objects` gauge and emit logs for any encountered
corruptions.

Informs: #104266
Epic: CRDB-28665

Release note (ops change): Added a new sql.schema.invalid_objects gauge
metric. This gauge is periodically updated based on the schedule set by
the sql.schema.telemetry.recurrence cluster setting. When it is updated,
it counts the number of schema objects (tables, types, schemas, databases,
and functions) that are in an invalid state according to CockroachDB’s
internal validation checks. This metric is expected to be zero in a healthy
cluster, and if it is not, it indicates that there is a problem that must
be repaired.
  • Loading branch information
chrisseto authored and rafiss committed Aug 30, 2023
1 parent a104323 commit a49fe07
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/catalog/schematelemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/builtins/builtinconstants",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltelemetry",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -46,8 +47,10 @@ go_test(
],
args = ["-test.timeout=295s"],
deps = [
":schematelemetry",
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/scheduledjobs",
"//pkg/security/securityassets",
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/catalog/schematelemetry/scheduled_job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import (
)

type schemaTelemetryExecutor struct {
metrics schemaTelemetryMetrics
metrics schemaTelemetryJobMetrics
}

var _ jobs.ScheduledJobController = (*schemaTelemetryExecutor)(nil)
var _ jobs.ScheduledJobExecutor = (*schemaTelemetryExecutor)(nil)

type schemaTelemetryMetrics struct {
type schemaTelemetryJobMetrics struct {
*jobs.ExecutorMetrics
}

var _ metric.Struct = &schemaTelemetryMetrics{}
var _ metric.Struct = &schemaTelemetryJobMetrics{}

// MetricStruct is part of the metric.Struct interface.
func (m *schemaTelemetryMetrics) MetricStruct() {}
func (m *schemaTelemetryJobMetrics) MetricStruct() {}

// OnDrop is part of the jobs.ScheduledJobController interface.
func (s schemaTelemetryExecutor) OnDrop(
Expand Down Expand Up @@ -120,7 +120,7 @@ func init() {
func() (jobs.ScheduledJobExecutor, error) {
m := jobs.MakeExecutorMetrics(tree.ScheduledSchemaTelemetryExecutor.InternalName())
return &schemaTelemetryExecutor{
metrics: schemaTelemetryMetrics{
metrics: schemaTelemetryJobMetrics{
ExecutorMetrics: &m,
},
}, nil
Expand Down
91 changes: 90 additions & 1 deletion pkg/sql/catalog/schematelemetry/schema_telemetry_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,38 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

type Metrics struct {
InvalidObjects *metric.Gauge
}

func newMetrics() Metrics {
return Metrics{
InvalidObjects: metric.NewGauge(metric.Metadata{
Name: "sql.schema.invalid_objects",
Help: "Gauge of detected invalid objects within the system.descriptor table (measured by querying crdb_internal.invalid_objects)",
Measurement: "Objects",
Unit: metric.Unit_COUNT,
}),
}
}

// MetricStruct implements the metric.Struct interface.
func (Metrics) MetricStruct() {}

type schemaTelemetryResumer struct {
job *jobs.Job
st *cluster.Settings
Expand Down Expand Up @@ -53,8 +79,15 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{})
aostDuration = d
}
}
asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0)

const maxRecords = 10000
asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0)
metrics := p.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(Metrics)

if err := processInvalidObjects(ctx, p.ExecCfg(), asOf, &metrics, maxRecords); err != nil {
return err
}

events, err := CollectClusterSchemaForTelemetry(ctx, p.ExecCfg(), asOf, uuid.FastMakeV4(), maxRecords)
if err != nil || len(events) == 0 {
return err
Expand All @@ -66,9 +99,64 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{})
sql.LogExternally,
events...,
)

return nil
}

func processInvalidObjects(
ctx context.Context,
cfg *sql.ExecutorConfig,
asOf hlc.Timestamp,
metrics *Metrics,
maxRecords int,
) error {
return sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) (retErr error) {
err := txn.KV().SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}

rows, err := txn.QueryIteratorEx(ctx, "sql-telemetry-invalid-objects", txn.KV(), sessiondata.NodeUserSessionDataOverride, `SELECT id, error FROM "".crdb_internal.invalid_objects LIMIT $1`, maxRecords)
if err != nil {
return err
}

defer func(it isql.Rows) {
retErr = errors.CombineErrors(retErr, it.Close())
}(rows)

count := int64(0)
for {
ok, err := rows.Next(ctx)
if err != nil {
return err
}
if !ok {
break
}

count++
row := rows.Cur()

descID, ok := row[0].(*tree.DInt)
if !ok {
return errors.AssertionFailedf("expected id to be int (was %T)", row[0])
}

validationErr, ok := row[1].(*tree.DString)
if !ok {
return errors.AssertionFailedf("expected err to be string (was %T)", row[1])
}

log.Warningf(ctx, "found invalid object with ID %d: %q", descID, validationErr)
}

metrics.InvalidObjects.Update(count)

return nil
})
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (t schemaTelemetryResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, _ error,
Expand All @@ -86,5 +174,6 @@ func init() {
}
},
jobs.DisablesTenantCostControl,
jobs.WithJobMetrics(newMetrics()),
)
}
77 changes: 77 additions & 0 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ package schematelemetry_test
import (
"context"
"fmt"
"math"
"regexp"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -106,7 +110,80 @@ func TestSchemaTelemetryJob(t *testing.T) {
id := res[0][0]
tdb.ExecSucceedsSoon(t, fmt.Sprintf("PAUSE SCHEDULE %s", id))
tdb.CheckQueryResults(t, qHasJob, [][]string{{"0"}})

// NB: The following block is copied directly from
// pkg/sql/crdb_internal_test.go. It may be worthwhile to add utilities for
// generating descriptor corruption in the future rather than copying the same codeblock around.

// Create some tables that we can corrupt the descriptors of.
tdb.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test (k INT8);
CREATE TABLE fktbl (id INT8 PRIMARY KEY);
CREATE TABLE tbl (
customer INT8 NOT NULL REFERENCES fktbl (id)
);
CREATE TABLE nojob (k INT8);
`)

// Retrieve their IDs.
databaseID := int(sqlutils.QueryDatabaseID(t, db, "t"))
tableTID := int(sqlutils.QueryTableID(t, db, "t", "public", "test"))
tableFkTblID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "fktbl"))
tableNoJobID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "nojob"))
const fakeID = 12345

// Now introduce some inconsistencies.
tdb.Exec(t, fmt.Sprintf(`
INSERT INTO system.users VALUES ('node', NULL, true, 3);
GRANT node TO root;
DELETE FROM system.descriptor WHERE id = %d;
DELETE FROM system.descriptor WHERE id = %d;
SELECT
crdb_internal.unsafe_upsert_descriptor(
id,
crdb_internal.json_to_pb(
'cockroach.sql.sqlbase.Descriptor',
json_set(
json_set(
crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor, false),
ARRAY['table', 'mutationJobs'],
jsonb_build_array(jsonb_build_object('job_id', 123456, 'mutation_id', 1))
),
ARRAY['table', 'mutations'],
jsonb_build_array(jsonb_build_object('mutation_id', 1))
)
),
true
)
FROM
system.descriptor
WHERE
id = %d;
UPDATE system.namespace SET id = %d WHERE id = %d;
`, databaseID, tableFkTblID, tableNoJobID, fakeID, tableTID))

// Grab a handle to the job's metrics struct.
metrics := s.JobRegistry().(*jobs.Registry).MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(schematelemetry.Metrics)

// Run a schema telemetry job and wait for it to succeed.
tdb.Exec(t, qJob)
tdb.CheckQueryResultsRetry(t, qHasJob, [][]string{{"1"}})

// Assert that the InvalidObjects gauge is set to the number of expected
// invalid object. Our above query should have generated 9 corruptions. See
// the pkg/sql/crdb_internal_test.go:TestInvalidObjects for the breakdown of
// what exactly was done.
require.Equal(t, int64(9), metrics.InvalidObjects.Value())

// Ensure that our logs are flushed to disk before asserting about log
// entries.
log.Flush()

// Ensure that a log line is emitted for each invalid object, with a loose
// enforcement of the log structure.
errorRE := regexp.MustCompile(`found invalid object with ID \d+: ".+"`)
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1000, errorRE, log.SelectEditMode(false, false))
require.NoError(t, err)
require.Len(t, entries, 9)
}
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,10 @@ var charts = []sectionDescription{
},
AxisLabel: "Schema Change Executions",
},
{
Title: "Invalid Objects",
Metrics: []string{"sql.schema.invalid_objects"},
},
},
},
{
Expand Down

0 comments on commit a49fe07

Please sign in to comment.