Skip to content

Commit

Permalink
sql: add in-memory sessions FIFO cache for closed sessions.
Browse files Browse the repository at this point in the history
This the first phase of cockroachdb#67888 which simply adds a FIFO cache for closed
sessions. The cache's eviction policy is both row-based and time-based.
Both the capacity and the time to live can be set using the `settings`
package.

Further PRs will focus on exposing these closed sessions to the frontend via
the `ListSessions` endpoint, optimizing the capacity and timeToLive
variables, and making appropriate UI changes.

Release justification: low risk high benefit change
Release note: None
  • Loading branch information
Gerardo Torres committed Mar 12, 2022
1 parent cfa9a7f commit fcd9964
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 0 deletions.
156 changes: 156 additions & 0 deletions pkg/sql/session_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"fmt"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

var SessionCacheCapacity = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.session_cache.capacity",
"the maximum number of sessions in the cache.",
100, // Totally arbitrary for now
).WithPublic()

var SessionCacheTimeToLive = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.session_cache.time_to_live",
"the maximum time to live, in seconds.",
3600, // One hour
).WithPublic()

// SessionCache is an in-memory FIFO cache for closed sessions.
type SessionCache struct {
st *cluster.Settings

mu struct {
syncutil.RWMutex
queue *cache.UnorderedCache
}
}

func NewSessionCache(st *cluster.Settings) *SessionCache {
c := &SessionCache{st: st}

c.mu.queue = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
capacity := SessionCacheCapacity.Get(&st.SV)
return int64(size) > capacity
},
})

return c
}

func (c *SessionCache) add(id ClusterWideID, session registrySession) {
c.mu.Lock()
defer c.mu.Unlock()

node := &SessionNode{id: id, data: session, timestamp: time.Now()}
c.mu.queue.Add(id, node)
}

func (c *SessionCache) size() int {
c.mu.Lock()
defer c.mu.Unlock()

return c.mu.queue.Len()
}

// getSerializedSessions returns a list of []serverpb.Session, ordered from
// newest to oldest in the cache.
func (c *SessionCache) getSerializedSessions() []serverpb.Session {
var serializedSessions []serverpb.Session

sessions := c.getSessions()
for _, s := range sessions {
serializedSessions = append(serializedSessions, s.data.serialize())
}

return serializedSessions
}

// viewCachedSessions returns a string representation of the sessions in the
// cache, ordered from newest to oldest. This function is used for testing
// purposes.
func (c *SessionCache) viewCachedSessions() string {
var result []string

sessions := c.getSessions()
for _, s := range sessions {
result = append(result, fmt.Sprintf("id: %s age: %s session: {}", s.id, s.getAgeString()))
}

if len(result) == 0 {
return "empty"
}
return strings.Join(result, "\n")
}

// getSessions iterates through each SessionNode, evicting those whose age
// exceeds the cache's maximum time to live, and returns a list of the
// remaining SessionNodes
func (c *SessionCache) getSessions() []*SessionNode {
c.mu.Lock()
defer c.mu.Unlock()

var result []*SessionNode
var sessionsToEvict []*SessionNode

c.mu.queue.Do(func(entry *cache.Entry) {
node := entry.Value.(*SessionNode)
if int64(node.getAge().Seconds()) > SessionCacheTimeToLive.Get(&c.st.SV) {
sessionsToEvict = append(sessionsToEvict, node)
} else {
result = append(result, node)
}
})

c.evictSessionsLocked(sessionsToEvict)
return result
}

func (c *SessionCache) evictSessionsLocked(toEvict []*SessionNode) {
for _, entry := range toEvict {
c.mu.queue.Del(entry.id)
}
}

func (c *SessionCache) clear() {
c.mu.Lock()
defer c.mu.Unlock()

c.mu.queue.Clear()
}

type SessionNode struct {
id ClusterWideID
data registrySession
timestamp time.Time
}

