-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
stats_cache.go
490 lines (443 loc) · 15.4 KB
/
stats_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
// Copyright 2017 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package stats
import (
"context"
"sync"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)
// TableStatistic is a single statistic collected over one column or over a
// group of columns of a table. It embeds the stored TableStatisticProto and
// additionally carries the histogram in decoded form.
type TableStatistic struct {
	TableStatisticProto

	// Histogram holds the decoded histogram buckets (filled in by parseStats
	// from TableStatisticProto.HistogramData when a histogram is present).
	Histogram []cat.HistogramBucket
}
// A TableStatisticsCache caches the contents of system.table_statistics.
//
// It holds a single LRU cache (mu.cache) whose values are *cacheEntry objects
// keyed by table ID. Each entry holds all the statistics for the different
// columns and column groups of that table; any histograms are stored inside the
// individual TableStatistic objects, both as raw HistogramData and in decoded
// form.
type TableStatisticsCache struct {
	// NB: This can't be a RWMutex for lookup because UnorderedCache.Get
	// manipulates an internal LRU list.
	mu struct {
		syncutil.Mutex
		cache *cache.UnorderedCache
		// Used for testing; keeps track of how many times we actually read stats
		// from the system table.
		numInternalQueries int64
	}

	// ClientDB is used to run the transactions that re-resolve user-defined
	// histogram column types (see parseStats).
	ClientDB *kv.DB
	// SQLExecutor runs the query that reads system.table_statistics.
	SQLExecutor sqlutil.InternalExecutor
	// Codec is stored for users of the cache; it is not read in this file's
	// visible code. NOTE(review): confirm its consumers elsewhere.
	Codec keys.SQLCodec
	// LeaseMgr and Settings are used to build the descriptor collection that
	// resolves user-defined types (see parseStats).
	LeaseMgr *lease.Manager
	Settings *cluster.Settings
}
// cacheEntry is the value type stored in TableStatisticsCache.mu.cache. Every
// field is guarded by the cache-wide mutex (TableStatisticsCache.mu).
type cacheEntry struct {
	// mustWait is set while the first fetch of this table's statistics is in
	// flight, i.e. while no statistics are available at all. Callers that find
	// the entry in this state block on waitCond until mustWait is cleared.
	mustWait bool
	waitCond sync.Cond

	// refreshing is set while newer statistics are being fetched to replace the
	// current (stale) ones. Unlike mustWait, readers do not block in this state;
	// they keep using the stale stats until the refresh completes.
	//
	// A goroutine that asks for a refresh while one is already in progress does
	// not issue its own query. Instead it sets mustRefreshAgain and returns; the
	// goroutine performing the refresh notices the flag once its query returns
	// and runs the query again.
	refreshing       bool
	mustRefreshAgain bool

	// stats holds the table's statistics, newest first.
	stats []*TableStatistic

	// err records the error, if any, returned by the internal query that read
	// the stats.
	err error
}
// NewTableStatisticsCache builds a TableStatisticsCache whose LRU cache holds
// statistics for at most cacheSize tables.
//
// When gw provides a gossip instance, the cache subscribes to the
// table-stat-added key prefix so that it refreshes a table's entry whenever new
// statistics are announced.
func NewTableStatisticsCache(
	cacheSize int,
	gw gossip.OptionalGossip,
	db *kv.DB,
	sqlExecutor sqlutil.InternalExecutor,
	codec keys.SQLCodec,
	leaseManager *lease.Manager,
	settings *cluster.Settings,
) *TableStatisticsCache {
	sc := &TableStatisticsCache{
		ClientDB:    db,
		SQLExecutor: sqlExecutor,
		Codec:       codec,
		LeaseMgr:    leaseManager,
		Settings:    settings,
	}

	// Evict the least recently used entries once more than cacheSize tables
	// are cached.
	overCapacity := func(size int, _, _ interface{}) bool {
		return size > cacheSize
	}
	sc.mu.cache = cache.NewUnorderedCache(cache.Config{
		Policy:      cache.CacheLRU,
		ShouldEvict: overCapacity,
	})

	g, ok := gw.Optional(47925)
	if !ok {
		return sc
	}
	// Gossip is used here only to signal that new stats exist, not to carry
	// the stats themselves, which is why the callback is registered as
	// redundant.
	g.RegisterCallback(
		gossip.MakePrefixPattern(gossip.KeyTableStatAddedPrefix),
		sc.tableStatAddedGossipUpdate,
		gossip.Redundant,
	)
	return sc
}
// tableStatAddedGossipUpdate is the gossip callback invoked when a new
// statistic has been added for a table. It extracts the table ID from the
// gossip key and schedules a refresh of that table's cached stats. The gossiped
// value itself is ignored. A malformed key is logged and otherwise dropped.
func (sc *TableStatisticsCache) tableStatAddedGossipUpdate(key string, value roachpb.Value) {
	ctx := context.Background()
	id, err := gossip.TableIDFromTableStatAddedKey(key)
	if err == nil {
		sc.RefreshTableStats(ctx, descpb.ID(id))
		return
	}
	log.Errorf(ctx, "tableStatAddedGossipUpdate(%s) error: %v", key, err)
}
// GetTableStats returns the statistics for the given table, served from the
// cache when possible and otherwise read from system.table_statistics (the
// result is then cached).
//
// Reserved (system) table IDs and virtual tables never have statistics; for
// them (nil, nil) is returned without touching the cache.
//
// The statistics are ordered by their CreatedAt time (newest-to-oldest).
func (sc *TableStatisticsCache) GetTableStats(
	ctx context.Context, tableID descpb.ID,
) ([]*TableStatistic, error) {
	switch {
	case descpb.IsReservedID(tableID):
		// System tables are skipped, most importantly table_statistics itself,
		// which would otherwise be queried to read its own stats.
		return nil, nil
	case descpb.IsVirtualTable(tableID):
		// Virtual tables have no stored statistics.
		return nil, nil
	}

	sc.mu.Lock()
	defer sc.mu.Unlock()

	found, e := sc.lookupStatsLocked(ctx, tableID)
	if !found {
		return sc.addCacheEntryLocked(ctx, tableID)
	}
	return e.stats, e.err
}
// lookupStatsLocked retrieves any existing stats for the given table.
//
// If another goroutine is in the process of retrieving the same stats, this
// method waits until that completes.
//
// Assumes that the caller holds sc.mu. Note that the mutex can be unlocked and
// locked again if we need to wait (this can only happen when found=true).
func (sc *TableStatisticsCache) lookupStatsLocked(
	ctx context.Context, tableID descpb.ID,
) (found bool, e *cacheEntry) {
	eUntyped, ok := sc.mu.cache.Get(tableID)
	if !ok {
		return false, nil
	}
	e = eUntyped.(*cacheEntry)

	if e.mustWait {
		// We are in the process of grabbing stats for this table. Wait until
		// that is complete, at which point e.stats and e.err will be populated.
		if log.V(1) {
			log.Infof(ctx, "waiting for statistics for table %d", tableID)
		}
		// Wait releases sc.mu while blocked and reacquires it before returning.
		// Re-check the condition in a loop, as the sync.Cond documentation
		// requires, instead of assuming it holds after a single wakeup.
		for e.mustWait {
			e.waitCond.Wait()
		}
	} else if log.V(2) {
		log.Infof(ctx, "statistics for table %d found in cache", tableID)
	}
	return true, e
}
// addCacheEntryLocked creates a new cache entry and retrieves table statistics
// from the database. It does this in a way so that the other goroutines that
// need the same stats can wait on us:
//   - a cache entry with mustWait=true is created;
//   - mutex is unlocked;
//   - stats are retrieved from database;
//   - mutex is locked again and the entry is updated.
//
// Assumes that the caller holds sc.mu. The mutex is released while the
// database query runs and is held again when this function returns. If the
// query fails, the error is returned (and recorded on the entry for any
// waiters) and the entry is removed from the cache so that a later call
// retries.
func (sc *TableStatisticsCache) addCacheEntryLocked(
	ctx context.Context, tableID descpb.ID,
) (stats []*TableStatistic, err error) {
	if log.V(1) {
		log.Infof(ctx, "reading statistics for table %d", tableID)
	}

	// Add a cache entry that other queries can find and wait on until we have the
	// stats.
	e := &cacheEntry{
		mustWait: true,
		waitCond: sync.Cond{L: &sc.mu},
	}
	sc.mu.cache.Add(tableID, e)
	sc.mu.numInternalQueries++

	func() {
		sc.mu.Unlock()
		defer sc.mu.Lock()
		stats, err = sc.getTableStatsFromDB(ctx, tableID)
	}()

	e.mustWait = false
	e.stats, e.err = stats, err

	// Wake up any other callers that are waiting on these stats.
	e.waitCond.Broadcast()

	if err != nil {
		// Don't keep the cache entry around, so that we retry the query. While
		// the mutex was released, our entry may have been invalidated and a
		// newer entry for the same table added by another goroutine; only
		// remove the entry if it is still the one we created.
		if cur, ok := sc.mu.cache.Get(tableID); ok && cur == e {
			sc.mu.cache.Del(tableID)
		}
	}
	return stats, err
}
// refreshCacheEntry retrieves table statistics from the database and updates
// an existing cache entry. It does this in a way so that the other goroutines
// can continue using the stale stats from the existing entry until the new
// stats are added:
//   - the existing cache entry is retrieved;
//   - mutex is unlocked;
//   - stats are retrieved from database;
//   - mutex is locked again and the entry is updated.
//
// Nothing is fetched if the table has no entry in the cache. If a refresh of
// the same entry is already running, this call only asks that goroutine to
// query again and returns immediately. Acquires sc.mu itself.
func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID descpb.ID) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if log.V(1) {
		log.Infof(ctx, "reading statistics for table %d", tableID)
	}

	// If the stats don't already exist in the cache, don't bother performing
	// the refresh. If e.err is not nil, the stats are in the process of being
	// removed from the cache (see addCacheEntryLocked), so don't refresh in this
	// case either.
	found, e := sc.lookupStatsLocked(ctx, tableID)
	if !found || e.err != nil {
		return
	}

	// Don't perform a refresh if a refresh is already in progress, but let that
	// goroutine know it needs to refresh again.
	if e.refreshing {
		e.mustRefreshAgain = true
		return
	}
	e.refreshing = true

	var stats []*TableStatistic
	var err error
	for {
		func() {
			sc.mu.numInternalQueries++
			sc.mu.Unlock()
			defer sc.mu.Lock()
			stats, err = sc.getTableStatsFromDB(ctx, tableID)
		}()
		// Another refresh may have been requested while the mutex was released,
		// in which case the stats just read may already be stale.
		if !e.mustRefreshAgain {
			break
		}
		e.mustRefreshAgain = false
	}

	e.stats, e.err = stats, err
	e.refreshing = false

	if err != nil {
		// Don't keep the cache entry around, so that we retry the query. While
		// the mutex was released, the entry may have been invalidated and
		// replaced by a newer one for the same table; only remove the entry if
		// it is still the one we refreshed.
		if cur, ok := sc.mu.cache.Get(tableID); ok && cur == e {
			sc.mu.cache.Del(tableID)
		}
	}
}
// RefreshTableStats schedules a background re-read of the given table's
// statistics from the database and returns without waiting for it. Only a
// table that already has a cache entry is refreshed (see refreshCacheEntry).
//
// NOTE(review): ctx is used by the background goroutine after this function
// returns; presumably callers pass a long-lived context. If ctx is canceled
// early, the refresh query may fail and the entry would then be dropped.
// Verify the callers.
func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID descpb.ID) {
	if log.V(1) {
		log.Infof(ctx, "refreshing statistics for table %d", tableID)
	}
	go func() {
		sc.refreshCacheEntry(ctx, tableID)
	}()
}
// InvalidateTableStats drops the cached statistics for the given table ID, so
// the next GetTableStats call for it reads from the database synchronously.
//
// RefreshTableStats is normally preferable, since it lets readers keep using
// the current stats while new ones load. Use this function only when the next
// query must be guaranteed to see updated stats.
func (sc *TableStatisticsCache) InvalidateTableStats(ctx context.Context, tableID descpb.ID) {
	if log.V(1) {
		log.Infof(ctx, "evicting statistics for table %d", tableID)
	}

	sc.mu.Lock()
	defer sc.mu.Unlock()
	sc.mu.cache.Del(tableID)
}
// Positions of each column within a row returned by the query in
// getTableStatsFromDB. The order must match that query's SELECT list exactly.
// statsLen is the number of columns in such a row.
const (
	tableIDIndex       = iota // "tableID"
	statisticsIDIndex         // "statisticID"
	nameIndex                 // name
	columnIDsIndex            // "columnIDs"
	createdAtIndex            // "createdAt"
	rowCountIndex             // "rowCount"
	distinctCountIndex        // "distinctCount"
	nullCountIndex            // "nullCount"
	histogramIndex            // histogram
	statsLen
)
// parseStats turns one row of system.table_statistics, laid out in the order
// given by the *Index constants, into a TableStatistic. A nil or empty row
// yields (nil, nil).
//
// Every datum's type is validated before it is unpacked. When the stored
// histogram's column type is user-defined, the type is re-resolved by OID in a
// new transaction (a round trip to the database) before the histogram buckets
// are decoded.
func (sc *TableStatisticsCache) parseStats(
	ctx context.Context, datums tree.Datums,
) (*TableStatistic, error) {
	if datums == nil || datums.Len() == 0 {
		return nil, nil
	}

	// The row must contain exactly one datum per selected column.
	if n := datums.Len(); n != statsLen {
		return nil, errors.Errorf("%d values returned from table statistics lookup. Expected %d", n, statsLen)
	}

	// Check every datum's type before asserting it below. Nullable columns may
	// also hold NULL, whose resolved type is in the unknown family.
	columnSpecs := []struct {
		fieldName    string
		fieldIndex   int
		expectedType *types.T
		nullable     bool
	}{
		{"tableID", tableIDIndex, types.Int, false},
		{"statisticsID", statisticsIDIndex, types.Int, false},
		{"name", nameIndex, types.String, true},
		{"columnIDs", columnIDsIndex, types.IntArray, false},
		{"createdAt", createdAtIndex, types.Timestamp, false},
		{"rowCount", rowCountIndex, types.Int, false},
		{"distinctCount", distinctCountIndex, types.Int, false},
		{"nullCount", nullCountIndex, types.Int, false},
		{"histogram", histogramIndex, types.Bytes, true},
	}
	for _, spec := range columnSpecs {
		got := datums[spec.fieldIndex].ResolvedType()
		if got.Equivalent(spec.expectedType) {
			continue
		}
		if spec.nullable && got.Family() == types.UnknownFamily {
			continue
		}
		return nil, errors.Errorf("%s returned from table statistics lookup has type %s. Expected %s",
			spec.fieldName, got, spec.expectedType)
	}

	// All types are known to be correct, so the assertions below cannot fail.
	stat := &TableStatistic{
		TableStatisticProto: TableStatisticProto{
			TableID:       descpb.ID(int32(*datums[tableIDIndex].(*tree.DInt))),
			StatisticID:   uint64(*datums[statisticsIDIndex].(*tree.DInt)),
			CreatedAt:     datums[createdAtIndex].(*tree.DTimestamp).Time,
			RowCount:      uint64(*datums[rowCountIndex].(*tree.DInt)),
			DistinctCount: uint64(*datums[distinctCountIndex].(*tree.DInt)),
			NullCount:     uint64(*datums[nullCountIndex].(*tree.DInt)),
		},
	}

	idArray := datums[columnIDsIndex].(*tree.DArray).Array
	stat.ColumnIDs = make([]descpb.ColumnID, 0, len(idArray))
	for _, idDatum := range idArray {
		stat.ColumnIDs = append(stat.ColumnIDs, descpb.ColumnID(int32(*idDatum.(*tree.DInt))))
	}

	if nameDatum := datums[nameIndex]; nameDatum != tree.DNull {
		stat.Name = string(*nameDatum.(*tree.DString))
	}

	histDatum := datums[histogramIndex]
	if histDatum == tree.DNull {
		return stat, nil
	}

	stat.HistogramData = &HistogramData{}
	if err := protoutil.Unmarshal([]byte(*histDatum.(*tree.DBytes)), stat.HistogramData); err != nil {
		return nil, err
	}
	data := stat.HistogramData

	// Decoded buckets, in a form usable by the opt catalog.
	stat.Histogram = make([]cat.HistogramBucket, len(data.Buckets))

	// Hydrate the column type in case it is user-defined. ColumnType can be nil
	// in some cases, and then there is nothing to hydrate.
	if colType := data.ColumnType; colType != nil && colType.UserDefined() {
		// The metadata accessed here is never older than the metadata used when
		// collecting the stats. Changes to types are backwards compatible across
		// versions, so using a newer version of the type metadata here is safe.
		// Additionally, the leased type that is accessed here is guaranteed to
		// be able to read values of the type at any time due to invariants
		// maintained when types are changed.
		if err := sc.ClientDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
			descsCol := descs.NewCollection(sc.LeaseMgr, sc.Settings)
			defer descsCol.ReleaseAll(ctx)
			resolver := descs.NewDistSQLTypeResolver(descsCol, txn)
			resolved, err := resolver.ResolveTypeByOID(ctx, colType.Oid())
			data.ColumnType = resolved
			return err
		}); err != nil {
			return nil, err
		}
	}

	var alloc sqlbase.DatumAlloc
	for i := range stat.Histogram {
		b := &data.Buckets[i]
		upper, _, err := sqlbase.DecodeTableKey(&alloc, data.ColumnType, b.UpperBound, encoding.Ascending)
		if err != nil {
			return nil, err
		}
		stat.Histogram[i] = cat.HistogramBucket{
			NumEq:         float64(b.NumEq),
			NumRange:      float64(b.NumRange),
			DistinctRange: b.DistinctRange,
			UpperBound:    upper,
		}
	}
	return stat, nil
}
// getTableStatsFromDB retrieves the statistics in system.table_statistics
// for the given table ID, ordered by "createdAt" from newest to oldest.
//
// Each returned row is converted with parseStats. parseStats yields (nil, nil)
// for an empty row, and such rows are skipped so that the returned slice never
// contains nil entries. Any query or parse error aborts the whole read.
func (sc *TableStatisticsCache) getTableStatsFromDB(
	ctx context.Context, tableID descpb.ID,
) ([]*TableStatistic, error) {
	const getTableStatisticsStmt = `
SELECT
"tableID",
"statisticID",
name,
"columnIDs",
"createdAt",
"rowCount",
"distinctCount",
"nullCount",
histogram
FROM system.table_statistics
WHERE "tableID" = $1
ORDER BY "createdAt" DESC
`
	rows, err := sc.SQLExecutor.Query(
		ctx, "get-table-statistics", nil /* txn */, getTableStatisticsStmt, tableID,
	)
	if err != nil {
		return nil, err
	}

	var statsList []*TableStatistic
	for _, row := range rows {
		stats, err := sc.parseStats(ctx, row)
		if err != nil {
			return nil, err
		}
		if stats == nil {
			// Nothing to record for this row; never hand nil entries to callers.
			continue
		}
		statsList = append(statsList, stats)
	}
	return statsList, nil
}