Skip to content

Commit

Permalink
server: Split startup from initialization of TableStatsCache.
Browse files Browse the repository at this point in the history
Table statistics cache starts a rangefeed process very early
during server initialization.  This type of work should be started
when "start" method is called.  This PR does just that.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Jan 26, 2022
1 parent e1ceeda commit faa1986
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 44 deletions.
6 changes: 3 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,13 +683,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
),

TableStatsCache: stats.NewTableStatisticsCache(
ctx,
cfg.TableStatCacheSize,
cfg.db,
cfg.circularInternalExecutor,
codec,
cfg.Settings,
cfg.rangeFeedFactory,
collectionFactory,
),

Expand Down Expand Up @@ -1079,6 +1076,9 @@ func (s *SQLServer) preStart(
return err
}
s.stmtDiagnosticsRegistry.Start(ctx, stopper)
if err := s.execCfg.TableStatsCache.Start(ctx, s.execCfg.Codec, s.execCfg.RangeFeedFactory); err != nil {
return err
}

// Before serving SQL requests, we have to make sure the database is
// in an acceptable form for this version of the software.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/stats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ go_test(
"//pkg/util/retry",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)

Expand Down
17 changes: 5 additions & 12 deletions pkg/sql/stats/automatic_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestMaybeRefreshStats(t *testing.T) {
Expand Down Expand Up @@ -64,15 +65,13 @@ func TestMaybeRefreshStats(t *testing.T) {
executor := s.InternalExecutor().(sqlutil.InternalExecutor)
descA := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a")
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
kvDB,
executor,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */)

// There should not be any stats yet.
Expand Down Expand Up @@ -144,15 +143,13 @@ func TestAverageRefreshTime(t *testing.T) {
executor := s.InternalExecutor().(sqlutil.InternalExecutor)
table := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a")
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
kvDB,
executor,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */)

// curTime is used as the current time throughout the test to ensure that the
Expand Down Expand Up @@ -393,15 +390,13 @@ func TestAutoStatsReadOnlyTables(t *testing.T) {

executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
kvDB,
executor,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */)

AutomaticStatisticsClusterMode.Override(ctx, &st.SV, true)
Expand Down Expand Up @@ -441,15 +436,13 @@ func TestNoRetryOnFailure(t *testing.T) {

executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
kvDB,
executor,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
r := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */)

// Try to refresh stats on a table that doesn't exist.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/stats/create_stats_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestAtMostOneRunningCreateStats(t *testing.T) {
}
}

// Attempt to start an automatic stats run. It should fail.
// Attempt to Start an automatic stats run. It should fail.
autoStatsRunShouldFail()

