diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 65cd07b445b4..1c2b6d33995a 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -683,13 +683,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ), TableStatsCache: stats.NewTableStatisticsCache( - ctx, cfg.TableStatCacheSize, cfg.db, cfg.circularInternalExecutor, - codec, cfg.Settings, - cfg.rangeFeedFactory, collectionFactory, ), @@ -1079,6 +1076,9 @@ func (s *SQLServer) preStart( return err } s.stmtDiagnosticsRegistry.Start(ctx, stopper) + if err := s.execCfg.TableStatsCache.Start(ctx, s.execCfg.Codec, s.execCfg.RangeFeedFactory); err != nil { + return err + } // Before serving SQL requests, we have to make sure the database is // in an acceptable form for this version of the software. diff --git a/pkg/sql/stats/BUILD.bazel b/pkg/sql/stats/BUILD.bazel index 6ad3932cb2f3..50a00fe88311 100644 --- a/pkg/sql/stats/BUILD.bazel +++ b/pkg/sql/stats/BUILD.bazel @@ -113,6 +113,7 @@ go_test( "//pkg/util/retry", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/stats/automatic_stats_test.go b/pkg/sql/stats/automatic_stats_test.go index 592c6d7c5cc5..2a5abdcaf615 100644 --- a/pkg/sql/stats/automatic_stats_test.go +++ b/pkg/sql/stats/automatic_stats_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" ) func TestMaybeRefreshStats(t *testing.T) { @@ -64,15 +65,13 @@ func TestMaybeRefreshStats(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) descA := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a") cache := NewTableStatisticsCache( - ctx, 10, /* cacheSize */ kvDB, executor, - keys.SystemSQLCodec, 
s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) // There should not be any stats yet. @@ -144,15 +143,13 @@ func TestAverageRefreshTime(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) table := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a") cache := NewTableStatisticsCache( - ctx, 10, /* cacheSize */ kvDB, executor, - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) // curTime is used as the current time throughout the test to ensure that the @@ -393,15 +390,13 @@ func TestAutoStatsReadOnlyTables(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) cache := NewTableStatisticsCache( - ctx, 10, /* cacheSize */ kvDB, executor, - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) AutomaticStatisticsClusterMode.Override(ctx, &st.SV, true) @@ -441,15 +436,13 @@ func TestNoRetryOnFailure(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) cache := NewTableStatisticsCache( - ctx, 10, /* cacheSize */ kvDB, executor, - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), 
s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) r := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) // Try to refresh stats on a table that doesn't exist. diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go index 0e1506a20377..f0d340c4c090 100644 --- a/pkg/sql/stats/create_stats_job_test.go +++ b/pkg/sql/stats/create_stats_job_test.go @@ -178,7 +178,7 @@ func TestAtMostOneRunningCreateStats(t *testing.T) { } } - // Attempt to start an automatic stats run. It should fail. + // Attempt to start an automatic stats run. It should fail. autoStatsRunShouldFail() // PAUSE JOB does not bloack until the job is paused but only requests it. @@ -208,7 +208,7 @@ func TestAtMostOneRunningCreateStats(t *testing.T) { // Starting another automatic stats run should still fail. autoStatsRunShouldFail() - // Attempt to start a regular stats run. It should succeed. + // Attempt to start a regular stats run. It should succeed. 
errCh2 := make(chan error) go func() { _, err := conn.Exec(`CREATE STATISTICS s2 FROM d.t`) diff --git a/pkg/sql/stats/delete_stats_test.go b/pkg/sql/stats/delete_stats_test.go index 83c3d1c09811..c37fb7914634 100644 --- a/pkg/sql/stats/delete_stats_test.go +++ b/pkg/sql/stats/delete_stats_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" ) func TestDeleteOldStatsForColumns(t *testing.T) { @@ -41,15 +42,13 @@ func TestDeleteOldStatsForColumns(t *testing.T) { defer s.Stopper().Stop(ctx) ex := s.InternalExecutor().(sqlutil.InternalExecutor) cache := NewTableStatisticsCache( - ctx, 10, /* cacheSize */ db, ex, - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) // The test data must be ordered by CreatedAt DESC so the calculated set of // expected deleted stats is correct. diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 68830b6ae899..893b0a38fbcb 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -67,7 +67,6 @@ type TableStatisticsCache struct { } ClientDB *kv.DB SQLExecutor sqlutil.InternalExecutor - Codec keys.SQLCodec Settings *cluster.Settings // Used to resolve descriptors. @@ -112,19 +111,15 @@ type cacheEntry struct { // NewTableStatisticsCache creates a new TableStatisticsCache that can hold // statistics for tables. 
func NewTableStatisticsCache( - ctx context.Context, cacheSize int, db *kv.DB, sqlExecutor sqlutil.InternalExecutor, - codec keys.SQLCodec, settings *cluster.Settings, - rangeFeedFactory *rangefeed.Factory, cf *descs.CollectionFactory, ) *TableStatisticsCache { tableStatsCache := &TableStatisticsCache{ ClientDB: db, SQLExecutor: sqlExecutor, - Codec: codec, Settings: settings, collectionFactory: cf, } @@ -132,7 +127,13 @@ func NewTableStatisticsCache( Policy: cache.CacheLRU, ShouldEvict: func(s int, key, value interface{}) bool { return s > cacheSize }, }) + return tableStatsCache +} +// Start begins watching for updates in the stats table. +func (sc *TableStatisticsCache) Start( + ctx context.Context, codec keys.SQLCodec, rangeFeedFactory *rangefeed.Factory, +) error { // Set up a range feed to watch for updates to system.table_statistics. statsTablePrefix := codec.TablePrefix(keys.TableStatisticsTableID) @@ -145,7 +146,7 @@ func NewTableStatisticsCache( var lastTS hlc.Timestamp handleEvent := func(ctx context.Context, kv *roachpb.RangeFeedValue) { - tableID, err := decodeTableStatisticsKV(codec, kv, &tableStatsCache.datumAlloc) + tableID, err := decodeTableStatisticsKV(codec, kv, &sc.datumAlloc) if err != nil { log.Warningf(ctx, "failed to decode table statistics row %v: %v", kv.Key, err) return @@ -159,7 +160,7 @@ func NewTableStatisticsCache( } lastTableID = tableID lastTS = ts - tableStatsCache.refreshTableStats(ctx, tableID, ts) + sc.refreshTableStats(ctx, tableID, ts) } // Notes: @@ -167,15 +168,14 @@ func NewTableStatisticsCache( // call Close() ourselves. // - an error here only happens if the server is already shutting down; we // can safely ignore it. 
- _, _ = rangeFeedFactory.RangeFeed( + _, err := rangeFeedFactory.RangeFeed( ctx, "table-stats-cache", []roachpb.Span{statsTableSpan}, - db.Clock().Now(), + sc.ClientDB.Clock().Now(), handleEvent, ) - - return tableStatsCache + return err } // decodeTableStatisticsKV decodes the table ID from a range feed event on @@ -541,7 +541,7 @@ func (sc *TableStatisticsCache) parseStats( // get from the lease manager will be able to be used to decode these stats, // even if it wasn't the descriptor that was used to collect the stats. // If have types that are not backwards compatible in this way, then we - // will need to start writing a timestamp on the stats objects and request + // will need to start writing a timestamp on the stats objects and request // TypeDescriptor's with the timestamp that the stats were recorded with. // // TODO(ajwerner): We now do delete members from enum types. See #67050. diff --git a/pkg/sql/stats/stats_cache_test.go b/pkg/sql/stats/stats_cache_test.go index 460f4c68e7e9..54faa44bc43d 100644 --- a/pkg/sql/stats/stats_cache_test.go +++ b/pkg/sql/stats/stats_cache_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) func insertTableStat( @@ -240,15 +241,13 @@ func TestCacheBasic(t *testing.T) { // will result in the cache getting populated. When the stats cache size is // exceeded, entries should be evicted according to the LRU policy. 
sc := NewTableStatisticsCache( - ctx, 2, /* cacheSize */ db, ex, - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) for _, tableID := range tableIDs { checkStatsForTable(ctx, t, sc, expectedStats[tableID], tableID) } @@ -350,15 +349,13 @@ CREATE STATISTICS s FROM tt; _ = kvDB // Make a stats cache. sc := NewTableStatisticsCache( - ctx, 1, kvDB, s.InternalExecutor().(sqlutil.InternalExecutor), - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "tt") // Get stats for our table. We are ensuring here that the access to the stats // for tt properly hydrates the user defined type t before access. 
@@ -411,15 +408,13 @@ func TestCacheWait(t *testing.T) { } sort.Sort(tableIDs) sc := NewTableStatisticsCache( - ctx, len(tableIDs), /* cacheSize */ db, ex, - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) for _, tableID := range tableIDs { checkStatsForTable(ctx, t, sc, expectedStats[tableID], tableID) } @@ -468,15 +463,13 @@ func TestCacheAutoRefresh(t *testing.T) { s := tc.Server(0) sc := NewTableStatisticsCache( - ctx, 10, /* cacheSize */ s.DB(), s.InternalExecutor().(sqlutil.InternalExecutor), - keys.SystemSQLCodec, s.ClusterSettings(), - s.RangeFeedFactory().(*rangefeed.Factory), s.CollectionFactory().(*descs.CollectionFactory), ) + require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) sr0 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) sr0.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false")