Skip to content

Commit

Permalink
sql: add in-memory sessions FIFO cache for closed sessions.
Browse files Browse the repository at this point in the history
This the first phase of cockroachdb#67888 which simply adds a FIFO cache for closed
sessions. The cache's eviction policy is both row-based and time-based.
Both the capacity and the time to live can be set using the `settings`
package.

Further PRs will focus on exposing these closed sessions to the frontend via
the `ListSessions` endpoint, optimizing the capacity and timeToLive
variables, and making appropriate UI changes.

Release note: None
  • Loading branch information
Gerardo Torres committed Mar 23, 2022
1 parent e0a4c51 commit 72afc16
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme
sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region
sql.notices.enabled boolean true enable notices in the server/client protocol being sent
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability
sql.session_cache.capacity integer 100 the maximum number of sessions in the cache
sql.session_cache.time_to_live integer 3600 the maximum time to live, in seconds
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@
<tr><td><code>sql.multiregion.drop_primary_region.enabled</code></td><td>boolean</td><td><code>true</code></td><td>allows dropping the PRIMARY REGION of a database if it is the last region</td></tr>
<tr><td><code>sql.notices.enabled</code></td><td>boolean</td><td><code>true</code></td><td>enable notices in the server/client protocol being sent</td></tr>
<tr><td><code>sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability</td></tr>
<tr><td><code>sql.session_cache.capacity</code></td><td>integer</td><td><code>100</code></td><td>the maximum number of sessions in the cache</td></tr>
<tr><td><code>sql.session_cache.time_to_live</code></td><td>integer</td><td><code>3600</code></td><td>the maximum time to live, in seconds</td></tr>
<tr><td><code>sql.spatial.experimental_box2d_comparison_operators.enabled</code></td><td>boolean</td><td><code>false</code></td><td>enables the use of certain experimental box2d comparison operators</td></tr>
<tr><td><code>sql.stats.automatic_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>automatic statistics collection mode</td></tr>
<tr><td><code>sql.stats.automatic_collection.fraction_stale_rows</code></td><td>float</td><td><code>0.2</code></td><td>target fraction of stale rows per table that will trigger a statistics refresh</td></tr>
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"cancel_queries.go",
"cancel_sessions.go",
"check.go",
"closed_session_cache.go",
"cluster_wide_id.go",
"comment_on_column.go",
"comment_on_constraint.go",
Expand Down Expand Up @@ -402,6 +403,7 @@ go_library(
"//pkg/util/admission",
"//pkg/util/bitarray",
"//pkg/util/buildutil",
"//pkg/util/cache",
"//pkg/util/cancelchecker",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
Expand Down Expand Up @@ -468,6 +470,7 @@ go_test(
"backfill_test.go",
"builtin_mem_usage_test.go",
"builtin_test.go",
"closed_session_cache_test.go",
"comment_on_column_test.go",
"comment_on_constraint_test.go",
"comment_on_database_test.go",
Expand Down Expand Up @@ -698,6 +701,7 @@ go_test(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
Expand Down
146 changes: 146 additions & 0 deletions pkg/sql/closed_session_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// ClosedSessionCacheCapacity is the cluster setting that controls the maximum number
// of sessions in the cache.
var ClosedSessionCacheCapacity = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.closed_session_cache.capacity",
"the maximum number of sessions in the cache",
100, // TODO(gtr): Totally arbitrary for now, adjust later.
).WithPublic()

// ClosedSessionCacheTimeToLive is the cluster setting that controls the maximum time
// to live for a session's information in the cache, in seconds.
var ClosedSessionCacheTimeToLive = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.closed_session_cache.time_to_live",
"the maximum time to live, in seconds",
3600, // One hour
).WithPublic()

// ClosedSessionCache is an in-memory FIFO cache for closed sessions.
type ClosedSessionCache struct {
st *cluster.Settings

mu struct {
syncutil.RWMutex
data *cache.UnorderedCache
}
}

// NewClosedSessionCache returns a new ClosedSessionCache.
func NewClosedSessionCache(st *cluster.Settings) *ClosedSessionCache {
c := &ClosedSessionCache{st: st}

c.mu.data = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
capacity := ClosedSessionCacheCapacity.Get(&st.SV)
return int64(size) > capacity
},
})

return c
}

// Add adds a closed session to the ClosedSessionCache.
func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) {
c.mu.Lock()
defer c.mu.Unlock()

node := &SessionNode{id: id, data: session, timestamp: timeutil.Now()}
c.mu.data.Add(id, node)
}

func (c *ClosedSessionCache) size() int {
c.mu.Lock()
defer c.mu.Unlock()

return c.mu.data.Len()
}

// GetSerializedSessions returns a list of []serverpb.Session, ordered from
// newest to oldest in the cache.
func (c *ClosedSessionCache) GetSerializedSessions() []serverpb.Session {
var serializedSessions []serverpb.Session

sessions := c.getSessions()
for _, s := range sessions {
serializedSessions = append(serializedSessions, s.data)
}

return serializedSessions
}

