-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathcache.go
149 lines (140 loc) · 5.45 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package cacheutil
import (
"context"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
// Cache is a shared cache for hashed passwords and other information used
// during user authentication and session initialization.
//
// Entries are only valid for a specific set of system table descriptor
// versions (tableVersions); ClearCacheIfStaleLocked drops all entries when
// it is handed a newer version for any table.
type Cache struct {
// Mutex guards the fields below. MaybeWriteBackToCache acquires it itself;
// methods with the Locked suffix expect the caller to already hold it.
syncutil.Mutex
// boundAccount is grown when an entry is stored and emptied when the cache
// is cleared.
boundAccount mon.BoundAccount
// tableVersions holds, per backing system table, the descriptor version the
// current cache contents are based on.
tableVersions []descpb.DescriptorVersion
// cache maps keys to cached entries. It is replaced with a fresh map
// whenever ClearCacheIfStaleLocked finds the contents stale.
// TODO(richardjcai): In go1.18 we can use generics.
cache map[interface{}]interface{}
// populateCacheGroup ensures at most one in-flight load per request key
// (see LoadValueOutsideOfCache).
populateCacheGroup singleflight.Group
// stopper is used to derive the context for loads via WithCancelOnQuiesce.
stopper *stop.Stopper
}
// NewCache initializes a new Cache.
//
// numSystemTables is the number of system tables backing the cache; the
// tableVersions slice is created with that many entries, each set to 0.
//
// The entry map is allocated up front so that MaybeWriteBackToCache can store
// into it even if ClearCacheIfStaleLocked has not yet replaced it (a write to
// a nil map panics).
func NewCache(account mon.BoundAccount, stopper *stop.Stopper, numSystemTables int) *Cache {
	return &Cache{
		boundAccount:  account,
		tableVersions: make([]descpb.DescriptorVersion, numSystemTables),
		cache:         make(map[interface{}]interface{}),
		stopper:       stopper,
	}
}
// GetValueLocked looks up key in the cache. It returns the stored value and
// true when the key is present, or nil and false when it is not.
// Callers must hold the cache lock.
func (c *Cache) GetValueLocked(key interface{}) (interface{}, bool) {
	if cached, found := c.cache[key]; found {
		return cached, true
	}
	return nil, false
}
// LoadValueOutsideOfCache runs fn to produce the value for requestKey,
// funnelling the call through a singleflight group so that at most one load
// per key is in flight at any time.
//
// The caller stops waiting as soon as ctx is done and receives ctx.Err(); the
// load itself runs under a separate context and is not cancelled by ctx.
func (c *Cache) LoadValueOutsideOfCache(
	ctx context.Context, requestKey string, fn func(loadCtx context.Context) (interface{}, error),
) (interface{}, error) {
	load := func() (interface{}, error) {
		// The load gets its own context (carrying only the caller's log
		// tags) so that one caller timing out cannot surface a timeout
		// error to every other goroutine waiting on the same key.
		detached := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
		loadCtx, cancel := c.stopper.WithCancelOnQuiesce(detached)
		defer cancel()
		return fn(loadCtx)
	}
	resultCh, _ := c.populateCacheGroup.DoChan(requestKey, load)

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case result := <-resultCh:
		if result.Err != nil {
			return nil, result.Err
		}
		return result.Val, nil
	}
}
// MaybeWriteBackToCache tries to put the key, value into the cache.
//
// It returns false, without touching the cache, if tableVersions differs from
// the versions the cache is currently based on, i.e. the underlying system
// tables were modified since they were read. Otherwise it returns true. Note
// that true is also returned when the entry could not be stored because the
// memory account refused to grow; that failure is only logged so that
// authentication can proceed.
//
// Note that reading from system tables may give us data from a newer table
// version than the one we pass in here, that is okay since the cache will
// be invalidated upon the next read.
//
// It panics with an assertion failure if tableVersions does not have the same
// length as the cache's own version slice. The cache lock is acquired
// internally; callers must not hold it.
func (c *Cache) MaybeWriteBackToCache(
	ctx context.Context, tableVersions []descpb.DescriptorVersion, key interface{}, entry interface{},
) bool {
	c.Lock()
	defer c.Unlock()

	if len(c.tableVersions) != len(tableVersions) {
		panic(errors.AssertionFailedf("cache.tableVersions slice must be the same len as tableVersions, c.tableVersions: %v, tableVersions: %v", c.tableVersions, tableVersions))
	}
	// Table versions have changed while we were looking: don't cache the data.
	for i := range c.tableVersions {
		if c.tableVersions[i] != tableVersions[i] {
			return false
		}
	}

	// Table versions remain the same: account for the entry and store it.
	//
	// NOTE(review): unsafe.Sizeof on an interface value yields the size of
	// the interface header only, not of the data it refers to, so every
	// entry is charged the same fixed amount.
	const sizeOfEntry = int(unsafe.Sizeof(entry))

	// Only charge the account for keys that are not cached yet. Replacing an
	// existing entry occupies the same (fixed-size) slot, and charging again
	// would over-reserve memory until the next cache clear.
	if _, alreadyCached := c.cache[key]; !alreadyCached {
		if err := c.boundAccount.Grow(ctx, int64(sizeOfEntry)); err != nil {
			// If there is no memory available to cache the entry, we can still
			// proceed with authentication so that users are not locked out of
			// the database.
			log.Ops.Warningf(ctx, "no memory available to cache info: %v", err)
			return true
		}
	}

	// The map may still be nil if nothing has allocated it yet; writing to a
	// nil map panics, so allocate lazily.
	if c.cache == nil {
		c.cache = make(map[interface{}]interface{})
	}
	c.cache[key] = entry
	return true
}
// ClearCacheIfStaleLocked compares the cached table versions to the current
// table versions.
//
//   - If any cached version is older than the corresponding current version,
//     the cache adopts tableVersions, drops all entries, releases the memory
//     account, and false is returned.
//   - Otherwise, if any cached version is newer, the cache is left untouched
//     and false is returned to indicate that the cached data should not be
//     used.
//   - If all versions match, true is returned.
//
// It panics with an assertion failure if tableVersions does not have the same
// length as the cache's own version slice.
// The cache must be locked while this is called.
func (c *Cache) ClearCacheIfStaleLocked(
	ctx context.Context, tableVersions []descpb.DescriptorVersion,
) (isEligibleForCache bool) {
	if len(c.tableVersions) != len(tableVersions) {
		panic(errors.AssertionFailedf("cache.tableVersions slice must be the same len as tableVersions, c.tableVersions: %v, tableVersions: %v", c.tableVersions, tableVersions))
	}
	for i := range c.tableVersions {
		// If any table is out of date, clear the cache.
		if c.tableVersions[i] < tableVersions[i] {
			// The cache is based on old table versions: update the versions
			// and drop the map. The values are copied rather than retaining
			// the caller's slice, so that a later mutation of that slice by
			// the caller cannot change the versions recorded here. The
			// lengths were asserted equal above, so copy transfers every
			// element.
			copy(c.tableVersions, tableVersions)
			c.cache = make(map[interface{}]interface{})
			c.boundAccount.Empty(ctx)
			return false
		}
	}
	for i := range c.tableVersions {
		// If the cache is based on a newer table version, then
		// don't use the cache for this transaction.
		if c.tableVersions[i] > tableVersions[i] {
			return false
		}
	}
	return true
}