diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index a7894badd201..e318ab43dd71 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -2063,6 +2063,7 @@ Request object for ListSessions and ListLocalSessions.
| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) |
+| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) |
@@ -2108,6 +2109,8 @@ Session represents one SQL session.
| max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) |
| active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) |
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
+| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
+| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |
@@ -2194,6 +2197,7 @@ Request object for ListSessions and ListLocalSessions.
| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) |
+| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) |
@@ -2239,6 +2243,8 @@ Session represents one SQL session.
| max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) |
| active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) |
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
+| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
+| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |
diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index e0b21475145d..651c41902924 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -87,7 +87,7 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan
-sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache
+sql.closed_session_cache.capacity integer 1000 the maximum number of sessions in the cache
sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds
sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store
sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index e28194f2d2af..2f5ac811597d 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -102,7 +102,7 @@
server.web_session.purge.ttl | duration | 1h0m0s | if nonzero, entries in system.web_sessions older than this duration are periodically purged |
server.web_session_timeout | duration | 168h0m0s | the duration that a newly created web session will be valid |
sql.auth.resolve_membership_single_scan.enabled | boolean | true | determines whether to populate the role membership cache with a single scan |
-sql.closed_session_cache.capacity | integer | 100 | the maximum number of sessions in the cache |
+sql.closed_session_cache.capacity | integer | 1000 | the maximum number of sessions in the cache |
sql.closed_session_cache.time_to_live | integer | 3600 | the maximum time to live, in seconds |
sql.contention.event_store.capacity | byte size | 64 MiB | the in-memory storage capacity per-node of contention event store |
sql.contention.event_store.duration_threshold | duration | 0s | minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events |
diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json
index f6bd69dd9c43..69ffddbb0389 100644
--- a/docs/generated/swagger/spec.json
+++ b/docs/generated/swagger/spec.json
@@ -551,6 +551,12 @@
"name": "username",
"in": "query"
},
+ {
+ "type": "bool",
+ "description": "Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response.",
+ "name": "exclude_closed_sessions",
+ "in": "query"
+ },
{
"type": "integer",
"description": "Maximum number of results to return in this call.",
@@ -1180,6 +1186,12 @@
"type": "string",
"x-go-name": "ClientAddress"
},
+ "end": {
+ "description": "Timestamp of session's end.",
+ "type": "string",
+ "format": "date-time",
+ "x-go-name": "End"
+ },
"id": {
"description": "ID of the session (uint128 represented as raw bytes).",
"type": "array",
@@ -1214,6 +1226,9 @@
"format": "date-time",
"x-go-name": "Start"
},
+ "status": {
+ "$ref": "#/definitions/Session_Status"
+ },
"username": {
"description": "Username of the user for this session.",
"type": "string",
@@ -1222,6 +1237,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
+ "Session_Status": {
+ "type": "integer",
+ "format": "int32",
+ "title": "Enum for sessions status.",
+ "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
+ },
"StoreID": {
"type": "integer",
"format": "int32",
diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
index abd649705951..be2840a77b20 100644
--- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
+++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
@@ -242,15 +242,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0
----
id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries
-query ITTTTTTTTTTT colnames
+query ITTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes
+node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
-query ITTTTTTTTTTT colnames
+query ITTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes
+node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
index ece17ea56773..9aeb0bdf2cda 100644
--- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go
+++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
@@ -806,7 +806,8 @@ WHERE
func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string {
var sessionIDs []string
- rows := conn.QueryStr(t, "SELECT session_id FROM crdb_internal.cluster_sessions")
+ rows := conn.QueryStr(t,
+ "SELECT session_id FROM crdb_internal.cluster_sessions WHERE status = 'ACTIVE' OR status = 'IDLE'")
for _, row := range rows {
sessionIDs = append(sessionIDs, row[0])
}
@@ -822,7 +823,7 @@ func testTenantStatusCancelSession(t *testing.T, helper *tenantTestHelper) {
httpPod1 := helper.testCluster().tenantAdminHTTPClient(t, 1)
defer httpPod1.Close()
listSessionsResp := serverpb.ListSessionsResponse{}
- httpPod1.GetJSON("/_status/sessions", &listSessionsResp)
+ httpPod1.GetJSON("/_status/sessions?exclude_closed_sessions=true", &listSessionsResp)
var session serverpb.Session
for _, s := range listSessionsResp.Sessions {
if s.LastActiveQuery == "SELECT 1" {
diff --git a/pkg/server/api_v2.go b/pkg/server/api_v2.go
index 8b8fb96fb5b7..2f4336b621ee 100644
--- a/pkg/server/api_v2.go
+++ b/pkg/server/api_v2.go
@@ -227,6 +227,12 @@ type listSessionsResponse struct {
// description: Username of user to return sessions for; if unspecified,
// sessions from all users are returned.
// required: false
+// - name: exclude_closed_sessions
+// type: bool
+// in: query
+// description: Boolean to exclude closed sessions; if unspecified, defaults
+// to false and closed sessions are included in the response.
+// required: false
// - name: limit
// type: integer
// in: query
@@ -250,7 +256,8 @@ func (a *apiV2Server) listSessions(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
limit, start := getRPCPaginationValues(r)
reqUsername := r.URL.Query().Get("username")
- req := &serverpb.ListSessionsRequest{Username: reqUsername}
+ reqExcludeClosed := r.URL.Query().Get("exclude_closed_sessions") == "true"
+ req := &serverpb.ListSessionsRequest{Username: reqUsername, ExcludeClosedSessions: reqExcludeClosed}
response := &listSessionsResponse{}
outgoingCtx := apiToOutgoingGatewayCtx(ctx, r)
diff --git a/pkg/server/api_v2_test.go b/pkg/server/api_v2_test.go
index ade26d5430b8..f6a7f444bdcd 100644
--- a/pkg/server/api_v2_test.go
+++ b/pkg/server/api_v2_test.go
@@ -59,6 +59,7 @@ func TestListSessionsV2(t *testing.T) {
req, err := http.NewRequest("GET", ts1.AdminURL()+apiV2Path+"sessions/", nil)
require.NoError(t, err)
query := req.URL.Query()
+ query.Add("exclude_closed_sessions", "true")
if limit > 0 {
query.Add("limit", strconv.Itoa(limit))
}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index c1820a07d7d0..c37d2c1e162b 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -689,6 +689,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sHTTP := newHTTPServer(cfg.BaseConfig, rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn)
sessionRegistry := sql.NewSessionRegistry()
+ closedSessionCache := sql.NewClosedSessionCache(cfg.Settings, sqlMonitorAndMetrics.rootSQLMemoryMonitor, time.Now)
flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st)
sStatus := newStatusServer(
@@ -706,6 +707,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
node.stores,
stopper,
sessionRegistry,
+ closedSessionCache,
flowScheduler,
internalExecutor,
)
@@ -756,6 +758,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
registry: registry,
recorder: recorder,
sessionRegistry: sessionRegistry,
+ closedSessionCache: closedSessionCache,
flowScheduler: flowScheduler,
circularInternalExecutor: internalExecutor,
circularJobRegistry: jobRegistry,
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index ccf889095338..9e21ec14091c 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -135,6 +135,7 @@ type SQLServer struct {
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
sessionRegistry *sql.SessionRegistry
+ closedSessionCache *sql.ClosedSessionCache
jobRegistry *jobs.Registry
startupMigrationsMgr *startupmigrations.Manager
statsRefresher *stats.Refresher
@@ -273,6 +274,9 @@ type sqlServerArgs struct {
// Used for SHOW/CANCEL QUERIE(S)/SESSION(S).
sessionRegistry *sql.SessionRegistry
+ // Used to store closed sessions.
+ closedSessionCache *sql.ClosedSessionCache
+
// Used to track the DistSQL flows scheduled on this node but initiated on
// behalf of other nodes.
flowScheduler *flowinfra.FlowScheduler
@@ -728,6 +732,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
SQLStatusServer: cfg.sqlStatusServer,
RegionsServer: cfg.regionsServer,
SessionRegistry: cfg.sessionRegistry,
+ ClosedSessionCache: cfg.closedSessionCache,
ContentionRegistry: contentionRegistry,
SQLLiveness: cfg.sqlLivenessProvider,
JobRegistry: jobRegistry,
@@ -1061,6 +1066,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
tracingService: tracingService,
tenantConnect: cfg.tenantConnect,
sessionRegistry: cfg.sessionRegistry,
+ closedSessionCache: cfg.closedSessionCache,
jobRegistry: jobRegistry,
statsRefresher: statsRefresher,
temporaryObjectCleaner: temporaryObjectCleaner,
diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto
index 5a82b523c255..f6c5ab35c2d2 100644
--- a/pkg/server/serverpb/status.proto
+++ b/pkg/server/serverpb/status.proto
@@ -907,6 +907,9 @@ message ListSessionsRequest {
// The caller is responsible to normalize the username
// (= case fold and perform unicode NFC normalization).
string username = 1;
+ // Boolean to exclude closed sessions; if unspecified, defaults
+ // to false and closed sessions are included in the response.
+ bool exclude_closed_sessions = 2;
}
// Session represents one SQL session.
@@ -944,6 +947,17 @@ message Session {
// The SQL statement fingerprint of the last query executed on this session,
// compatible with StatementStatisticsKey.
string last_active_query_no_constants = 13;
+ // Enum for sessions status.
+ enum Status {
+ ACTIVE = 0;
+ CLOSED = 1;
+ IDLE = 2;
+ }
+ // The session's status.
+ Status status = 14;
+ // Timestamp of session's end.
+ google.protobuf.Timestamp end = 15
+ [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ];
}
// An error wrapper object for ListSessionsResponse.
diff --git a/pkg/server/status.go b/pkg/server/status.go
index 1cfc128f07c8..d8c5e2742717 100644
--- a/pkg/server/status.go
+++ b/pkg/server/status.go
@@ -143,13 +143,14 @@ type baseStatusServer struct {
serverpb.UnimplementedStatusServer
log.AmbientContext
- privilegeChecker *adminPrivilegeChecker
- sessionRegistry *sql.SessionRegistry
- flowScheduler *flowinfra.FlowScheduler
- st *cluster.Settings
- sqlServer *SQLServer
- rpcCtx *rpc.Context
- stopper *stop.Stopper
+ privilegeChecker *adminPrivilegeChecker
+ sessionRegistry *sql.SessionRegistry
+ closedSessionCache *sql.ClosedSessionCache
+ flowScheduler *flowinfra.FlowScheduler
+ st *cluster.Settings
+ sqlServer *SQLServer
+ rpcCtx *rpc.Context
+ stopper *stop.Stopper
}
// getLocalSessions returns a list of local sessions on this node. Note that the
@@ -199,9 +200,22 @@ func (b *baseStatusServer) getLocalSessions(
// The empty username means "all sessions".
showAll := reqUsername.Undefined()
- registry := b.sessionRegistry
- sessions := registry.SerializeAll()
- userSessions := make([]serverpb.Session, 0, len(sessions))
+ // In order to avoid duplicate sessions showing up as both open and closed,
+ // we lock the session registry to prevent any changes to it while we
+ // serialize the sessions from the session registry and the closed session
+ // cache.
+ b.sessionRegistry.Lock()
+ sessions := b.sessionRegistry.SerializeAllLocked()
+
+ var closedSessions []serverpb.Session
+ if !req.ExcludeClosedSessions {
+ closedSessions = b.closedSessionCache.GetSerializedSessions()
+ }
+
+ b.sessionRegistry.Unlock()
+
+ userSessions := make([]serverpb.Session, 0, len(sessions)+len(closedSessions))
+ sessions = append(sessions, closedSessions...)
for _, session := range sessions {
if reqUsername.Normalized() != session.Username && !showAll {
@@ -220,6 +234,11 @@ func (b *baseStatusServer) getLocalSessions(
userSessions = append(userSessions, session)
}
+
+ sort.Slice(userSessions, func(i, j int) bool {
+ return userSessions[i].Start.Before(userSessions[j].Start)
+ })
+
return userSessions, nil
}
@@ -501,19 +520,21 @@ func newStatusServer(
stores *kvserver.Stores,
stopper *stop.Stopper,
sessionRegistry *sql.SessionRegistry,
+ closedSessionCache *sql.ClosedSessionCache,
flowScheduler *flowinfra.FlowScheduler,
internalExecutor *sql.InternalExecutor,
) *statusServer {
ambient.AddLogTag("status", nil)
server := &statusServer{
baseStatusServer: &baseStatusServer{
- AmbientContext: ambient,
- privilegeChecker: adminAuthzCheck,
- sessionRegistry: sessionRegistry,
- flowScheduler: flowScheduler,
- st: st,
- rpcCtx: rpcCtx,
- stopper: stopper,
+ AmbientContext: ambient,
+ privilegeChecker: adminAuthzCheck,
+ sessionRegistry: sessionRegistry,
+ closedSessionCache: closedSessionCache,
+ flowScheduler: flowScheduler,
+ st: st,
+ rpcCtx: rpcCtx,
+ stopper: stopper,
},
cfg: cfg,
admin: adminServer,
@@ -2744,6 +2765,9 @@ func (s *statusServer) listSessionsHelper(
}
sessions := nodeResp.([]serverpb.Session)
response.Sessions = append(response.Sessions, sessions...)
+ sort.Slice(response.Sessions, func(i, j int) bool {
+ return response.Sessions[i].Start.Before(response.Sessions[j].Start)
+ })
}
errorFn := func(nodeID roachpb.NodeID, err error) {
errResponse := serverpb.ListSessionsError{NodeID: nodeID, Message: err.Error()}
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go
index 799ba2b9cc6f..21aec0eea5b6 100644
--- a/pkg/server/status_test.go
+++ b/pkg/server/status_test.go
@@ -25,6 +25,7 @@ import (
"sort"
"strconv"
"strings"
+ "sync"
"testing"
"time"
@@ -3270,6 +3271,140 @@ func TestStatusAPIListSessions(t *testing.T) {
require.Equal(t, "SELECT 2", session.LastActiveQuery)
}
+func TestListClosedSessions(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ // The active sessions might close before the stress race can finish.
+ skip.UnderStressRace(t, "active sessions")
+
+ ctx := context.Background()
+ serverParams, _ := tests.CreateTestServerParams()
+ testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
+ ServerArgs: serverParams,
+ })
+ defer testCluster.Stopper().Stop(ctx)
+
+ server := testCluster.Server(0)
+
+ doSessionsRequest := func(username string) serverpb.ListSessionsResponse {
+ var resp serverpb.ListSessionsResponse
+ path := "/_status/sessions?username=" + username
+ err := serverutils.GetJSONProto(server, path, &resp)
+ require.NoError(t, err)
+ return resp
+ }
+
+ getUserConn := func(t *testing.T, username string, server serverutils.TestServerInterface) *gosql.DB {
+ pgURL := url.URL{
+ Scheme: "postgres",
+ User: url.UserPassword(username, "hunter2"),
+ Host: server.ServingSQLAddr(),
+ }
+ db, err := gosql.Open("postgres", pgURL.String())
+ require.NoError(t, err)
+ return db
+ }
+
+ // Create a test user.
+ users := []string{"test_user_a", "test_user_b", "test_user_c"}
+ conn := testCluster.ServerConn(0)
+ _, err := conn.Exec(fmt.Sprintf(`
+CREATE USER %s with password 'hunter2';
+CREATE USER %s with password 'hunter2';
+CREATE USER %s with password 'hunter2';
+`, users[0], users[1], users[2]))
+ require.NoError(t, err)
+
+ var dbs []*gosql.DB
+
+ // Open 10 sessions for the user and then close them.
+ for _, user := range users {
+ for i := 0; i < 10; i++ {
+ targetDB := getUserConn(t, user, testCluster.Server(0))
+ dbs = append(dbs, targetDB)
+ sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`)
+ }
+ }
+
+ for _, db := range dbs {
+ err := db.Close()
+ require.NoError(t, err)
+ }
+
+ var wg sync.WaitGroup
+
+ // Open 5 sessions for the user and leave them open by running pg_sleep(30).
+ for _, user := range users {
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func(user string) {
+ // Open a session for the target user.
+ targetDB := getUserConn(t, user, testCluster.Server(0))
+ defer targetDB.Close()
+ defer wg.Done()
+ sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT pg_sleep(30)`)
+ }(user)
+ }
+ }
+
+ // Open 3 sessions for the user and leave them idle by running version().
+ for _, user := range users {
+ for i := 0; i < 3; i++ {
+ targetDB := getUserConn(t, user, testCluster.Server(0))
+ defer targetDB.Close()
+ sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`)
+ }
+ }
+
+ countSessionStatus := func(allSessions []serverpb.Session) (int, int, int) {
+ var active, idle, closed int
+ for _, s := range allSessions {
+ if s.Status.String() == "ACTIVE" {
+ active++
+ }
+ // IDLE sessions are open sessions with no active queries.
+ if s.Status.String() == "IDLE" {
+ idle++
+ }
+ if s.Status.String() == "CLOSED" {
+ closed++
+ }
+ }
+ return active, idle, closed
+ }
+
+ expectedIdle := 3
+ expectedActive := 5
+ expectedClosed := 10
+
+ testutils.SucceedsSoon(t, func() error {
+ for _, user := range users {
+ sessionsResponse := doSessionsRequest(user)
+ allSessions := sessionsResponse.Sessions
+ sort.Slice(allSessions, func(i, j int) bool {
+ return allSessions[i].Start.Before(allSessions[j].Start)
+ })
+
+ active, idle, closed := countSessionStatus(allSessions)
+ if idle != expectedIdle {
+ return errors.Newf("User: %s: Expected %d idle sessions, got %d\n", user, expectedIdle, idle)
+ }
+ if active != expectedActive {
+ return errors.Newf("User: %s: Expected %d active sessions, got %d\n", user, expectedActive, active)
+ }
+ if closed != expectedClosed {
+ return errors.Newf("User: %s: Expected %d closed sessions, got %d\n", user, expectedClosed, closed)
+ }
+ }
+ return nil
+ })
+
+ // Wait for the goroutines from the pg_sleep() command to finish, so we can
+ // safely close their connections.
+ wg.Wait()
+}
+
func TestTransactionContentionEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go
index 1cc74076cbd7..a93c304625e4 100644
--- a/pkg/server/tenant.go
+++ b/pkg/server/tenant.go
@@ -143,6 +143,9 @@ func startTenantInternal(
histogramWindowInterval: args.HistogramWindowInterval(),
settings: args.Settings,
})
+ closedSessionCache := sql.NewClosedSessionCache(
+ baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now)
+ args.closedSessionCache = closedSessionCache
// Initialize gRPC server for use on shared port with pg
grpcMain := newGRPCServer(args.rpcContext)
@@ -208,7 +211,7 @@ func startTenantInternal(
// the SQL server object.
tenantStatusServer := newTenantStatusServer(
baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor},
- args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil,
+ args.sessionRegistry, args.closedSessionCache, args.flowScheduler, baseCfg.Settings, nil,
args.rpcContext, args.stopper,
)
diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go
index e5d4e2093282..d0e9df2171e2 100644
--- a/pkg/server/tenant_status.go
+++ b/pkg/server/tenant_status.go
@@ -81,6 +81,7 @@ func newTenantStatusServer(
ambient log.AmbientContext,
privilegeChecker *adminPrivilegeChecker,
sessionRegistry *sql.SessionRegistry,
+ closedSessionCache *sql.ClosedSessionCache,
flowScheduler *flowinfra.FlowScheduler,
st *cluster.Settings,
sqlServer *SQLServer,
@@ -90,14 +91,15 @@ func newTenantStatusServer(
ambient.AddLogTag("tenant-status", nil)
return &tenantStatusServer{
baseStatusServer: baseStatusServer{
- AmbientContext: ambient,
- privilegeChecker: privilegeChecker,
- sessionRegistry: sessionRegistry,
- flowScheduler: flowScheduler,
- st: st,
- sqlServer: sqlServer,
- rpcCtx: rpcCtx,
- stopper: stopper,
+ AmbientContext: ambient,
+ privilegeChecker: privilegeChecker,
+ sessionRegistry: sessionRegistry,
+ closedSessionCache: closedSessionCache,
+ flowScheduler: flowScheduler,
+ st: st,
+ sqlServer: sqlServer,
+ rpcCtx: rpcCtx,
+ stopper: stopper,
},
}
}
diff --git a/pkg/sql/closed_session_cache.go b/pkg/sql/closed_session_cache.go
index f851d6db15cd..2ac9c5a573ba 100644
--- a/pkg/sql/closed_session_cache.go
+++ b/pkg/sql/closed_session_cache.go
@@ -11,13 +11,17 @@
package sql
import (
+ "context"
"time"
+ "unsafe"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/cache"
+ "github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
type timeSource func() time.Time
@@ -28,7 +32,7 @@ var ClosedSessionCacheCapacity = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.closed_session_cache.capacity",
"the maximum number of sessions in the cache",
- 100, // TODO(gtr): Totally arbitrary for now, adjust later.
+ 1000, // TODO(gtr): Totally arbitrary for now, adjust later.
).WithPublic()
// ClosedSessionCacheTimeToLive is the cluster setting that controls the maximum time
@@ -47,32 +51,57 @@ type ClosedSessionCache struct {
mu struct {
syncutil.RWMutex
+ acc mon.BoundAccount
data *cache.UnorderedCache
}
+
+ mon *mon.BytesMonitor
}
// NewClosedSessionCache returns a new ClosedSessionCache.
-func NewClosedSessionCache(st *cluster.Settings, timeSrc timeSource) *ClosedSessionCache {
- c := &ClosedSessionCache{st: st, timeSrc: timeSrc}
+func NewClosedSessionCache(
+ st *cluster.Settings, parentMon *mon.BytesMonitor, timeSrc timeSource,
+) *ClosedSessionCache {
+ monitor := mon.NewMonitorInheritWithLimit("closed-session-cache", 0 /* limit*/, parentMon)
+ c := &ClosedSessionCache{st: st, timeSrc: timeSrc}
c.mu.data = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
capacity := ClosedSessionCacheCapacity.Get(&st.SV)
return int64(size) > capacity
},
+ OnEvictedEntry: func(entry *cache.Entry) {
+ node := entry.Value.(*sessionNode)
+ size := int64(node.size())
+ c.mu.acc.Shrink(context.Background(), size)
+ },
})
+ c.mu.acc = monitor.MakeBoundAccount()
+ c.mon = monitor
+ c.mon.Start(context.Background(), parentMon, mon.BoundAccount{})
return c
}
-// Add adds a closed session to the ClosedSessionCache.
-func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) {
+// add adds a closed session to the ClosedSessionCache.
+func (c *ClosedSessionCache) add(
+ ctx context.Context, id ClusterWideID, session serverpb.Session,
+) error {
c.mu.Lock()
defer c.mu.Unlock()
+ end := timeutil.Now()
+ session.End = &end
+ session.Status = serverpb.Session_CLOSED
node := &sessionNode{id: id, data: session, timestamp: c.timeSrc()}
+
+ if err := c.mu.acc.Grow(ctx, int64(node.size())); err != nil {
+ return err
+ }
+
c.mu.data.Add(id, node)
+ return nil
}
func (c *ClosedSessionCache) size() int {
@@ -146,3 +175,11 @@ func (n *sessionNode) getAge(now timeSource) time.Duration {
func (n *sessionNode) getAgeString(now timeSource) string {
return n.getAge(now).Round(time.Second).String()
}
+
+func (n *sessionNode) size() int {
+ size := 0
+ size += int(unsafe.Sizeof(ClusterWideID{}))
+ size += n.data.Size()
+ size += int(unsafe.Sizeof(time.Time{}))
+ return size
+}
diff --git a/pkg/sql/closed_session_cache_test.go b/pkg/sql/closed_session_cache_test.go
index d6fbac8ccc93..00690e7edc67 100644
--- a/pkg/sql/closed_session_cache_test.go
+++ b/pkg/sql/closed_session_cache_test.go
@@ -13,6 +13,7 @@ package sql
import (
"context"
"fmt"
+ "math"
"strings"
"testing"
"time"
@@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
@@ -40,15 +42,24 @@ func TestSessionCacheBasic(t *testing.T) {
datadriven.Walk(t, testutils.TestDataPath(t, "closed_session_cache"), func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+ ctx := context.Background()
switch d.Cmd {
case "init":
var capacity, timeToLive int
d.ScanArgs(t, "capacity", &capacity)
d.ScanArgs(t, "timeToLive", &timeToLive)
- ctx := context.Background()
st := &cluster.Settings{}
- cache = NewClosedSessionCache(st, time.Now)
+ monitor := mon.NewUnlimitedMonitor(
+ ctx,
+ "test",
+ mon.MemoryResource,
+ nil, /* currCount */
+ nil, /* maxHist */
+ math.MaxInt64,
+ st,
+ )
+ cache = NewClosedSessionCache(st, monitor, time.Now)
ClosedSessionCacheCapacity.Override(ctx, &st.SV, int64(capacity))
ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive))
@@ -62,7 +73,8 @@ func TestSessionCacheBasic(t *testing.T) {
session := serverpb.Session{}
sessionID := ClusterWideID{id}
- cache.Add(sessionID, session)
+ err = cache.add(ctx, sessionID, session)
+ require.NoError(t, err)
return fmt.Sprintf("cache_size: %d", cache.size())
case "addSessionBatch":
@@ -79,7 +91,8 @@ func TestSessionCacheBasic(t *testing.T) {
cache.timeSrc = addSeconds(cache.timeSrc, seconds)
session := serverpb.Session{}
sessionID := ClusterWideID{id}
- cache.Add(sessionID, session)
+ err := cache.add(ctx, sessionID, session)
+ require.NoError(t, err)
id = id.Add(1)
}
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 2eef8c9b3165..90b35e5b1024 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -1778,7 +1778,7 @@ func (ex *connExecutor) run(
parentMon *mon.BytesMonitor,
reserved mon.BoundAccount,
onCancel context.CancelFunc,
-) error {
+) (err error) {
if !ex.activated {
ex.activate(ctx, parentMon, reserved)
}
@@ -1788,7 +1788,15 @@ func (ex *connExecutor) run(
ex.sessionID = ex.generateID()
ex.server.cfg.SessionRegistry.register(ex.sessionID, ex.queryCancelKey, ex)
ex.planner.extendedEvalCtx.setSessionID(ex.sessionID)
- defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey)
+
+ defer func() {
+ ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey)
+ addErr := ex.server.cfg.ClosedSessionCache.add(ctx, ex.sessionID, ex.serialize())
+ if addErr != nil {
+ err = errors.CombineErrors(err, addErr)
+ }
+ }()
+
for {
ex.curStmtAST = nil
if err := ctx.Err(); err != nil {
@@ -3089,6 +3097,10 @@ func (ex *connExecutor) serialize() serverpb.Session {
lastActiveQuery = truncateSQL(ex.mu.LastActiveQuery.String())
lastActiveQueryNoConstants = truncateSQL(formatStatementHideConstants(ex.mu.LastActiveQuery))
}
+ status := serverpb.Session_IDLE
+ if len(activeQueries) > 0 {
+ status = serverpb.Session_ACTIVE
+ }
// We always use base here as the fields from the SessionData should always
// be that of the root session.
@@ -3111,6 +3123,7 @@ func (ex *connExecutor) serialize() serverpb.Session {
AllocBytes: ex.mon.AllocBytes(),
MaxAllocBytes: ex.mon.MaximumBytes(),
LastActiveQueryNoConstants: lastActiveQueryNoConstants,
+ Status: status,
}
}
diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go
index 73779f9c76cd..6fe480d55751 100644
--- a/pkg/sql/conn_executor_internal_test.go
+++ b/pkg/sql/conn_executor_internal_test.go
@@ -280,6 +280,12 @@ func startConnExecutor(
}
defer tempEngine.Close()
ambientCtx := log.MakeTestingAmbientCtxWithNewTracer()
+ pool := mon.NewUnlimitedMonitor(
+ context.Background(), "test", mon.MemoryResource,
+ nil /* curCount */, nil /* maxHist */, math.MaxInt64, st,
+ )
+ // This pool should never be Stop()ed because, if the test is failing, memory
+ // is not properly released.
cfg := &ExecutorConfig{
AmbientCtx: ambientCtx,
Settings: st,
@@ -288,7 +294,8 @@ func startConnExecutor(
SystemConfig: config.NewConstantSystemConfigProvider(
config.NewSystemConfig(zonepb.DefaultZoneConfigRef()),
),
- SessionRegistry: NewSessionRegistry(),
+ SessionRegistry: NewSessionRegistry(),
+ ClosedSessionCache: NewClosedSessionCache(st, pool, time.Now),
NodeInfo: NodeInfo{
NodeID: nodeID,
LogicalClusterID: func() uuid.UUID { return uuid.UUID{} },
@@ -326,12 +333,6 @@ func startConnExecutor(
HistogramWindowInterval: base.DefaultHistogramWindowInterval(),
CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec),
}
- pool := mon.NewUnlimitedMonitor(
- context.Background(), "test", mon.MemoryResource,
- nil /* curCount */, nil /* maxHist */, math.MaxInt64, st,
- )
- // This pool should never be Stop()ed because, if the test is failing, memory
- // is not properly released.
s := NewServer(cfg, pool)
buf := NewStmtBuf()
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 7a5894ec8e22..88acfa7f02f5 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -1625,7 +1625,7 @@ var crdbInternalLocalTxnsTable = virtualSchemaTable{
if err := p.RequireAdminRole(ctx, "read crdb_internal.node_transactions"); err != nil {
return err
}
- req, err := p.makeSessionsRequest(ctx)
+ req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */)
if err != nil {
return err
}
@@ -1644,7 +1644,7 @@ var crdbInternalClusterTxnsTable = virtualSchemaTable{
if err := p.RequireAdminRole(ctx, "read crdb_internal.cluster_transactions"); err != nil {
return err
}
- req, err := p.makeSessionsRequest(ctx)
+ req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */)
if err != nil {
return err
}
@@ -1719,8 +1719,13 @@ CREATE TABLE crdb_internal.%s (
phase STRING -- the current execution phase
)`
-func (p *planner) makeSessionsRequest(ctx context.Context) (serverpb.ListSessionsRequest, error) {
- req := serverpb.ListSessionsRequest{Username: p.SessionData().User().Normalized()}
+func (p *planner) makeSessionsRequest(
+ ctx context.Context, excludeClosed bool,
+) (serverpb.ListSessionsRequest, error) {
+ req := serverpb.ListSessionsRequest{
+ Username: p.SessionData().User().Normalized(),
+ ExcludeClosedSessions: excludeClosed,
+ }
hasAdmin, err := p.HasAdminRole(ctx)
if err != nil {
return serverpb.ListSessionsRequest{}, err
@@ -1770,7 +1775,7 @@ var crdbInternalLocalQueriesTable = virtualSchemaTable{
comment: "running queries visible by current user (RAM; local node only)",
schema: fmt.Sprintf(queriesSchemaPattern, "node_queries"),
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
- req, err := p.makeSessionsRequest(ctx)
+ req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */)
if err != nil {
return err
}
@@ -1788,7 +1793,7 @@ var crdbInternalClusterQueriesTable = virtualSchemaTable{
comment: "running queries visible by current user (cluster RPC; expensive!)",
schema: fmt.Sprintf(queriesSchemaPattern, "cluster_queries"),
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
- req, err := p.makeSessionsRequest(ctx)
+ req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */)
if err != nil {
return err
}
@@ -1889,7 +1894,9 @@ CREATE TABLE crdb_internal.%s (
oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started
kv_txn STRING, -- the ID of the current KV transaction
alloc_bytes INT, -- the number of bytes allocated by the session
- max_alloc_bytes INT -- the high water mark of bytes allocated by the session
+ max_alloc_bytes INT, -- the high water mark of bytes allocated by the session
+ status STRING, -- the status of the session (open, closed)
+ session_end TIMESTAMP -- the time when the session was closed
)
`
@@ -1899,7 +1906,7 @@ var crdbInternalLocalSessionsTable = virtualSchemaTable{
comment: "running sessions visible by current user (RAM; local node only)",
schema: fmt.Sprintf(sessionsSchemaPattern, "node_sessions"),
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
- req, err := p.makeSessionsRequest(ctx)
+ req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */)
if err != nil {
return err
}
@@ -1917,7 +1924,7 @@ var crdbInternalClusterSessionsTable = virtualSchemaTable{
comment: "running sessions visible to current user (cluster RPC; expensive!)",
schema: fmt.Sprintf(sessionsSchemaPattern, "cluster_sessions"),
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
- req, err := p.makeSessionsRequest(ctx)
+ req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */)
if err != nil {
return err
}
@@ -1969,6 +1976,13 @@ func populateSessionsTable(
if err != nil {
return err
}
+ endTSDatum := tree.DNull
+ if session.End != nil {
+ endTSDatum, err = tree.MakeDTimestamp(*session.End, time.Microsecond)
+ if err != nil {
+ return err
+ }
+ }
if err := addRow(
tree.NewDInt(tree.DInt(session.NodeID)),
sessionID,
@@ -1982,6 +1996,8 @@ func populateSessionsTable(
kvTxnIDDatum,
tree.NewDInt(tree.DInt(session.AllocBytes)),
tree.NewDInt(tree.DInt(session.MaxAllocBytes)),
+ tree.NewDString(session.Status.String()),
+ endTSDatum,
); err != nil {
return err
}
@@ -2005,6 +2021,8 @@ func populateSessionsTable(
tree.DNull, // kv_txn
tree.DNull, // alloc_bytes
tree.DNull, // max_alloc_bytes
+ tree.DNull, // status
+ tree.DNull, // session end
); err != nil {
return err
}
diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go
index be8dbe806f48..9dc08aba57c7 100644
--- a/pkg/sql/delegate/show_sessions.go
+++ b/pkg/sql/delegate/show_sessions.go
@@ -16,7 +16,7 @@ import (
)
func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) {
- const query = `SELECT node_id, session_id, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.`
+ const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.`
table := `node_sessions`
if n.Cluster {
table = `cluster_sessions`
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index bad602fc6606..26db6e574323 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1178,6 +1178,7 @@ type ExecutorConfig struct {
RegionsServer serverpb.RegionsServer
MetricsRecorder nodeStatusGenerator
SessionRegistry *SessionRegistry
+ ClosedSessionCache *ClosedSessionCache
SQLLiveness sqlliveness.Liveness
JobRegistry *jobs.Registry
VirtualSchemas *VirtualSchemaHolder
@@ -2032,6 +2033,11 @@ func (r *SessionRegistry) SerializeAll() []serverpb.Session {
r.Lock()
defer r.Unlock()
+ return r.SerializeAllLocked()
+}
+
+// SerializeAllLocked is like SerializeAll but assumes SessionRegistry's mutex is locked.
+func (r *SessionRegistry) SerializeAllLocked() []serverpb.Session {
response := make([]serverpb.Session, 0, len(r.sessions))
for _, s := range r.sessions {
diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal
index 381929f40a45..e29788fbe7f1 100644
--- a/pkg/sql/logictest/testdata/logic_test/crdb_internal
+++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal
@@ -363,15 +363,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0
----
id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries
-query ITTTTTTTTTTT colnames
+query ITTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes
+node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
-query ITTTTTTTTTTT colnames
+query ITTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes
+node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements
index 77d5cff749a9..ed7e14ad5c48 100644
--- a/pkg/sql/logictest/testdata/logic_test/create_statements
+++ b/pkg/sql/logictest/testdata/logic_test/create_statements
@@ -287,7 +287,9 @@ CREATE TABLE crdb_internal.cluster_sessions (
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
alloc_bytes INT8 NULL,
- max_alloc_bytes INT8 NULL
+ max_alloc_bytes INT8 NULL,
+ status STRING NULL,
+ session_end TIMESTAMP NULL
) CREATE TABLE crdb_internal.cluster_sessions (
node_id INT8 NOT NULL,
session_id STRING NULL,
@@ -300,7 +302,9 @@ CREATE TABLE crdb_internal.cluster_sessions (
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
alloc_bytes INT8 NULL,
- max_alloc_bytes INT8 NULL
+ max_alloc_bytes INT8 NULL,
+ status STRING NULL,
+ session_end TIMESTAMP NULL
) {} {}
CREATE TABLE crdb_internal.cluster_settings (
variable STRING NOT NULL,
@@ -906,7 +910,9 @@ CREATE TABLE crdb_internal.node_sessions (
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
alloc_bytes INT8 NULL,
- max_alloc_bytes INT8 NULL
+ max_alloc_bytes INT8 NULL,
+ status STRING NULL,
+ session_end TIMESTAMP NULL
) CREATE TABLE crdb_internal.node_sessions (
node_id INT8 NOT NULL,
session_id STRING NULL,
@@ -919,7 +925,9 @@ CREATE TABLE crdb_internal.node_sessions (
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
alloc_bytes INT8 NULL,
- max_alloc_bytes INT8 NULL
+ max_alloc_bytes INT8 NULL,
+ status STRING NULL,
+ session_end TIMESTAMP NULL
) {} {}
CREATE TABLE crdb_internal.node_statement_statistics (
node_id INT8 NOT NULL,
diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go
index bf007cd955b9..d423c5f9866d 100644
--- a/pkg/sql/temporary_schema.go
+++ b/pkg/sql/temporary_schema.go
@@ -569,7 +569,9 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup(
var err error
response, err = c.statusServer.ListSessions(
ctx,
- &serverpb.ListSessionsRequest{},
+ &serverpb.ListSessionsRequest{
+ ExcludeClosedSessions: true,
+ },
)
if response != nil && len(response.Errors) > 0 &&
err == nil {
diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts
index c9f3c9b3010e..05140c7c720f 100644
--- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts
@@ -21,6 +21,8 @@ import {
CancelSessionRequestMessage,
} from "src/api/terminateQueryApi";
+import Status = cockroach.server.serverpb.Session.Status;
+
const history = createMemoryHistory({ initialEntries: ["/sessions"] });
const toUuid = function(s: string): Uint8Array {
@@ -45,6 +47,7 @@ export const idleSession: SessionInfo = {
alloc_bytes: Long.fromNumber(0),
max_alloc_bytes: Long.fromNumber(10240),
active_queries: [],
+ status: Status.ACTIVE,
toJSON: () => ({}),
},
};
@@ -82,6 +85,7 @@ export const idleTransactionSession: SessionInfo = {
},
last_active_query_no_constants: "SHOW database",
active_queries: [],
+ status: Status.ACTIVE,
toJSON: () => ({}),
},
};
@@ -133,6 +137,7 @@ export const activeSession: SessionInfo = {
num_auto_retries: 3,
},
last_active_query_no_constants: "SHOW database",
+ status: Status.ACTIVE,
toJSON: () => ({}),
},
};