Skip to content

Commit

Permalink
sql: add in-memory sessions FIFO cache for closed sessions.
Browse files Browse the repository at this point in the history
This is the first phase of cockroachdb#67888 which simply adds a FIFO cache for closed
sessions. The cache's eviction policy is both row-based and time-based.
Both the capacity and the time to live can be set using the `settings`
package.

Further PRs will focus on exposing these closed sessions to the frontend via
the `ListSessions` endpoint, optimizing the capacity and timeToLive
variables, and making appropriate UI changes.

Release note: None
  • Loading branch information
Gerardo Torres committed Mar 23, 2022
1 parent e0a4c51 commit 483fb05
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan
sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache
sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds
sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store
sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable)
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.auth.resolve_membership_single_scan.enabled</code></td><td>boolean</td><td><code>true</code></td><td>determines whether to populate the role membership cache with a single scan</td></tr>
<tr><td><code>sql.closed_session_cache.capacity</code></td><td>integer</td><td><code>100</code></td><td>the maximum number of sessions in the cache</td></tr>
<tr><td><code>sql.closed_session_cache.time_to_live</code></td><td>integer</td><td><code>3600</code></td><td>the maximum time to live, in seconds</td></tr>
<tr><td><code>sql.contention.event_store.capacity</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the in-memory storage capacity per-node of contention event store</td></tr>
<tr><td><code>sql.contention.event_store.duration_threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td></tr>
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"cancel_queries.go",
"cancel_sessions.go",
"check.go",
"closed_session_cache.go",
"cluster_wide_id.go",
"comment_on_column.go",
"comment_on_constraint.go",
Expand Down Expand Up @@ -402,6 +403,7 @@ go_library(
"//pkg/util/admission",
"//pkg/util/bitarray",
"//pkg/util/buildutil",
"//pkg/util/cache",
"//pkg/util/cancelchecker",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
Expand Down Expand Up @@ -468,6 +470,7 @@ go_test(
"backfill_test.go",
"builtin_mem_usage_test.go",
"builtin_test.go",
"closed_session_cache_test.go",
"comment_on_column_test.go",
"comment_on_constraint_test.go",
"comment_on_database_test.go",
Expand Down Expand Up @@ -698,6 +701,7 @@ go_test(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
Expand Down
146 changes: 146 additions & 0 deletions pkg/sql/closed_session_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// ClosedSessionCacheCapacity is the cluster setting that controls the maximum
// number of sessions in the cache.
//
// Negative values are rejected. The cache's eviction predicate evicts whenever
// the cache size is greater than this value, so a negative capacity would
// evict every session as soon as it is added.
var ClosedSessionCacheCapacity = settings.RegisterIntSetting(
	settings.TenantWritable,
	"sql.closed_session_cache.capacity",
	"the maximum number of sessions in the cache",
	100, // TODO(gtr): Totally arbitrary for now, adjust later.
	settings.NonNegativeInt,
).WithPublic()

// ClosedSessionCacheTimeToLive is the cluster setting that controls the maximum
// time to live for a session's information in the cache, in seconds.
//
// Negative values are rejected. A session is evicted once its age exceeds this
// value, so a negative time to live would evict every session on the first
// read of the cache.
var ClosedSessionCacheTimeToLive = settings.RegisterIntSetting(
	settings.TenantWritable,
	"sql.closed_session_cache.time_to_live",
	"the maximum time to live, in seconds",
	3600, // One hour
	settings.NonNegativeInt,
).WithPublic()

// ClosedSessionCache is an in-memory FIFO cache for closed sessions.
type ClosedSessionCache struct {
// st holds the cluster settings from which the cache's capacity and
// time-to-live settings are read.
st *cluster.Settings

// mu groups the mutex with the state it protects.
mu struct {
syncutil.RWMutex
// data stores *SessionNode values keyed by session ID. Every method in
// this file accesses it while holding the mutex above.
data *cache.UnorderedCache
}
}

// NewClosedSessionCache builds an empty ClosedSessionCache bound to the given
// cluster settings. The underlying cache uses a FIFO policy and evicts
// whenever its size exceeds the current value of ClosedSessionCacheCapacity.
func NewClosedSessionCache(st *cluster.Settings) *ClosedSessionCache {
	// The capacity is looked up on every call so that changes to the cluster
	// setting are honored without rebuilding the cache.
	overCapacity := func(size int, _, _ interface{}) bool {
		return int64(size) > ClosedSessionCacheCapacity.Get(&st.SV)
	}

	closedSessions := &ClosedSessionCache{st: st}
	closedSessions.mu.data = cache.NewUnorderedCache(cache.Config{
		Policy:      cache.CacheFIFO,
		ShouldEvict: overCapacity,
	})
	return closedSessions
}

// Add records a closed session in the ClosedSessionCache under the given id.
// The entry is stamped with the current time, taken while the cache lock is
// held.
func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.mu.data.Add(id, &SessionNode{
		id:        id,
		data:      session,
		timestamp: timeutil.Now(),
	})
}

// size returns the number of entries currently held in the cache. Entries
// that have outlived the time to live but have not yet been evicted by
// getSessions are included in the count.
func (c *ClosedSessionCache) size() int {
	// Reading the length does not modify the cache, so a shared lock is
	// enough and lets concurrent readers proceed.
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.mu.data.Len()
}

// GetSerializedSessions returns the serverpb.Session payload of every session
// that getSessions reports as still live, in the same order that getSessions
// returns them. The result is nil when no sessions remain.
func (c *ClosedSessionCache) GetSerializedSessions() []serverpb.Session {
	nodes := c.getSessions()
	if len(nodes) == 0 {
		return nil
	}

	payloads := make([]serverpb.Session, len(nodes))
	for i := range nodes {
		payloads[i] = nodes[i].data
	}
	return payloads
}

// getSessions iterates through each SessionNode, evicting those whose age
// exceeds the cache's maximum time to live, and returns a list of the
// remaining SessionNodes.
//
// The age is compared against the time to live at sub-second precision, so a
// session is evicted as soon as it is older than the configured number of
// seconds rather than up to one second later.
func (c *ClosedSessionCache) getSessions() []*SessionNode {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Read the setting once so that every entry in this pass is judged against
	// the same time to live. Comparing in float seconds avoids both truncating
	// the age and overflowing time.Duration for very large settings.
	ttlSeconds := float64(ClosedSessionCacheTimeToLive.Get(&c.st.SV))

	var result []*SessionNode
	var sessionsToEvict []*SessionNode

	c.mu.data.Do(func(entry *cache.Entry) {
		node := entry.Value.(*SessionNode)
		if node.getAge().Seconds() > ttlSeconds {
			sessionsToEvict = append(sessionsToEvict, node)
			return
		}
		result = append(result, node)
	})

	// Expired entries are deleted only after the iteration has finished so the
	// cache is not modified while Do is walking it.
	c.evictSessionsLocked(sessionsToEvict)
	return result
}

// evictSessionsLocked deletes each of the given nodes from the cache by id.
// It does not acquire c.mu itself; the caller is expected to hold it.
func (c *ClosedSessionCache) evictSessionsLocked(toEvict []*SessionNode) {
	for i := range toEvict {
		expired := toEvict[i]
		c.mu.data.Del(expired.id)
	}
}

// clear removes every entry from the cache while holding the cache lock.
func (c *ClosedSessionCache) clear() {
c.mu.Lock()
defer c.mu.Unlock()

c.mu.data.Clear()
}

// SessionNode encapsulates the session information that will be inserted into
// the cache.
type SessionNode struct {
// id is the session's ID; it is also the key under which the node is
// stored in the cache.
id ClusterWideID
// data is the session payload returned by GetSerializedSessions.
data serverpb.Session
// timestamp is the time at which the node was added to the cache; the
// node's age is measured from it.
timestamp time.Time
}

// getAge returns how long ago the node's timestamp was, as reported by
// timeutil.Since.
func (n *SessionNode) getAge() time.Duration {
return timeutil.Since(n.timestamp)
}

// getAgeString formats the node's age, rounded to the nearest second, using
// time.Duration's String representation (for example "5s").
func (n *SessionNode) getAgeString() string {
	age := n.getAge()
	rounded := age.Round(time.Second)
	return rounded.String()
}
105 changes: 105 additions & 0 deletions pkg/sql/closed_session_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

// TestSessionCacheBasic drives a ClosedSessionCache through the datadriven
// files under testdata/closed_session_cache. Supported commands:
//
//   - init capacity=<n> timeToLive=<seconds>: creates a fresh cache and
//     overrides the capacity and time-to-live settings.
//   - addSession id=<id>: adds one empty session with the given id.
//   - addSessionBatch startId=<id> sessions=<n> seconds=<s>: adds n empty
//     sessions with consecutive ids, sleeping s seconds before each one.
//   - wait seconds=<s>: sleeps for s seconds.
//   - clear: empties the cache.
//   - show: prints the sessions returned by getSessions, or "empty".
//
// The test relies on real sleeps because session ages are measured against
// the wall clock.
func TestSessionCacheBasic(t *testing.T) {
	defer leaktest.AfterTest(t)()
	var cache *ClosedSessionCache

	datadriven.Walk(t, testutils.TestDataPath(t, "closed_session_cache"), func(t *testing.T, path string) {
		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
			switch d.Cmd {
			case "init":
				var capacity, timeToLive int
				d.ScanArgs(t, "capacity", &capacity)
				d.ScanArgs(t, "timeToLive", &timeToLive)

				ctx := context.Background()
				st := &cluster.Settings{}
				cache = NewClosedSessionCache(st)

				ClosedSessionCacheCapacity.Override(ctx, &st.SV, int64(capacity))
				ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive))

				return fmt.Sprintf("cache_size: %d", cache.size())
			case "addSession":
				var idStr string
				d.ScanArgs(t, "id", &idStr)
				id, err := uint128.FromString(idStr)
				require.NoError(t, err)

				session := serverpb.Session{}
				sessionID := ClusterWideID{id}
				cache.Add(sessionID, session)

				return fmt.Sprintf("cache_size: %d", cache.size())
			case "addSessionBatch":
				var startIDStr string
				var sessions int
				var seconds int
				d.ScanArgs(t, "startId", &startIDStr)
				d.ScanArgs(t, "sessions", &sessions)
				d.ScanArgs(t, "seconds", &seconds)
				id, err := uint128.FromString(startIDStr)
				require.NoError(t, err)

				for i := 0; i < sessions; i++ {
					// Sleep before each insertion so that consecutive sessions
					// are `seconds` apart in age.
					time.Sleep(time.Duration(seconds) * time.Second)
					session := serverpb.Session{}
					sessionID := ClusterWideID{id}
					cache.Add(sessionID, session)
					id = id.Add(1)
				}

				return fmt.Sprintf("cache_size: %d", cache.size())
			case "wait":
				var seconds int
				d.ScanArgs(t, "seconds", &seconds)
				time.Sleep(time.Duration(seconds) * time.Second)

				return "ok"
			case "clear":
				cache.clear()

				return fmt.Sprintf("cache_size: %d", cache.size())
			case "show":
				var result []string

				sessions := cache.getSessions()
				for _, s := range sessions {
					result = append(result, fmt.Sprintf("id: %s age: %s session: {}", s.id, s.getAgeString()))
				}
				if len(result) == 0 {
					return "empty"
				}
				return strings.Join(result, "\n")
			default:
				// Fail loudly on a misspelled or unsupported command instead of
				// silently producing empty output.
				t.Fatalf("unknown command: %s", d.Cmd)
				return ""
			}
		})
	})
}
71 changes: 71 additions & 0 deletions pkg/sql/testdata/closed_session_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Initialize a cache with a capacity of 4 members and a timeToLive of 10
# seconds.
init capacity=4 timeToLive=10
----
cache_size: 0

