diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 8275485b2fc6..fce87aabf0fc 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -160,6 +160,8 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabled boolean true enable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +sql.session_cache.capacity integer 100 the maximum number of sessions in the cache +sql.session_cache.time_to_live integer 3600 the maximum time to live, in seconds sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index fe736143bdb3..529323b3f693 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -175,6 +175,8 @@ sql.multiregion.drop_primary_region.enabledbooleantrueallows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabledbooleantrueenable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabledbooleanfalseif enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +sql.session_cache.capacityinteger100the maximum number of sessions in the cache +sql.session_cache.time_to_liveinteger3600the maximum time to live, in seconds sql.spatial.experimental_box2d_comparison_operators.enabledbooleanfalseenables the use of certain experimental box2d comparison operators sql.stats.automatic_collection.enabledbooleantrueautomatic statistics collection mode sql.stats.automatic_collection.fraction_stale_rowsfloat0.2target fraction of stale rows per table that will trigger a statistics refresh diff --git a/pkg/sql/session_cache.go b/pkg/sql/session_cache.go new file mode 100644 index 000000000000..5963e1a99035 --- /dev/null +++ b/pkg/sql/session_cache.go @@ -0,0 +1,165 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "fmt" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// SessionCacheCapacity is the cluster setting that controls the maximum number +// of sessions in the cache. +var SessionCacheCapacity = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.session_cache.capacity", + "the maximum number of sessions in the cache", + 100, // Totally arbitrary for now +).WithPublic() + +// SessionCacheTimeToLive is the cluster setting that controls the maxumum time +// to live for a session, in seconds. +var SessionCacheTimeToLive = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.session_cache.time_to_live", + "the maximum time to live, in seconds", + 3600, // One hour +).WithPublic() + +// SessionCache is an in-memory FIFO cache for closed sessions. +type SessionCache struct { + st *cluster.Settings + + mu struct { + syncutil.RWMutex + queue *cache.UnorderedCache + } +} + +// NewSessionCache returns a new session cache. +func NewSessionCache(st *cluster.Settings) *SessionCache { + c := &SessionCache{st: st} + + c.mu.queue = cache.NewUnorderedCache(cache.Config{ + Policy: cache.CacheFIFO, + ShouldEvict: func(size int, _, _ interface{}) bool { + capacity := SessionCacheCapacity.Get(&st.SV) + return int64(size) > capacity + }, + }) + + return c +} + +func (c *SessionCache) add(id ClusterWideID, session registrySession) { + c.mu.Lock() + defer c.mu.Unlock() + + node := &SessionNode{id: id, data: session, timestamp: timeutil.Now()} + c.mu.queue.Add(id, node) +} + +func (c *SessionCache) size() int { + c.mu.Lock() + defer c.mu.Unlock() + + return c.mu.queue.Len() +} + +// GetSerializedSessions returns a list of []serverpb.Session, ordered from +// newest to oldest in the cache. +func (c *SessionCache) GetSerializedSessions() []serverpb.Session { + var serializedSessions []serverpb.Session + + sessions := c.getSessions() + for _, s := range sessions { + serializedSessions = append(serializedSessions, s.data.serialize()) + } + + return serializedSessions +} + +// viewCachedSessions returns a string representation of the sessions in the +// cache, ordered from newest to oldest. This function is used for testing +// purposes. +func (c *SessionCache) viewCachedSessions() string { + var result []string + + sessions := c.getSessions() + for _, s := range sessions { + result = append(result, fmt.Sprintf("id: %s age: %s session: {}", s.id, s.getAgeString())) + } + + if len(result) == 0 { + return "empty" + } + return strings.Join(result, "\n") +} + +// getSessions iterates through each SessionNode, evicting those whose age +// exceeds the cache's maximum time to live, and returns a list of the +// remaining SessionNodes +func (c *SessionCache) getSessions() []*SessionNode { + c.mu.Lock() + defer c.mu.Unlock() + + var result []*SessionNode + var sessionsToEvict []*SessionNode + + c.mu.queue.Do(func(entry *cache.Entry) { + node := entry.Value.(*SessionNode) + if int64(node.getAge().Seconds()) > SessionCacheTimeToLive.Get(&c.st.SV) { + sessionsToEvict = append(sessionsToEvict, node) + } else { + result = append(result, node) + } + }) + + c.evictSessionsLocked(sessionsToEvict) + return result +} + +func (c *SessionCache) evictSessionsLocked(toEvict []*SessionNode) { + for _, entry := range toEvict { + c.mu.queue.Del(entry.id) + } +} + +// Clear clears the cache. +func (c *SessionCache) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + + c.mu.queue.Clear() +} + +// SessionNode encapsulates the session information that will be inserted into +// the cache. +type SessionNode struct { + id ClusterWideID + data registrySession + timestamp time.Time +} + +func (n *SessionNode) getAge() time.Duration { + return timeutil.Since(n.timestamp) +} + +func (n *SessionNode) getAgeString() string { + return n.getAge().Round(time.Second).String() +} diff --git a/pkg/sql/session_cache_test.go b/pkg/sql/session_cache_test.go new file mode 100644 index 000000000000..b49d40047a77 --- /dev/null +++ b/pkg/sql/session_cache_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestSessionCacheBasic(t *testing.T) { + defer leaktest.AfterTest(t)() + var cache *SessionCache + + datadriven.Walk(t, testutils.TestDataPath(t, "session_cache"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + var capacity, timeToLive int + d.ScanArgs(t, "capacity", &capacity) + d.ScanArgs(t, "timeToLive", &timeToLive) + + ctx := context.Background() + st := &cluster.Settings{} + cache = NewSessionCache(st) + + SessionCacheCapacity.Override(ctx, &st.SV, int64(capacity)) + SessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive)) + + return fmt.Sprintf("cache_size: %d", cache.size()) + case "addSession": + var idStr string + d.ScanArgs(t, "id", &idStr) + id, err := uint128.FromString(idStr) + require.NoError(t, err) + + session := &connExecutor{} + sessionID := ClusterWideID{id} + cache.add(sessionID, session) + + return fmt.Sprintf("cache_size: %d", cache.size()) + case "addSessionBatch": + var startIDStr string + var sessions int + var seconds int + d.ScanArgs(t, "startId", &startIDStr) + d.ScanArgs(t, "sessions", &sessions) + d.ScanArgs(t, "seconds", &seconds) + id, err := uint128.FromString(startIDStr) + require.NoError(t, err) + + for i := 0; i < sessions; i++ { + time.Sleep(time.Duration(seconds) * time.Second) + session := &connExecutor{} + sessionID := ClusterWideID{id} + cache.add(sessionID, session) + id = id.Add(1) + } + + return fmt.Sprintf("cache_size: %d", cache.size()) + case "wait": + var secondsStr int + d.ScanArgs(t, "seconds", &secondsStr) + time.Sleep(time.Duration(secondsStr) * time.Second) + + return "ok" + case "show": + return cache.viewCachedSessions() + } + return "" + }) + }) + +} diff --git a/pkg/sql/testdata/session_cache b/pkg/sql/testdata/session_cache new file mode 100644 index 000000000000..3b3fa8e2d937 --- /dev/null +++ b/pkg/sql/testdata/session_cache @@ -0,0 +1,71 @@ +# Initialize a cache with a capacity of 4 members and a timeToLive of 10 +# seconds. +init capacity=4 timeToLive=10 +---- +cache_size: 0 + +# Add 6 (empty) sessions with ids ranging from 00 to 05, inclusive. +addSession id=00 +---- +cache_size: 1 + +addSession id=01 +---- +cache_size: 2 + +addSession id=02 +---- +cache_size: 3 + +addSession id=03 +---- +cache_size: 4 + +addSession id=04 +---- +cache_size: 4 + +addSession id=05 +---- +cache_size: 4 + +# There have been 6 sessions added but the capacity is 4. The sessions with +# id=00 an id=01 should have been evicted. +show +---- +id: 00000000000000000000000000000005 age: 0s session: {} +id: 00000000000000000000000000000004 age: 0s session: {} +id: 00000000000000000000000000000003 age: 0s session: {} +id: 00000000000000000000000000000002 age: 0s session: {} + +# Wait for 15 seconds, enough time for all sessions in the cache to be evicted. +wait seconds=15 +---- +ok + +# The cache should be empty now. +show +---- +empty + +# Once the cache is empty, add a session every 3 seconds for a total of 4 +# sessions and 12 seconds. The cache should be full. +addSessionBatch startId=06 sessions=4 seconds=3 +---- +cache_size: 4 + +# Wait 5 seconds so that that the oldest session is 17 seconds old and the +# newest is 5 seconds old. +wait seconds=5 +---- +ok + +# After 15 seconds, only sessions added in the last 10 seconds should be in the +# cache, those with id=08 (8 seconds old) and id=09 (5 seconds old). Note that +# the session with id=07 would be 11 seconds old, which is greater than the +# allowed timeToLive so that session and all sessions added before that one +# should have been evicted. +show +---- +id: 00000000000000000000000000000009 age: 5s session: {} +id: 00000000000000000000000000000008 age: 8s session: {}