From 483fb05f3d4671ae844085ae5ac7d1c34307f442 Mon Sep 17 00:00:00 2001
From: Gerardo Torres
Date: Wed, 9 Mar 2022 17:59:36 -0500
Subject: [PATCH] sql: add in-memory FIFO cache for closed sessions

This is the first phase of #67888, which adds a FIFO cache for closed
sessions. The cache's eviction policy is both row-based and time-based:
both the capacity and the time to live can be set using the `settings`
package.

Further PRs will focus on exposing these closed sessions to the frontend
via the `ListSessions` endpoint, tuning the capacity and timeToLive
defaults, and making the appropriate UI changes.

Release note: None
---
 .../settings/settings-for-tenants.txt |   2 +
 docs/generated/settings/settings.html |   2 +
 pkg/sql/BUILD.bazel                   |   4 +
 pkg/sql/closed_session_cache.go       | 146 ++++++++++++++++++
 pkg/sql/closed_session_cache_test.go  | 105 +++++++++++++
 pkg/sql/testdata/closed_session_cache |  71 +++++++++
 6 files changed, 330 insertions(+)
 create mode 100644 pkg/sql/closed_session_cache.go
 create mode 100644 pkg/sql/closed_session_cache_test.go
 create mode 100644 pkg/sql/testdata/closed_session_cache

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 8275485b2fc6..bd0a9569fa1d 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -86,6 +86,8 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are
 server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
 server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
 sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan
+sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache
+sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds
 sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store
 sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events
 sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable)
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index fe736143bdb3..5ac781548d27 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -100,6 +100,8 @@
 <tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
 <tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
 <tr><td><code>sql.auth.resolve_membership_single_scan.enabled</code></td><td>boolean</td><td><code>true</code></td><td>determines whether to populate the role membership cache with a single scan</td></tr>
+<tr><td><code>sql.closed_session_cache.capacity</code></td><td>integer</td><td><code>100</code></td><td>the maximum number of sessions in the cache</td></tr>
+<tr><td><code>sql.closed_session_cache.time_to_live</code></td><td>integer</td><td><code>3600</code></td><td>the maximum time to live, in seconds</td></tr>
 <tr><td><code>sql.contention.event_store.capacity</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the in-memory storage capacity per-node of contention event store</td></tr>
 <tr><td><code>sql.contention.event_store.duration_threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events</td></tr>
 <tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td></tr>
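Both new knobs are ordinary public cluster settings, so they can be tuned at
runtime. The following is a minimal Go sketch of reading and overriding them
through the settings API that this patch itself uses; the override values are
purely illustrative, and on a running cluster the same knobs would instead be
changed with SET CLUSTER SETTING.

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql"
)

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	// Defaults from this patch: capacity 100 sessions, time_to_live 3600s.
	fmt.Println(sql.ClosedSessionCacheCapacity.Get(&st.SV))
	fmt.Println(sql.ClosedSessionCacheTimeToLive.Get(&st.SV))

	// Override the in-memory values, as the accompanying unit test does.
	// The numbers below are illustrative only.
	sql.ClosedSessionCacheCapacity.Override(ctx, &st.SV, 500)
	sql.ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, 600)
}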
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index e66c79b0cc45..71bc28fc678b 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
         "cancel_queries.go",
         "cancel_sessions.go",
         "check.go",
+        "closed_session_cache.go",
         "cluster_wide_id.go",
         "comment_on_column.go",
         "comment_on_constraint.go",
@@ -402,6 +403,7 @@ go_library(
         "//pkg/util/admission",
         "//pkg/util/bitarray",
         "//pkg/util/buildutil",
+        "//pkg/util/cache",
         "//pkg/util/cancelchecker",
         "//pkg/util/contextutil",
         "//pkg/util/ctxgroup",
@@ -468,6 +470,7 @@ go_test(
         "backfill_test.go",
         "builtin_mem_usage_test.go",
         "builtin_test.go",
+        "closed_session_cache_test.go",
         "comment_on_column_test.go",
         "comment_on_constraint_test.go",
         "comment_on_database_test.go",
@@ -698,6 +701,7 @@ go_test(
         "//pkg/util/timeutil",
         "//pkg/util/tracing",
         "//pkg/util/tracing/tracingpb",
+        "//pkg/util/uint128",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_apd_v3//:apd",
         "@com_github_cockroachdb_cockroach_go_v2//crdb",
diff --git a/pkg/sql/closed_session_cache.go b/pkg/sql/closed_session_cache.go
new file mode 100644
index 000000000000..2ab42162ec98
--- /dev/null
+++ b/pkg/sql/closed_session_cache.go
@@ -0,0 +1,146 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/cache"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+)
+
+// ClosedSessionCacheCapacity is the cluster setting that controls the maximum
+// number of sessions in the cache.
+var ClosedSessionCacheCapacity = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.closed_session_cache.capacity",
+	"the maximum number of sessions in the cache",
+	100, // TODO(gtr): Totally arbitrary for now, adjust later.
+).WithPublic()
+
+// ClosedSessionCacheTimeToLive is the cluster setting that controls the
+// maximum time to live for a session's information in the cache, in seconds.
+var ClosedSessionCacheTimeToLive = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.closed_session_cache.time_to_live",
+	"the maximum time to live, in seconds",
+	3600, // One hour.
+).WithPublic()
+
+// ClosedSessionCache is an in-memory FIFO cache for closed sessions.
+type ClosedSessionCache struct {
+	st *cluster.Settings
+
+	mu struct {
+		syncutil.RWMutex
+		data *cache.UnorderedCache
+	}
+}
+
+// NewClosedSessionCache returns a new ClosedSessionCache.
+func NewClosedSessionCache(st *cluster.Settings) *ClosedSessionCache {
+	c := &ClosedSessionCache{st: st}
+
+	c.mu.data = cache.NewUnorderedCache(cache.Config{
+		Policy: cache.CacheFIFO,
+		ShouldEvict: func(size int, _, _ interface{}) bool {
+			capacity := ClosedSessionCacheCapacity.Get(&st.SV)
+			return int64(size) > capacity
+		},
+	})
+
+	return c
+}
+
+// Add adds a closed session to the ClosedSessionCache.
+func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	node := &SessionNode{id: id, data: session, timestamp: timeutil.Now()}
+	c.mu.data.Add(id, node)
+}
+
+func (c *ClosedSessionCache) size() int {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return c.mu.data.Len()
+}
+
+// GetSerializedSessions returns a list of serverpb.Session, ordered from
+// newest to oldest in the cache.
+func (c *ClosedSessionCache) GetSerializedSessions() []serverpb.Session {
+	var serializedSessions []serverpb.Session
+
+	sessions := c.getSessions()
+	for _, s := range sessions {
+		serializedSessions = append(serializedSessions, s.data)
+	}
+
+	return serializedSessions
+}
+
+// getSessions iterates through each SessionNode, evicting those whose age
+// exceeds the cache's maximum time to live, and returns a list of the
+// remaining SessionNodes.
+func (c *ClosedSessionCache) getSessions() []*SessionNode {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var result []*SessionNode
+	var sessionsToEvict []*SessionNode
+
+	c.mu.data.Do(func(entry *cache.Entry) {
+		node := entry.Value.(*SessionNode)
+		if int64(node.getAge().Seconds()) > ClosedSessionCacheTimeToLive.Get(&c.st.SV) {
+			sessionsToEvict = append(sessionsToEvict, node)
+		} else {
+			result = append(result, node)
+		}
+	})
+
+	c.evictSessionsLocked(sessionsToEvict)
+	return result
+}
+
+func (c *ClosedSessionCache) evictSessionsLocked(toEvict []*SessionNode) {
+	for _, entry := range toEvict {
+		c.mu.data.Del(entry.id)
+	}
+}
+
+func (c *ClosedSessionCache) clear() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.mu.data.Clear()
+}
+
+// SessionNode encapsulates the session information that will be inserted into
+// the cache.
+type SessionNode struct {
+	id        ClusterWideID
+	data      serverpb.Session
+	timestamp time.Time
+}
+
+func (n *SessionNode) getAge() time.Duration {
+	return timeutil.Since(n.timestamp)
+}
+
+func (n *SessionNode) getAgeString() string {
+	return n.getAge().Round(time.Second).String()
+}
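The commit message defers exposing these sessions through the `ListSessions`
endpoint to a follow-up PR. A rough sketch of what that consumption could look
like is shown below; the appendClosedSessions helper and the assumption that
closed sessions are simply appended to the existing response are hypothetical,
and none of this is part of the present patch.

package sql

import "github.com/cockroachdb/cockroach/pkg/server/serverpb"

// appendClosedSessions is a hypothetical helper (not part of this PR) that
// merges the cached closed sessions into a ListSessions-style response.
func appendClosedSessions(resp *serverpb.ListSessionsResponse, c *ClosedSessionCache) {
	// GetSerializedSessions already drops entries older than
	// sql.closed_session_cache.time_to_live and returns the rest newest first.
	resp.Sessions = append(resp.Sessions, c.GetSerializedSessions()...)
}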
diff --git a/pkg/sql/closed_session_cache_test.go b/pkg/sql/closed_session_cache_test.go
new file mode 100644
index 000000000000..5d7646b7e35b
--- /dev/null
+++ b/pkg/sql/closed_session_cache_test.go
@@ -0,0 +1,105 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/uint128"
+	"github.com/cockroachdb/datadriven"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSessionCacheBasic(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var cache *ClosedSessionCache
+
+	datadriven.Walk(t, testutils.TestDataPath(t, "closed_session_cache"), func(t *testing.T, path string) {
+		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+			switch d.Cmd {
+			case "init":
+				var capacity, timeToLive int
+				d.ScanArgs(t, "capacity", &capacity)
+				d.ScanArgs(t, "timeToLive", &timeToLive)
+
+				ctx := context.Background()
+				st := &cluster.Settings{}
+				cache = NewClosedSessionCache(st)
+
+				ClosedSessionCacheCapacity.Override(ctx, &st.SV, int64(capacity))
+				ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive))
+
+				return fmt.Sprintf("cache_size: %d", cache.size())
+			case "addSession":
+				var idStr string
+				d.ScanArgs(t, "id", &idStr)
+				id, err := uint128.FromString(idStr)
+				require.NoError(t, err)
+
+				session := serverpb.Session{}
+				sessionID := ClusterWideID{id}
+				cache.Add(sessionID, session)
+
+				return fmt.Sprintf("cache_size: %d", cache.size())
+			case "addSessionBatch":
+				var startIDStr string
+				var sessions int
+				var seconds int
+				d.ScanArgs(t, "startId", &startIDStr)
+				d.ScanArgs(t, "sessions", &sessions)
+				d.ScanArgs(t, "seconds", &seconds)
+				id, err := uint128.FromString(startIDStr)
+				require.NoError(t, err)
+
+				for i := 0; i < sessions; i++ {
+					time.Sleep(time.Duration(seconds) * time.Second)
+					session := serverpb.Session{}
+					sessionID := ClusterWideID{id}
+					cache.Add(sessionID, session)
+					id = id.Add(1)
+				}
+
+				return fmt.Sprintf("cache_size: %d", cache.size())
+			case "wait":
+				var seconds int
+				d.ScanArgs(t, "seconds", &seconds)
+				time.Sleep(time.Duration(seconds) * time.Second)
+
+				return "ok"
+			case "clear":
+				cache.clear()
+
+				return fmt.Sprintf("cache_size: %d", cache.size())
+			case "show":
+				var result []string
+
+				sessions := cache.getSessions()
+				for _, s := range sessions {
+					result = append(result, fmt.Sprintf("id: %s age: %s session: {}", s.id, s.getAgeString()))
+				}
+				if len(result) == 0 {
+					return "empty"
+				}
+				return strings.Join(result, "\n")
+			}
+			return ""
+		})
+	})
+
+}
diff --git a/pkg/sql/testdata/closed_session_cache b/pkg/sql/testdata/closed_session_cache
new file mode 100644
index 000000000000..3b3fa8e2d937
--- /dev/null
+++ b/pkg/sql/testdata/closed_session_cache
@@ -0,0 +1,71 @@
+# Initialize a cache with a capacity of 4 sessions and a timeToLive of 10
+# seconds.
+init capacity=4 timeToLive=10
+----
+cache_size: 0
+
+# Add 6 (empty) sessions with ids ranging from 00 to 05, inclusive.
+addSession id=00
+----
+cache_size: 1
+
+addSession id=01
+----
+cache_size: 2
+
+addSession id=02
+----
+cache_size: 3
+
+addSession id=03
+----
+cache_size: 4
+
+addSession id=04
+----
+cache_size: 4
+
+addSession id=05
+----
+cache_size: 4
+
+# There have been 6 sessions added but the capacity is 4. The sessions with
+# id=00 and id=01 should have been evicted.
+show
+----
+id: 00000000000000000000000000000005 age: 0s session: {}
+id: 00000000000000000000000000000004 age: 0s session: {}
+id: 00000000000000000000000000000003 age: 0s session: {}
+id: 00000000000000000000000000000002 age: 0s session: {}
+
+# Wait for 15 seconds, enough time for all sessions in the cache to be evicted.
+wait seconds=15
+----
+ok
+
+# The cache should be empty now.
+show
+----
+empty
+
+# Once the cache is empty, add a session every 3 seconds for a total of 4
+# sessions and 12 seconds. The cache should be full.
+addSessionBatch startId=06 sessions=4 seconds=3
+----
+cache_size: 4
+
+# Wait 5 more seconds, so that the oldest session in the batch (id=06) is 14
+# seconds old and the newest (id=09) is 5 seconds old.
+wait seconds=5
+----
+ok
+
+# Only sessions added within the last 10 seconds (the timeToLive) should still
+# be in the cache: those with id=08 (8 seconds old) and id=09 (5 seconds old).
+# The session with id=07 would be 11 seconds old, which is greater than the
+# allowed timeToLive, so that session and all sessions added before it should
+# have been evicted.
+show
+----
+id: 00000000000000000000000000000009 age: 5s session: {}
+id: 00000000000000000000000000000008 age: 8s session: {}
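Putting the pieces together, the following is a small usage sketch of the API
added by this patch. The session IDs and counts are arbitrary, and the
serverpb.Session payloads are left empty exactly as in the datadriven test; a
real caller would fill them in when a SQL session closes, which is left to the
follow-up PRs mentioned in the commit message.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/util/uint128"
)

func main() {
	st := cluster.MakeTestingClusterSettings()
	cache := sql.NewClosedSessionCache(st)

	// Add a handful of closed (empty) sessions, mirroring the datadriven test.
	for i := 0; i < 5; i++ {
		id := sql.ClusterWideID{Uint128: uint128.FromInts(0, uint64(i))}
		cache.Add(id, serverpb.Session{})
	}

	// Entries older than sql.closed_session_cache.time_to_live are filtered
	// out at read time; the rest come back newest first.
	sessions := cache.GetSerializedSessions()
	fmt.Printf("closed sessions cached: %d\n", len(sessions))
}

Note that size, clear, and getSessions are unexported helpers used by the test;
external callers only see NewClosedSessionCache, Add, and GetSerializedSessions.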