func (n *SessionNode) getAge() time.Duration {
return time.Now().Sub(n.timestamp)
}

func (n *SessionNode) getAgeString() string {
return fmt.Sprintf("%s", n.getAge().Round(time.Second))
}
88 changes: 88 additions & 0 deletions pkg/sql/session_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

func TestSessionCacheBasic(t *testing.T) {
var cache *SessionCache

datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
var capacity, timeToLive int
d.ScanArgs(t, "capacity", &capacity)
d.ScanArgs(t, "timeToLive", &timeToLive)

ctx := context.Background()
st := &cluster.Settings{}
cache = NewSessionCache(st)

SessionCacheCapacity.Override(ctx, &st.SV, int64(capacity))
SessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive))

return fmt.Sprintf("cache_size: %d", cache.size())
case "addSession":
var idStr string
d.ScanArgs(t, "id", &idStr)
id, err := uint128.FromString(idStr)
require.NoError(t, err)

session := &connExecutor{}
sessionId := ClusterWideID{id}
cache.add(sessionId, session)

return fmt.Sprintf("cache_size: %d", cache.size())
case "addSessionBatch":
var startIdStr string
var sessions int
var seconds int
d.ScanArgs(t, "startId", &startIdStr)
d.ScanArgs(t, "sessions", &sessions)
d.ScanArgs(t, "seconds", &seconds)
id, err := uint128.FromString(startIdStr)
require.NoError(t, err)

for i := 0; i < sessions; i++ {
time.Sleep(time.Duration(seconds) * time.Second)
session := &connExecutor{}
sessionId := ClusterWideID{id}
cache.add(sessionId, session)
id = id.Add(1)
}

return fmt.Sprintf("cache_size: %d", cache.size())
case "wait":
var secondsStr int
d.ScanArgs(t, "seconds", &secondsStr)
time.Sleep(time.Duration(secondsStr) * time.Second)

return "ok"
case "show":
return cache.viewCachedSessions()
}
return ""
})
})

}
71 changes: 71 additions & 0 deletions pkg/sql/testdata/session_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Initialize a cache with a capacity of 4 members and a timeToLive of 10
# seconds.
init capacity=4 timeToLive=10
----
cache_size: 0

# Add 6 (empty) sessions with ids ranging from 00 to 05, inclusive.
addSession id=00
----
cache_size: 1

addSession id=01
----
cache_size: 2

addSession id=02
----
cache_size: 3

addSession id=03
----
cache_size: 4

addSession id=04
----
cache_size: 4

addSession id=05
----
cache_size: 4

# There have been 6 sessions added but the capacity is 4. The sessions with
# id=00 an id=01 should have been evicted.
show
----
id: 00000000000000000000000000000005 age: 0s session: {}
id: 00000000000000000000000000000004 age: 0s session: {}
id: 00000000000000000000000000000003 age: 0s session: {}
id: 00000000000000000000000000000002 age: 0s session: {}

# Wait for 15 seconds, enough time for all sessions in the cache to be evicted.
wait seconds=15
----
ok

# The cache should be empty now.
show
----
empty

# Once the cache is empty, add a session every 3 seconds for a total of 4
# sessions and 12 seconds. The cache should be full.
addSessionBatch startId=06 sessions=4 seconds=3
----
cache_size: 4

# Wait 5 seconds so that that the oldest session is 17 seconds old and the
# newest is 5 seconds old.
wait seconds=5
----
ok

# After 15 seconds, only sessions added in the last 10 seconds should be in the
# cache, those with id=08 (8 seconds old) and id=09 (5 seconds old). Note that
# the session with id=07 would be 11 seconds old, which is greater than the
# allowed timeToLive so that session and all sessions added before that one
# should have been evicted.
show
----
id: 00000000000000000000000000000009 age: 5s session: {}
id: 00000000000000000000000000000008 age: 8s session: {}

0 comments on commit fcd9964

Please sign in to comment.