server, sql: expose closed sessions to ListSessions endpoint
This is the second phase of #67888: exposing closed sessions through
the ListSessions endpoint.

Previously, the ListSessions endpoint returned only open sessions. This
change builds on the previous PR, which added the `ClosedSessionCache`,
and allows the ListSessions endpoint to also return closed sessions in
its response. Additionally, the `serverpb.Session` object was updated to
include new `end` and `status` fields, which specify the time the
session ended and the status (open or closed) of the session,
respectively.

Release note (api change): `ListSessions` now returns closed sessions in
addition to open sessions; `serverpb.Session` now has `end` and `status`
fields.
Gerardo Torres committed Mar 28, 2022
1 parent 0b7de82 commit fcfbed3
Showing 16 changed files with 92 additions and 32 deletions.
4 changes: 4 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
@@ -2096,6 +2096,8 @@ Session represents one SQL session.
| max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) |
| active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) |
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The sessions status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |



@@ -2227,6 +2229,8 @@ Session represents one SQL session.
| max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) |
| active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) |
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The sessions status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |



2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -86,7 +86,7 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan
-sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache
+sql.closed_session_cache.capacity integer 1000 the maximum number of sessions in the cache
sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds
sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store
sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -100,7 +100,7 @@
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.auth.resolve_membership_single_scan.enabled</code></td><td>boolean</td><td><code>true</code></td><td>determines whether to populate the role membership cache with a single scan</td></tr>
<tr><td><code>sql.closed_session_cache.capacity</code></td><td>integer</td><td><code>100</code></td><td>the maximum number of sessions in the cache</td></tr>
<tr><td><code>sql.closed_session_cache.capacity</code></td><td>integer</td><td><code>1000</code></td><td>the maximum number of sessions in the cache</td></tr>
<tr><td><code>sql.closed_session_cache.time_to_live</code></td><td>integer</td><td><code>3600</code></td><td>the maximum time to live, in seconds</td></tr>
<tr><td><code>sql.contention.event_store.capacity</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the in-memory storage capacity per-node of contention event store</td></tr>
<tr><td><code>sql.contention.event_store.duration_threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events</td></tr>
15 changes: 15 additions & 0 deletions docs/generated/swagger/spec.json
@@ -1156,6 +1156,12 @@
"type": "string",
"x-go-name": "ClientAddress"
},
"end": {
"description": "Timestamp of session's end.",
"type": "string",
"format": "date-time",
"x-go-name": "End"
},
"id": {
"description": "ID of the session (uint128 represented as raw bytes).",
"type": "array",
@@ -1190,6 +1196,9 @@
"format": "date-time",
"x-go-name": "Start"
},
"status": {
"$ref": "#/definitions/Session_Status"
},
"username": {
"description": "Username of the user for this session.",
"type": "string",
@@ -1198,6 +1207,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
"Session_Status": {
"type": "integer",
"format": "int32",
"title": "Enum for sessions status.",
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
"StoreID": {
"type": "integer",
"format": "int32",
3 changes: 3 additions & 0 deletions pkg/server/server.go
@@ -673,6 +673,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sHTTP := newHTTPServer(cfg.BaseConfig, rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn)

sessionRegistry := sql.NewSessionRegistry()
closedSessionCache := sql.NewClosedSessionCache(cfg.Settings, time.Now)
flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st)

sStatus := newStatusServer(
@@ -690,6 +691,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
node.stores,
stopper,
sessionRegistry,
closedSessionCache,
flowScheduler,
internalExecutor,
)
@@ -739,6 +741,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
registry: registry,
recorder: recorder,
sessionRegistry: sessionRegistry,
closedSessionCache: closedSessionCache,
flowScheduler: flowScheduler,
circularInternalExecutor: internalExecutor,
circularJobRegistry: jobRegistry,
6 changes: 6 additions & 0 deletions pkg/server/server_sql.go
@@ -131,6 +131,7 @@ type SQLServer struct {
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
sessionRegistry *sql.SessionRegistry
closedSessionCache *sql.ClosedSessionCache
jobRegistry *jobs.Registry
startupMigrationsMgr *startupmigrations.Manager
statsRefresher *stats.Refresher
@@ -266,6 +267,9 @@ type sqlServerArgs struct {
// Used for SHOW/CANCEL QUERIE(S)/SESSION(S).
sessionRegistry *sql.SessionRegistry

// Used to store closed sessions.
closedSessionCache *sql.ClosedSessionCache

// Used to track the DistSQL flows scheduled on this node but initiated on
// behalf of other nodes.
flowScheduler *flowinfra.FlowScheduler
@@ -695,6 +699,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
SQLStatusServer: cfg.sqlStatusServer,
RegionsServer: cfg.regionsServer,
SessionRegistry: cfg.sessionRegistry,
ClosedSessionCache: cfg.closedSessionCache,
ContentionRegistry: contentionRegistry,
SQLLiveness: cfg.sqlLivenessProvider,
JobRegistry: jobRegistry,
@@ -1025,6 +1030,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
tracingService: tracingService,
tenantConnect: cfg.tenantConnect,
sessionRegistry: cfg.sessionRegistry,
closedSessionCache: cfg.closedSessionCache,
jobRegistry: jobRegistry,
statsRefresher: statsRefresher,
temporaryObjectCleaner: temporaryObjectCleaner,
10 changes: 10 additions & 0 deletions pkg/server/serverpb/status.proto
@@ -932,6 +932,16 @@ message Session {
// The SQL statement fingerprint of the last query executed on this session,
// compatible with StatementStatisticsKey.
string last_active_query_no_constants = 13;
// Enum for sessions status.
enum Status {
OPEN = 0;
CLOSED = 1;
}
// The sessions status.
Status status = 14;
// Timestamp of session's end.
google.protobuf.Timestamp end = 15
[ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ];
}

// An error wrapper object for ListSessionsResponse.
36 changes: 21 additions & 15 deletions pkg/server/status.go
@@ -140,13 +140,14 @@ type baseStatusServer struct {
serverpb.UnimplementedStatusServer

log.AmbientContext
-privilegeChecker *adminPrivilegeChecker
-sessionRegistry  *sql.SessionRegistry
-flowScheduler    *flowinfra.FlowScheduler
-st               *cluster.Settings
-sqlServer        *SQLServer
-rpcCtx           *rpc.Context
-stopper          *stop.Stopper
+privilegeChecker   *adminPrivilegeChecker
+sessionRegistry    *sql.SessionRegistry
+closedSessionCache *sql.ClosedSessionCache
+flowScheduler      *flowinfra.FlowScheduler
+st                 *cluster.Settings
+sqlServer          *SQLServer
+rpcCtx             *rpc.Context
+stopper            *stop.Stopper
}

// getLocalSessions returns a list of local sessions on this node. Note that the
@@ -198,7 +199,10 @@ func (b *baseStatusServer) getLocalSessions(

registry := b.sessionRegistry
sessions := registry.SerializeAll()
-userSessions := make([]serverpb.Session, 0, len(sessions))
+closedSessions := b.closedSessionCache.GetSerializedSessions()
+
+userSessions := make([]serverpb.Session, 0, len(sessions)+len(closedSessions))
+sessions = append(sessions, closedSessions...)

for _, session := range sessions {
if reqUsername.Normalized() != session.Username && !showAll {
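
The hunk above appends the cached closed sessions to the open ones and then applies the existing per-user filter to the combined slice. A rough, self-contained sketch of that merge-then-filter flow follows. `Session` and `visibleSessions` are hypothetical names, and skipping non-matching sessions with `continue` is an assumption, since the hunk is cut off before the body of the `if`.

```go
package main

import "fmt"

// Session is a hypothetical, trimmed-down stand-in for serverpb.Session.
type Session struct {
	Username string
	Closed   bool
}

// visibleSessions merges open and closed sessions, then keeps only those the
// requesting user may see. Skipping other users' sessions with `continue` is
// an assumption; the original hunk does not show the body of the `if`.
func visibleSessions(open, closed []Session, reqUser string, showAll bool) []Session {
	all := make([]Session, 0, len(open)+len(closed))
	all = append(all, open...)
	all = append(all, closed...)

	visible := make([]Session, 0, len(all))
	for _, s := range all {
		if s.Username != reqUser && !showAll {
			continue
		}
		visible = append(visible, s)
	}
	return visible
}

func main() {
	open := []Session{{Username: "alice"}, {Username: "bob"}}
	closed := []Session{{Username: "alice", Closed: true}}

	fmt.Println(len(visibleSessions(open, closed, "alice", false))) // 2
	fmt.Println(len(visibleSessions(open, closed, "alice", true)))  // 3
}
```

Filtering after the merge means closed sessions are subject to the same visibility rule as open ones, with no separate code path.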
@@ -498,19 +502,21 @@ func newStatusServer(
stores *kvserver.Stores,
stopper *stop.Stopper,
sessionRegistry *sql.SessionRegistry,
+closedSessionCache *sql.ClosedSessionCache,
flowScheduler *flowinfra.FlowScheduler,
internalExecutor *sql.InternalExecutor,
) *statusServer {
ambient.AddLogTag("status", nil)
server := &statusServer{
baseStatusServer: &baseStatusServer{
-AmbientContext:   ambient,
-privilegeChecker: adminAuthzCheck,
-sessionRegistry:  sessionRegistry,
-flowScheduler:    flowScheduler,
-st:               st,
-rpcCtx:           rpcCtx,
-stopper:          stopper,
+AmbientContext:     ambient,
+privilegeChecker:   adminAuthzCheck,
+sessionRegistry:    sessionRegistry,
+closedSessionCache: closedSessionCache,
+flowScheduler:      flowScheduler,
+st:                 st,
+rpcCtx:             rpcCtx,
+stopper:            stopper,
},
cfg: cfg,
admin: adminServer,
4 changes: 3 additions & 1 deletion pkg/server/tenant.go
@@ -208,7 +208,7 @@ func startTenantInternal(
// the SQL server object.
tenantStatusServer := newTenantStatusServer(
baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor},
-args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil,
+args.sessionRegistry, args.closedSessionCache, args.flowScheduler, baseCfg.Settings, nil,
args.rpcContext, args.stopper,
)

@@ -525,6 +525,7 @@ func makeTenantSQLServerArgs(
grpcServer.setMode(modeOperational)

sessionRegistry := sql.NewSessionRegistry()
closedSessionCache := sql.NewClosedSessionCache(st, time.Now)
flowScheduler := flowinfra.NewFlowScheduler(baseCfg.AmbientCtx, stopper, st)
return sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
@@ -560,6 +561,7 @@ func makeTenantSQLServerArgs(
registry: registry,
recorder: recorder,
sessionRegistry: sessionRegistry,
closedSessionCache: closedSessionCache,
flowScheduler: flowScheduler,
circularInternalExecutor: circularInternalExecutor,
circularJobRegistry: circularJobRegistry,
18 changes: 10 additions & 8 deletions pkg/server/tenant_status.go
@@ -81,6 +81,7 @@ func newTenantStatusServer(
ambient log.AmbientContext,
privilegeChecker *adminPrivilegeChecker,
sessionRegistry *sql.SessionRegistry,
closedSessionCache *sql.ClosedSessionCache,
flowScheduler *flowinfra.FlowScheduler,
st *cluster.Settings,
sqlServer *SQLServer,
@@ -90,14 +91,15 @@ func newTenantStatusServer(
ambient.AddLogTag("tenant-status", nil)
return &tenantStatusServer{
baseStatusServer: baseStatusServer{
-AmbientContext:   ambient,
-privilegeChecker: privilegeChecker,
-sessionRegistry:  sessionRegistry,
-flowScheduler:    flowScheduler,
-st:               st,
-sqlServer:        sqlServer,
-rpcCtx:           rpcCtx,
-stopper:          stopper,
+AmbientContext:     ambient,
+privilegeChecker:   privilegeChecker,
+sessionRegistry:    sessionRegistry,
+closedSessionCache: closedSessionCache,
+flowScheduler:      flowScheduler,
+st:                 st,
+sqlServer:          sqlServer,
+rpcCtx:             rpcCtx,
+stopper:            stopper,
},
}
}
9 changes: 6 additions & 3 deletions pkg/sql/closed_session_cache.go
@@ -28,7 +28,7 @@ var ClosedSessionCacheCapacity = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.closed_session_cache.capacity",
"the maximum number of sessions in the cache",
-100, // TODO(gtr): Totally arbitrary for now, adjust later.
+1000, // TODO(gtr): Totally arbitrary for now, adjust later.
).WithPublic()

// ClosedSessionCacheTimeToLive is the cluster setting that controls the maximum time
@@ -66,11 +66,14 @@ func NewClosedSessionCache(st *cluster.Settings, timeSrc timeSource) *ClosedSess
return c
}

-// Add adds a closed session to the ClosedSessionCache.
-func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) {
+// add adds a closed session to the ClosedSessionCache.
+func (c *ClosedSessionCache) add(id ClusterWideID, session serverpb.Session) {
c.mu.Lock()
defer c.mu.Unlock()

+end := time.Now()
+session.End = &end
+session.Status = serverpb.Session_CLOSED
node := &sessionNode{id: id, data: session, timestamp: c.timeSrc()}
c.mu.data.Add(id, node)
}
4 changes: 2 additions & 2 deletions pkg/sql/closed_session_cache_test.go
@@ -62,7 +62,7 @@ func TestSessionCacheBasic(t *testing.T) {

session := serverpb.Session{}
sessionID := ClusterWideID{id}
-cache.Add(sessionID, session)
+cache.add(sessionID, session)

return fmt.Sprintf("cache_size: %d", cache.size())
case "addSessionBatch":
@@ -79,7 +79,7 @@ func TestSessionCacheBasic(t *testing.T) {
cache.timeSrc = addSeconds(cache.timeSrc, seconds)
session := serverpb.Session{}
sessionID := ClusterWideID{id}
-cache.Add(sessionID, session)
+cache.add(sessionID, session)
id = id.Add(1)
}

2 changes: 2 additions & 0 deletions pkg/sql/conn_executor.go
@@ -1776,6 +1776,7 @@ func (ex *connExecutor) run(
ex.server.cfg.SessionRegistry.register(ex.sessionID, ex.queryCancelKey, ex)
ex.planner.extendedEvalCtx.setSessionID(ex.sessionID)
defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey)
defer ex.server.cfg.ClosedSessionCache.add(ex.sessionID, ex.serialize())
for {
ex.curStmtAST = nil
if err := ctx.Err(); err != nil {
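
Two Go rules apply to the new `defer` line in this hunk: deferred calls run in last-in, first-out order, and the arguments of a deferred call are evaluated when the `defer` statement executes, not when the function returns. So `ex.serialize()` in the hunk appears to be evaluated as `run` starts; the commit message does not say whether that is intended. The sketch below uses hypothetical `session`, `record`, and `run` names to show the difference between passing an argument directly and wrapping the call in a closure.

```go
package main

import "fmt"

// session is a hypothetical type whose state changes while run executes.
type session struct{ queries int }

// serialize returns a snapshot of the session's current state.
func (s *session) serialize() int { return s.queries }

// record appends a snapshot to the log.
func record(log *[]int, v int) { *log = append(*log, v) }

func run(s *session, log *[]int) {
	// The argument is evaluated right here, while queries is still 0.
	defer record(log, s.serialize())
	// The closure body runs at return time, so it sees the final state.
	defer func() { record(log, s.serialize()) }()

	s.queries = 3
}

func main() {
	var log []int
	run(&session{}, &log)
	// Defers run last-in, first-out: the closure (3) runs before the direct call (0).
	fmt.Println(log) // [3 0]
}
```

If the snapshot should reflect the session's state at close, wrapping the call in a closure as in the second `defer` is one common way to postpone the evaluation.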
@@ -3091,6 +3092,7 @@ func (ex *connExecutor) serialize() serverpb.Session {
AllocBytes: ex.mon.AllocBytes(),
MaxAllocBytes: ex.mon.MaximumBytes(),
LastActiveQueryNoConstants: lastActiveQueryNoConstants,
Status: serverpb.Session_OPEN,
}
}

3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_internal_test.go
@@ -288,7 +288,8 @@ func startConnExecutor(
SystemConfig: config.NewConstantSystemConfigProvider(
config.NewSystemConfig(zonepb.DefaultZoneConfigRef()),
),
-SessionRegistry: NewSessionRegistry(),
+SessionRegistry:    NewSessionRegistry(),
+ClosedSessionCache: NewClosedSessionCache(st, time.Now),
NodeInfo: NodeInfo{
NodeID: nodeID,
ClusterID: func() uuid.UUID { return uuid.UUID{} },
1 change: 1 addition & 0 deletions pkg/sql/exec_util.go
@@ -1170,6 +1170,7 @@ type ExecutorConfig struct {
RegionsServer serverpb.RegionsServer
MetricsRecorder nodeStatusGenerator
SessionRegistry *SessionRegistry
ClosedSessionCache *ClosedSessionCache
SQLLiveness sqlliveness.Liveness
JobRegistry *jobs.Registry
VirtualSchemas *VirtualSchemaHolder
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@ import {
CancelSessionRequestMessage,
} from "src/api/terminateQueryApi";

import Status = cockroach.server.serverpb.Session.Status;

const history = createMemoryHistory({ initialEntries: ["/sessions"] });

const toUuid = function(s: string): Uint8Array {
@@ -45,6 +47,7 @@ export const idleSession: SessionInfo = {
alloc_bytes: Long.fromNumber(0),
max_alloc_bytes: Long.fromNumber(10240),
active_queries: [],
status: Status.OPEN,
toJSON: () => ({}),
},
};
@@ -82,6 +85,7 @@ export const idleTransactionSession: SessionInfo = {
},
last_active_query_no_constants: "SHOW database",
active_queries: [],
status: Status.OPEN,
toJSON: () => ({}),
},
};
@@ -133,6 +137,7 @@ export const activeSession: SessionInfo = {
num_auto_retries: 3,
},
last_active_query_no_constants: "SHOW database",
status: Status.OPEN,
toJSON: () => ({}),
},
};