// PAUSE JOB does not bloack until the job is paused but only requests it.
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestAtMostOneRunningCreateStats(t *testing.T) {
// Starting another automatic stats run should still fail.
autoStatsRunShouldFail()

// Attempt to start a regular stats run. It should succeed.
// Attempt to Start a regular stats run. It should succeed.
errCh2 := make(chan error)
go func() {
_, err := conn.Exec(`CREATE STATISTICS s2 FROM d.t`)
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/stats/delete_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestDeleteOldStatsForColumns(t *testing.T) {
Expand All @@ -41,15 +42,13 @@ func TestDeleteOldStatsForColumns(t *testing.T) {
defer s.Stopper().Stop(ctx)
ex := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
db,
ex,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))

// The test data must be ordered by CreatedAt DESC so the calculated set of
// expected deleted stats is correct.
Expand Down
24 changes: 12 additions & 12 deletions pkg/sql/stats/stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type TableStatisticsCache struct {
}
ClientDB *kv.DB
SQLExecutor sqlutil.InternalExecutor
Codec keys.SQLCodec
Settings *cluster.Settings

// Used to resolve descriptors.
Expand Down Expand Up @@ -112,27 +111,29 @@ type cacheEntry struct {
// NewTableStatisticsCache creates a new TableStatisticsCache that can hold
// statistics for <cacheSize> tables.
func NewTableStatisticsCache(
ctx context.Context,
cacheSize int,
db *kv.DB,
sqlExecutor sqlutil.InternalExecutor,
codec keys.SQLCodec,
settings *cluster.Settings,
rangeFeedFactory *rangefeed.Factory,
cf *descs.CollectionFactory,
) *TableStatisticsCache {
tableStatsCache := &TableStatisticsCache{
ClientDB: db,
SQLExecutor: sqlExecutor,
Codec: codec,
Settings: settings,
collectionFactory: cf,
}
tableStatsCache.mu.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheLRU,
ShouldEvict: func(s int, key, value interface{}) bool { return s > cacheSize },
})
return tableStatsCache
}

// Start begins watching for updates in the stats table.
func (sc *TableStatisticsCache) Start(
ctx context.Context, codec keys.SQLCodec, rangeFeedFactory *rangefeed.Factory,
) error {
// Set up a range feed to watch for updates to system.table_statistics.

statsTablePrefix := codec.TablePrefix(keys.TableStatisticsTableID)
Expand All @@ -145,7 +146,7 @@ func NewTableStatisticsCache(
var lastTS hlc.Timestamp

handleEvent := func(ctx context.Context, kv *roachpb.RangeFeedValue) {
tableID, err := decodeTableStatisticsKV(codec, kv, &tableStatsCache.datumAlloc)
tableID, err := decodeTableStatisticsKV(codec, kv, &sc.datumAlloc)
if err != nil {
log.Warningf(ctx, "failed to decode table statistics row %v: %v", kv.Key, err)
return
Expand All @@ -159,23 +160,22 @@ func NewTableStatisticsCache(
}
lastTableID = tableID
lastTS = ts
tableStatsCache.refreshTableStats(ctx, tableID, ts)
sc.refreshTableStats(ctx, tableID, ts)
}

// Notes:
// - the range feed automatically stops on server shutdown, we don't need to
// call Close() ourselves.
// - an error here only happens if the server is already shutting down; we
// can safely ignore it.
_, _ = rangeFeedFactory.RangeFeed(
_, err := rangeFeedFactory.RangeFeed(
ctx,
"table-stats-cache",
[]roachpb.Span{statsTableSpan},
db.Clock().Now(),
sc.ClientDB.Clock().Now(),
handleEvent,
)

return tableStatsCache
return err
}

// decodeTableStatisticsKV decodes the table ID from a range feed event on
Expand Down Expand Up @@ -541,7 +541,7 @@ func (sc *TableStatisticsCache) parseStats(
// get from the lease manager will be able to be used to decode these stats,
// even if it wasn't the descriptor that was used to collect the stats.
// If have types that are not backwards compatible in this way, then we
// will need to start writing a timestamp on the stats objects and request
// will need to Start writing a timestamp on the stats objects and request
// TypeDescriptor's with the timestamp that the stats were recorded with.
//
// TODO(ajwerner): We now do delete members from enum types. See #67050.
Expand Down
17 changes: 5 additions & 12 deletions pkg/sql/stats/stats_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func insertTableStat(
Expand Down Expand Up @@ -240,15 +241,13 @@ func TestCacheBasic(t *testing.T) {
// will result in the cache getting populated. When the stats cache size is
// exceeded, entries should be evicted according to the LRU policy.
sc := NewTableStatisticsCache(
ctx,
2, /* cacheSize */
db,
ex,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
for _, tableID := range tableIDs {
checkStatsForTable(ctx, t, sc, expectedStats[tableID], tableID)
}
Expand Down Expand Up @@ -350,15 +349,13 @@ CREATE STATISTICS s FROM tt;
_ = kvDB
// Make a stats cache.
sc := NewTableStatisticsCache(
ctx,
1,
kvDB,
s.InternalExecutor().(sqlutil.InternalExecutor),
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "tt")
// Get stats for our table. We are ensuring here that the access to the stats
// for tt properly hydrates the user defined type t before access.
Expand Down Expand Up @@ -411,15 +408,13 @@ func TestCacheWait(t *testing.T) {
}
sort.Sort(tableIDs)
sc := NewTableStatisticsCache(
ctx,
len(tableIDs), /* cacheSize */
db,
ex,
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))
for _, tableID := range tableIDs {
checkStatsForTable(ctx, t, sc, expectedStats[tableID], tableID)
}
Expand Down Expand Up @@ -468,15 +463,13 @@ func TestCacheAutoRefresh(t *testing.T) {

s := tc.Server(0)
sc := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
s.DB(),
s.InternalExecutor().(sqlutil.InternalExecutor),
keys.SystemSQLCodec,
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
s.CollectionFactory().(*descs.CollectionFactory),
)
require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory)))

sr0 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sr0.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false")
Expand Down

0 comments on commit faa1986

Please sign in to comment.