# Add 6 (empty) sessions with ids ranging from 00 to 05, inclusive.
addSession id=00
----
cache_size: 1

addSession id=01
----
cache_size: 2

addSession id=02
----
cache_size: 3

addSession id=03
----
cache_size: 4

addSession id=04
----
cache_size: 4

addSession id=05
----
cache_size: 4

# There have been 6 sessions added but the capacity is 4. The sessions with
# id=00 and id=01 should have been evicted.
show
----
id: 00000000000000000000000000000005 age: 0s session: {}
id: 00000000000000000000000000000004 age: 0s session: {}
id: 00000000000000000000000000000003 age: 0s session: {}
id: 00000000000000000000000000000002 age: 0s session: {}

# Wait for 15 seconds, enough time for all sessions in the cache to be evicted.
wait seconds=15
----
ok

# The cache should be empty now.
show
----
empty

# Once the cache is empty, add a session every 3 seconds for a total of 4
# sessions and 12 seconds. The cache should be full.
addSessionBatch startId=06 sessions=4 seconds=3
----
cache_size: 4

# Wait 5 seconds so that the oldest session is 14 seconds old and the
# newest is 5 seconds old.
wait seconds=5
----
ok

# At this point, only sessions added in the last 10 seconds should be in the
# cache, those with id=08 (8 seconds old) and id=09 (5 seconds old). Note that
# the session with id=07 would be 11 seconds old, which is greater than the
# allowed timeToLive so that session and all sessions added before that one
# should have been evicted.
show
----
id: 00000000000000000000000000000009 age: 5s session: {}
id: 00000000000000000000000000000008 age: 8s session: {}

0 comments on commit 483fb05

Please sign in to comment.