// getSessions iterates through each SessionNode, evicting those whose age
// exceeds the cache's maximum time to live, and returns a list of the
// remaining SessionNodes
func (c *ClosedSessionCache) getSessions() []*SessionNode {
c.mu.Lock()
defer c.mu.Unlock()

var result []*SessionNode
var sessionsToEvict []*SessionNode

c.mu.data.Do(func(entry *cache.Entry) {
node := entry.Value.(*SessionNode)
if int64(node.getAge().Seconds()) > ClosedSessionCacheTimeToLive.Get(&c.st.SV) {
sessionsToEvict = append(sessionsToEvict, node)
} else {
result = append(result, node)
}
})

c.evictSessionsLocked(sessionsToEvict)
return result
}

func (c *ClosedSessionCache) evictSessionsLocked(toEvict []*SessionNode) {
for _, entry := range toEvict {
c.mu.data.Del(entry.id)
}
}

func (c *ClosedSessionCache) clear() {
c.mu.Lock()
defer c.mu.Unlock()

c.mu.data.Clear()
}

// SessionNode encapsulates the session information that will be inserted into
// the cache.
type SessionNode struct {
id ClusterWideID
data serverpb.Session
timestamp time.Time
}

func (n *SessionNode) getAge() time.Duration {
return timeutil.Since(n.timestamp)
}

func (n *SessionNode) getAgeString() string {
return n.getAge().Round(time.Second).String()
}
101 changes: 101 additions & 0 deletions pkg/sql/closed_session_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

func TestSessionCacheBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
var cache *ClosedSessionCache

datadriven.Walk(t, testutils.TestDataPath(t, "closed_session_cache"), func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
var capacity, timeToLive int
d.ScanArgs(t, "capacity", &capacity)
d.ScanArgs(t, "timeToLive", &timeToLive)

ctx := context.Background()
st := &cluster.Settings{}
cache = NewClosedSessionCache(st)

ClosedSessionCacheCapacity.Override(ctx, &st.SV, int64(capacity))
ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive))

return fmt.Sprintf("cache_size: %d", cache.size())
case "addSession":
var idStr string
d.ScanArgs(t, "id", &idStr)
id, err := uint128.FromString(idStr)
require.NoError(t, err)

session := serverpb.Session{}
sessionID := ClusterWideID{id}
cache.Add(sessionID, session)

return fmt.Sprintf("cache_size: %d", cache.size())
case "addSessionBatch":
var startIDStr string
var sessions int
var seconds int
d.ScanArgs(t, "startId", &startIDStr)
d.ScanArgs(t, "sessions", &sessions)
d.ScanArgs(t, "seconds", &seconds)
id, err := uint128.FromString(startIDStr)
require.NoError(t, err)

for i := 0; i < sessions; i++ {
time.Sleep(time.Duration(seconds) * time.Second)
session := serverpb.Session{}
sessionID := ClusterWideID{id}
cache.Add(sessionID, session)
id = id.Add(1)
}

return fmt.Sprintf("cache_size: %d", cache.size())
case "wait":
var secondsStr int
d.ScanArgs(t, "seconds", &secondsStr)
time.Sleep(time.Duration(secondsStr) * time.Second)

return "ok"
case "show":
var result []string

sessions := cache.getSessions()
for _, s := range sessions {
result = append(result, fmt.Sprintf("id: %s age: %s session: {}", s.id, s.getAgeString()))
}
if len(result) == 0 {
return "empty"
}
return strings.Join(result, "\n")
}
return ""
})
})

}
71 changes: 71 additions & 0 deletions pkg/sql/testdata/closed_session_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Initialize a cache with a capacity of 4 members and a timeToLive of 10
# seconds.
init capacity=4 timeToLive=10
----
cache_size: 0

# Add 6 (empty) sessions with ids ranging from 00 to 05, inclusive.
addSession id=00
----
cache_size: 1

addSession id=01
----
cache_size: 2

addSession id=02
----
cache_size: 3

addSession id=03
----
cache_size: 4

addSession id=04
----
cache_size: 4

addSession id=05
----
cache_size: 4

# There have been 6 sessions added but the capacity is 4. The sessions with
# id=00 an id=01 should have been evicted.
show
----
id: 00000000000000000000000000000005 age: 0s session: {}
id: 00000000000000000000000000000004 age: 0s session: {}
id: 00000000000000000000000000000003 age: 0s session: {}
id: 00000000000000000000000000000002 age: 0s session: {}

# Wait for 15 seconds, enough time for all sessions in the cache to be evicted.
wait seconds=15
----
ok

# The cache should be empty now.
show
----
empty

# Once the cache is empty, add a session every 3 seconds for a total of 4
# sessions and 12 seconds. The cache should be full.
addSessionBatch startId=06 sessions=4 seconds=3
----
cache_size: 4

# Wait 5 seconds so that that the oldest session is 17 seconds old and the
# newest is 5 seconds old.
wait seconds=5
----
ok

# After 15 seconds, only sessions added in the last 10 seconds should be in the
# cache, those with id=08 (8 seconds old) and id=09 (5 seconds old). Note that
# the session with id=07 would be 11 seconds old, which is greater than the
# allowed timeToLive so that session and all sessions added before that one
# should have been evicted.
show
----
id: 00000000000000000000000000000009 age: 5s session: {}
id: 00000000000000000000000000000008 age: 8s session: {}

0 comments on commit 72afc16

Please sign in to comment.