From 5f3b4bdffe107ce19ed82361cb5c8d3656069b42 Mon Sep 17 00:00:00 2001 From: Gerardo Torres Date: Mon, 28 Mar 2022 14:28:44 -0400 Subject: [PATCH] server, sql: expose closed sessions to ListSessions endpoint This is the second phase of #67888 which is to expose closed sessions to the ListSessions endpoint. Previously, the ListSessions endpoint only returned open sessions. This change builds on top of the previous PR to add the `ClosedSessionsCache` and now allows the ListSessions endpoint to also return closed sessions in its response. The `ListSessionsRequest` object was edited to include a flag `exclude_closed_sessions` which is a boolean to exclude closed sessions. If unspecified, defaults to false and closed sessions are included in the response. Additionally, the `serverpb.Session` object was updated to include new `end` and `status` fields which specify the time the session ended and the status (active, idle, closed) of the session, respectively. Release note (api change): `ListSessions` now returns closed sessions in addition to open sessions; `ListSessionsRequest` now has a `exclude_closed_sessions` flag; `serverpb.Session` now has `end` and `status` fields. --- docs/generated/http/full.md | 6 + .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- docs/generated/swagger/spec.json | 21 +++ .../testdata/logic_test/crdb_internal_tenant | 8 +- .../serverccl/statusccl/tenant_status_test.go | 5 +- pkg/server/api_v2.go | 9 +- pkg/server/api_v2_test.go | 1 + pkg/server/server.go | 3 + pkg/server/server_sql.go | 6 + pkg/server/serverpb/status.proto | 14 ++ pkg/server/status.go | 58 ++++++--- pkg/server/status_test.go | 123 ++++++++++++++++++ pkg/server/tenant.go | 5 +- pkg/server/tenant_status.go | 18 +-- pkg/sql/closed_session_cache.go | 37 +++++- pkg/sql/closed_session_cache_test.go | 21 ++- pkg/sql/conn_executor.go | 17 ++- pkg/sql/conn_executor_internal_test.go | 15 ++- pkg/sql/crdb_internal.go | 36 +++-- pkg/sql/delegate/show_sessions.go | 2 +- pkg/sql/exec_util.go | 6 + .../testdata/logic_test/crdb_internal | 8 +- .../testdata/logic_test/create_statements | 16 ++- pkg/sql/temporary_schema.go | 4 +- .../src/sessions/sessionsPage.fixture.ts | 5 + 26 files changed, 376 insertions(+), 72 deletions(-) diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index e4162516dccf..66d9d6b4f51e 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2051,6 +2051,7 @@ Request object for ListSessions and ListLocalSessions. | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | | username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) | +| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) | @@ -2096,6 +2097,8 @@ Session represents one SQL session. | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | +| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | +| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | @@ -2182,6 +2185,7 @@ Request object for ListSessions and ListLocalSessions. | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | | username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) | +| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) | @@ -2227,6 +2231,8 @@ Session represents one SQL session. | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | +| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | +| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index d1de01ceb3ac..790a4e102907 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -87,7 +87,7 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan -sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache +sql.closed_session_cache.capacity integer 1000 the maximum number of sessions in the cache sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 1f6000c3ced0..8367edcdda4c 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -102,7 +102,7 @@ server.web_session.purge.ttlduration1h0m0sif nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeoutduration168h0m0sthe duration that a newly created web session will be valid sql.auth.resolve_membership_single_scan.enabledbooleantruedetermines whether to populate the role membership cache with a single scan -sql.closed_session_cache.capacityinteger100the maximum number of sessions in the cache +sql.closed_session_cache.capacityinteger1000the maximum number of sessions in the cache sql.closed_session_cache.time_to_liveinteger3600the maximum time to live, in seconds sql.contention.event_store.capacitybyte size64 MiBthe in-memory storage capacity per-node of contention event store sql.contention.event_store.duration_thresholdduration0sminimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index 4fe16b70a3c9..f567625f81e0 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -551,6 +551,12 @@ "name": "username", "in": "query" }, + { + "type": "bool", + "description": "Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response.", + "name": "exclude_closed_sessions", + "in": "query" + }, { "type": "integer", "description": "Maximum number of results to return in this call.", @@ -1156,6 +1162,12 @@ "type": "string", "x-go-name": "ClientAddress" }, + "end": { + "description": "Timestamp of session's end.", + "type": "string", + "format": "date-time", + "x-go-name": "End" + }, "id": { "description": "ID of the session (uint128 represented as raw bytes).", "type": "array", @@ -1190,6 +1202,9 @@ "format": "date-time", "x-go-name": "Start" }, + "status": { + "$ref": "#/definitions/Session_Status" + }, "username": { "description": "Username of the user for this session.", "type": "string", @@ -1198,6 +1213,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" }, + "Session_Status": { + "type": "integer", + "format": "int32", + "title": "Enum for sessions status.", + "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" + }, "StoreID": { "type": "integer", "format": "int32", diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index abd649705951..be2840a77b20 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -242,15 +242,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index ece17ea56773..9aeb0bdf2cda 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -806,7 +806,8 @@ WHERE func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { var sessionIDs []string - rows := conn.QueryStr(t, "SELECT session_id FROM crdb_internal.cluster_sessions") + rows := conn.QueryStr(t, + "SELECT session_id FROM crdb_internal.cluster_sessions WHERE status = 'ACTIVE' OR status = 'IDLE'") for _, row := range rows { sessionIDs = append(sessionIDs, row[0]) } @@ -822,7 +823,7 @@ func testTenantStatusCancelSession(t *testing.T, helper *tenantTestHelper) { httpPod1 := helper.testCluster().tenantAdminHTTPClient(t, 1) defer httpPod1.Close() listSessionsResp := serverpb.ListSessionsResponse{} - httpPod1.GetJSON("/_status/sessions", &listSessionsResp) + httpPod1.GetJSON("/_status/sessions?exclude_closed_sessions=true", &listSessionsResp) var session serverpb.Session for _, s := range listSessionsResp.Sessions { if s.LastActiveQuery == "SELECT 1" { diff --git a/pkg/server/api_v2.go b/pkg/server/api_v2.go index 4d161c4d7acd..d8f243f71f3d 100644 --- a/pkg/server/api_v2.go +++ b/pkg/server/api_v2.go @@ -226,6 +226,12 @@ type listSessionsResponse struct { // description: Username of user to return sessions for; if unspecified, // sessions from all users are returned. // required: false +// - name: exclude_closed_sessions +// type: bool +// in: query +// description: Boolean to exclude closed sessions; if unspecified, defaults +// to false and closed sessions are included in the response. +// required: false // - name: limit // type: integer // in: query @@ -249,7 +255,8 @@ func (a *apiV2Server) listSessions(w http.ResponseWriter, r *http.Request) { ctx := r.Context() limit, start := getRPCPaginationValues(r) reqUsername := r.URL.Query().Get("username") - req := &serverpb.ListSessionsRequest{Username: reqUsername} + reqExcludeClosed := r.URL.Query().Get("exclude_closed_sessions") == "true" + req := &serverpb.ListSessionsRequest{Username: reqUsername, ExcludeClosedSessions: reqExcludeClosed} response := &listSessionsResponse{} outgoingCtx := apiToOutgoingGatewayCtx(ctx, r) diff --git a/pkg/server/api_v2_test.go b/pkg/server/api_v2_test.go index ade26d5430b8..f6a7f444bdcd 100644 --- a/pkg/server/api_v2_test.go +++ b/pkg/server/api_v2_test.go @@ -59,6 +59,7 @@ func TestListSessionsV2(t *testing.T) { req, err := http.NewRequest("GET", ts1.AdminURL()+apiV2Path+"sessions/", nil) require.NoError(t, err) query := req.URL.Query() + query.Add("exclude_closed_sessions", "true") if limit > 0 { query.Add("limit", strconv.Itoa(limit)) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 7a310c1be064..4e350abb14bd 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -688,6 +688,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sHTTP := newHTTPServer(cfg.BaseConfig, rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn) sessionRegistry := sql.NewSessionRegistry() + closedSessionCache := sql.NewClosedSessionCache(cfg.Settings, sqlMonitorAndMetrics.rootSQLMemoryMonitor, time.Now) flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st) sStatus := newStatusServer( @@ -705,6 +706,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { node.stores, stopper, sessionRegistry, + closedSessionCache, flowScheduler, internalExecutor, ) @@ -755,6 +757,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { registry: registry, recorder: recorder, sessionRegistry: sessionRegistry, + closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, circularInternalExecutor: internalExecutor, circularJobRegistry: jobRegistry, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index b8d4503661b7..b34f2d1079be 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -135,6 +135,7 @@ type SQLServer struct { // sessionRegistry can be queried for info on running SQL sessions. It is // shared between the sql.Server and the statusServer. sessionRegistry *sql.SessionRegistry + closedSessionCache *sql.ClosedSessionCache jobRegistry *jobs.Registry startupMigrationsMgr *startupmigrations.Manager statsRefresher *stats.Refresher @@ -273,6 +274,9 @@ type sqlServerArgs struct { // Used for SHOW/CANCEL QUERIE(S)/SESSION(S). sessionRegistry *sql.SessionRegistry + // Used to store closed sessions. + closedSessionCache *sql.ClosedSessionCache + // Used to track the DistSQL flows scheduled on this node but initiated on // behalf of other nodes. flowScheduler *flowinfra.FlowScheduler @@ -727,6 +731,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SQLStatusServer: cfg.sqlStatusServer, RegionsServer: cfg.regionsServer, SessionRegistry: cfg.sessionRegistry, + ClosedSessionCache: cfg.closedSessionCache, ContentionRegistry: contentionRegistry, SQLLiveness: cfg.sqlLivenessProvider, JobRegistry: jobRegistry, @@ -1060,6 +1065,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { tracingService: tracingService, tenantConnect: cfg.tenantConnect, sessionRegistry: cfg.sessionRegistry, + closedSessionCache: cfg.closedSessionCache, jobRegistry: jobRegistry, statsRefresher: statsRefresher, temporaryObjectCleaner: temporaryObjectCleaner, diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 836b5ae6843b..363371cd5fca 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -896,6 +896,9 @@ message ListSessionsRequest { // The caller is responsible to normalize the username // (= case fold and perform unicode NFC normalization). string username = 1; + // Boolean to exclude closed sessions; if unspecified, defaults + // to false and closed sessions are included in the response. + bool exclude_closed_sessions = 2; } // Session represents one SQL session. @@ -933,6 +936,17 @@ message Session { // The SQL statement fingerprint of the last query executed on this session, // compatible with StatementStatisticsKey. string last_active_query_no_constants = 13; + // Enum for sessions status. + enum Status { + ACTIVE = 0; + CLOSED = 1; + IDLE = 2; + } + // The session's status. + Status status = 14; + // Timestamp of session's end. + google.protobuf.Timestamp end = 15 + [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ]; } // An error wrapper object for ListSessionsResponse. diff --git a/pkg/server/status.go b/pkg/server/status.go index 071cbce312d6..fc37f2a3b4ae 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -142,13 +142,14 @@ type baseStatusServer struct { serverpb.UnimplementedStatusServer log.AmbientContext - privilegeChecker *adminPrivilegeChecker - sessionRegistry *sql.SessionRegistry - flowScheduler *flowinfra.FlowScheduler - st *cluster.Settings - sqlServer *SQLServer - rpcCtx *rpc.Context - stopper *stop.Stopper + privilegeChecker *adminPrivilegeChecker + sessionRegistry *sql.SessionRegistry + closedSessionCache *sql.ClosedSessionCache + flowScheduler *flowinfra.FlowScheduler + st *cluster.Settings + sqlServer *SQLServer + rpcCtx *rpc.Context + stopper *stop.Stopper } // getLocalSessions returns a list of local sessions on this node. Note that the @@ -198,9 +199,22 @@ func (b *baseStatusServer) getLocalSessions( // The empty username means "all sessions". showAll := reqUsername.Undefined() - registry := b.sessionRegistry - sessions := registry.SerializeAll() - userSessions := make([]serverpb.Session, 0, len(sessions)) + // In order to avoid duplicate sessions showing up as both open and closed, + // we lock the session registry to prevent any changes to it while we + // serialize the sessions from the session registry and the closed session + // cache. + b.sessionRegistry.Lock() + sessions := b.sessionRegistry.SerializeAllLocked() + + var closedSessions []serverpb.Session + if !req.ExcludeClosedSessions { + closedSessions = b.closedSessionCache.GetSerializedSessions() + } + + b.sessionRegistry.Unlock() + + userSessions := make([]serverpb.Session, 0, len(sessions)+len(closedSessions)) + sessions = append(sessions, closedSessions...) for _, session := range sessions { if reqUsername.Normalized() != session.Username && !showAll { @@ -219,6 +233,11 @@ func (b *baseStatusServer) getLocalSessions( userSessions = append(userSessions, session) } + + sort.Slice(userSessions, func(i, j int) bool { + return userSessions[i].Start.Before(userSessions[j].Start) + }) + return userSessions, nil } @@ -500,19 +519,21 @@ func newStatusServer( stores *kvserver.Stores, stopper *stop.Stopper, sessionRegistry *sql.SessionRegistry, + closedSessionCache *sql.ClosedSessionCache, flowScheduler *flowinfra.FlowScheduler, internalExecutor *sql.InternalExecutor, ) *statusServer { ambient.AddLogTag("status", nil) server := &statusServer{ baseStatusServer: &baseStatusServer{ - AmbientContext: ambient, - privilegeChecker: adminAuthzCheck, - sessionRegistry: sessionRegistry, - flowScheduler: flowScheduler, - st: st, - rpcCtx: rpcCtx, - stopper: stopper, + AmbientContext: ambient, + privilegeChecker: adminAuthzCheck, + sessionRegistry: sessionRegistry, + closedSessionCache: closedSessionCache, + flowScheduler: flowScheduler, + st: st, + rpcCtx: rpcCtx, + stopper: stopper, }, cfg: cfg, admin: adminServer, @@ -2734,6 +2755,9 @@ func (s *statusServer) listSessionsHelper( } sessions := nodeResp.([]serverpb.Session) response.Sessions = append(response.Sessions, sessions...) + sort.Slice(response.Sessions, func(i, j int) bool { + return response.Sessions[i].Start.Before(response.Sessions[j].Start) + }) } errorFn := func(nodeID roachpb.NodeID, err error) { errResponse := serverpb.ListSessionsError{NodeID: nodeID, Message: err.Error()} diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 8906c6d01b2e..bc51b21641bd 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -3253,6 +3254,128 @@ func TestStatusAPIListSessions(t *testing.T) { require.Equal(t, "SELECT 2", session.LastActiveQuery) } +func TestListClosedSessions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // The active sessions might close before the stress race can finish. + skip.UnderStressRace(t, "active sessions") + + ctx := context.Background() + serverParams, _ := tests.CreateTestServerParams() + testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: serverParams, + }) + defer testCluster.Stopper().Stop(ctx) + + server := testCluster.Server(0) + + doSessionsRequest := func(username string) serverpb.ListSessionsResponse { + var resp serverpb.ListSessionsResponse + path := "/_status/sessions?username=" + username + err := serverutils.GetJSONProto(server, path, &resp) + require.NoError(t, err) + return resp + } + + getUserConn := func(t *testing.T, username string, server serverutils.TestServerInterface) *gosql.DB { + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword(username, "hunter2"), + Host: server.ServingSQLAddr(), + } + db, err := gosql.Open("postgres", pgURL.String()) + require.NoError(t, err) + return db + } + + // Create a test user. + users := []string{"test_user_a", "test_user_b", "test_user_c"} + conn := testCluster.ServerConn(0) + _, err := conn.Exec(fmt.Sprintf(` +CREATE USER %s with password 'hunter2'; +CREATE USER %s with password 'hunter2'; +CREATE USER %s with password 'hunter2'; +`, users[0], users[1], users[2])) + require.NoError(t, err) + + var dbs []*gosql.DB + + // Open 10 sessions for the user and then close them. + for _, user := range users { + for i := 0; i < 10; i++ { + targetDB := getUserConn(t, user, testCluster.Server(0)) + dbs = append(dbs, targetDB) + sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`) + } + } + + for _, db := range dbs { + err := db.Close() + require.NoError(t, err) + } + + var wg sync.WaitGroup + + // Open 5 sessions for the user and leave them open by running pg_sleep(30). + for _, user := range users { + for i := 0; i < 5; i++ { + wg.Add(1) + go func(user string) { + // Open a session for the target user. + targetDB := getUserConn(t, user, testCluster.Server(0)) + defer targetDB.Close() + defer wg.Done() + sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT pg_sleep(30)`) + }(user) + } + } + + // Open 3 sessions for the user and leave them idle by running version(). + for _, user := range users { + for i := 0; i < 3; i++ { + targetDB := getUserConn(t, user, testCluster.Server(0)) + defer targetDB.Close() + sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`) + } + } + + countSessionStatus := func(allSessions []serverpb.Session) (int, int, int) { + var active, idle, closed int + for _, s := range allSessions { + if s.Status.String() == "ACTIVE" { + active++ + } + // IDLE sessions are open sessions with no active queries. + if s.Status.String() == "IDLE" { + idle++ + } + if s.Status.String() == "CLOSED" { + closed++ + } + } + return active, idle, closed + } + + // Make sure that each client has 10 closed sessions, 5 open sessions, and 3 idle sessions. + for _, user := range users { + sessionsResponse := doSessionsRequest(user) + allSessions := sessionsResponse.Sessions + sort.Slice(allSessions, func(i, j int) bool { + return allSessions[i].Start.Before(allSessions[j].Start) + }) + + active, idle, closed := countSessionStatus(allSessions) + require.Equal(t, 3, idle) + require.Equal(t, 5, active) + require.Equal(t, 10, closed) + } + + // Wait for the goroutines from the pg_sleep() command to finish, so we can + // safely close their connections. + wg.Wait() +} + func TestTransactionContentionEvents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 582fcb136aef..6bf813072d30 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -143,6 +143,9 @@ func startTenantInternal( histogramWindowInterval: args.HistogramWindowInterval(), settings: args.Settings, }) + closedSessionCache := sql.NewClosedSessionCache( + baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now) + args.closedSessionCache = closedSessionCache // Initialize gRPC server for use on shared port with pg grpcMain := newGRPCServer(args.rpcContext) @@ -208,7 +211,7 @@ func startTenantInternal( // the SQL server object. tenantStatusServer := newTenantStatusServer( baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor}, - args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil, + args.sessionRegistry, args.closedSessionCache, args.flowScheduler, baseCfg.Settings, nil, args.rpcContext, args.stopper, ) diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index e5d4e2093282..d0e9df2171e2 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -81,6 +81,7 @@ func newTenantStatusServer( ambient log.AmbientContext, privilegeChecker *adminPrivilegeChecker, sessionRegistry *sql.SessionRegistry, + closedSessionCache *sql.ClosedSessionCache, flowScheduler *flowinfra.FlowScheduler, st *cluster.Settings, sqlServer *SQLServer, @@ -90,14 +91,15 @@ func newTenantStatusServer( ambient.AddLogTag("tenant-status", nil) return &tenantStatusServer{ baseStatusServer: baseStatusServer{ - AmbientContext: ambient, - privilegeChecker: privilegeChecker, - sessionRegistry: sessionRegistry, - flowScheduler: flowScheduler, - st: st, - sqlServer: sqlServer, - rpcCtx: rpcCtx, - stopper: stopper, + AmbientContext: ambient, + privilegeChecker: privilegeChecker, + sessionRegistry: sessionRegistry, + closedSessionCache: closedSessionCache, + flowScheduler: flowScheduler, + st: st, + sqlServer: sqlServer, + rpcCtx: rpcCtx, + stopper: stopper, }, } } diff --git a/pkg/sql/closed_session_cache.go b/pkg/sql/closed_session_cache.go index f851d6db15cd..597f9cd29d62 100644 --- a/pkg/sql/closed_session_cache.go +++ b/pkg/sql/closed_session_cache.go @@ -11,13 +11,17 @@ package sql import ( + "context" "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) type timeSource func() time.Time @@ -28,7 +32,7 @@ var ClosedSessionCacheCapacity = settings.RegisterIntSetting( settings.TenantWritable, "sql.closed_session_cache.capacity", "the maximum number of sessions in the cache", - 100, // TODO(gtr): Totally arbitrary for now, adjust later. + 1000, // TODO(gtr): Totally arbitrary for now, adjust later. ).WithPublic() // ClosedSessionCacheTimeToLive is the cluster setting that controls the maximum time @@ -47,14 +51,18 @@ type ClosedSessionCache struct { mu struct { syncutil.RWMutex + mon *mon.BytesMonitor data *cache.UnorderedCache } } // NewClosedSessionCache returns a new ClosedSessionCache. -func NewClosedSessionCache(st *cluster.Settings, timeSrc timeSource) *ClosedSessionCache { - c := &ClosedSessionCache{st: st, timeSrc: timeSrc} +func NewClosedSessionCache( + st *cluster.Settings, parentMon *mon.BytesMonitor, timeSrc timeSource, +) *ClosedSessionCache { + monitor := mon.NewMonitorInheritWithLimit("closed-session-cache", 0 /* limit*/, parentMon) + c := &ClosedSessionCache{st: st, timeSrc: timeSrc} c.mu.data = cache.NewUnorderedCache(cache.Config{ Policy: cache.CacheFIFO, ShouldEvict: func(size int, _, _ interface{}) bool { @@ -63,16 +71,31 @@ func NewClosedSessionCache(st *cluster.Settings, timeSrc timeSource) *ClosedSess }, }) + c.mu.mon = monitor + c.mu.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) return c } -// Add adds a closed session to the ClosedSessionCache. -func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) { +// add adds a closed session to the ClosedSessionCache. +func (c *ClosedSessionCache) add( + ctx context.Context, id ClusterWideID, session serverpb.Session, +) error { c.mu.Lock() defer c.mu.Unlock() + end := timeutil.Now() + session.End = &end + session.Status = serverpb.Session_CLOSED node := &sessionNode{id: id, data: session, timestamp: c.timeSrc()} + + acc := c.mu.mon.MakeBoundAccount() + defer acc.Close(ctx) + if err := acc.Grow(ctx, int64(node.size())); err != nil { + return err + } + c.mu.data.Add(id, node) + return nil } func (c *ClosedSessionCache) size() int { @@ -146,3 +169,7 @@ func (n *sessionNode) getAge(now timeSource) time.Duration { func (n *sessionNode) getAgeString(now timeSource) string { return n.getAge(now).Round(time.Second).String() } + +func (n *sessionNode) size() int { + return int(unsafe.Sizeof(time.Time{})) +} diff --git a/pkg/sql/closed_session_cache_test.go b/pkg/sql/closed_session_cache_test.go index d6fbac8ccc93..00690e7edc67 100644 --- a/pkg/sql/closed_session_cache_test.go +++ b/pkg/sql/closed_session_cache_test.go @@ -13,6 +13,7 @@ package sql import ( "context" "fmt" + "math" "strings" "testing" "time" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" @@ -40,15 +42,24 @@ func TestSessionCacheBasic(t *testing.T) { datadriven.Walk(t, testutils.TestDataPath(t, "closed_session_cache"), func(t *testing.T, path string) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() switch d.Cmd { case "init": var capacity, timeToLive int d.ScanArgs(t, "capacity", &capacity) d.ScanArgs(t, "timeToLive", &timeToLive) - ctx := context.Background() st := &cluster.Settings{} - cache = NewClosedSessionCache(st, time.Now) + monitor := mon.NewUnlimitedMonitor( + ctx, + "test", + mon.MemoryResource, + nil, /* currCount */ + nil, /* maxHist */ + math.MaxInt64, + st, + ) + cache = NewClosedSessionCache(st, monitor, time.Now) ClosedSessionCacheCapacity.Override(ctx, &st.SV, int64(capacity)) ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive)) @@ -62,7 +73,8 @@ func TestSessionCacheBasic(t *testing.T) { session := serverpb.Session{} sessionID := ClusterWideID{id} - cache.Add(sessionID, session) + err = cache.add(ctx, sessionID, session) + require.NoError(t, err) return fmt.Sprintf("cache_size: %d", cache.size()) case "addSessionBatch": @@ -79,7 +91,8 @@ func TestSessionCacheBasic(t *testing.T) { cache.timeSrc = addSeconds(cache.timeSrc, seconds) session := serverpb.Session{} sessionID := ClusterWideID{id} - cache.Add(sessionID, session) + err := cache.add(ctx, sessionID, session) + require.NoError(t, err) id = id.Add(1) } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 4402de7fc5be..d901ff48c14d 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1778,7 +1778,7 @@ func (ex *connExecutor) run( parentMon *mon.BytesMonitor, reserved mon.BoundAccount, onCancel context.CancelFunc, -) error { +) (err error) { if !ex.activated { ex.activate(ctx, parentMon, reserved) } @@ -1788,7 +1788,15 @@ func (ex *connExecutor) run( ex.sessionID = ex.generateID() ex.server.cfg.SessionRegistry.register(ex.sessionID, ex.queryCancelKey, ex) ex.planner.extendedEvalCtx.setSessionID(ex.sessionID) - defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey) + + defer func() { + ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey) + addErr := ex.server.cfg.ClosedSessionCache.add(ctx, ex.sessionID, ex.serialize()) + if addErr != nil { + err = errors.CombineErrors(err, addErr) + } + }() + for { ex.curStmtAST = nil if err := ctx.Err(); err != nil { @@ -3089,6 +3097,10 @@ func (ex *connExecutor) serialize() serverpb.Session { lastActiveQuery = truncateSQL(ex.mu.LastActiveQuery.String()) lastActiveQueryNoConstants = truncateSQL(formatStatementHideConstants(ex.mu.LastActiveQuery)) } + status := serverpb.Session_IDLE + if len(activeQueries) > 0 { + status = serverpb.Session_ACTIVE + } // We always use base here as the fields from the SessionData should always // be that of the root session. @@ -3111,6 +3123,7 @@ func (ex *connExecutor) serialize() serverpb.Session { AllocBytes: ex.mon.AllocBytes(), MaxAllocBytes: ex.mon.MaximumBytes(), LastActiveQueryNoConstants: lastActiveQueryNoConstants, + Status: status, } } diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 73779f9c76cd..6fe480d55751 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -280,6 +280,12 @@ func startConnExecutor( } defer tempEngine.Close() ambientCtx := log.MakeTestingAmbientCtxWithNewTracer() + pool := mon.NewUnlimitedMonitor( + context.Background(), "test", mon.MemoryResource, + nil /* curCount */, nil /* maxHist */, math.MaxInt64, st, + ) + // This pool should never be Stop()ed because, if the test is failing, memory + // is not properly released. cfg := &ExecutorConfig{ AmbientCtx: ambientCtx, Settings: st, @@ -288,7 +294,8 @@ func startConnExecutor( SystemConfig: config.NewConstantSystemConfigProvider( config.NewSystemConfig(zonepb.DefaultZoneConfigRef()), ), - SessionRegistry: NewSessionRegistry(), + SessionRegistry: NewSessionRegistry(), + ClosedSessionCache: NewClosedSessionCache(st, pool, time.Now), NodeInfo: NodeInfo{ NodeID: nodeID, LogicalClusterID: func() uuid.UUID { return uuid.UUID{} }, @@ -326,12 +333,6 @@ func startConnExecutor( HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec), } - pool := mon.NewUnlimitedMonitor( - context.Background(), "test", mon.MemoryResource, - nil /* curCount */, nil /* maxHist */, math.MaxInt64, st, - ) - // This pool should never be Stop()ed because, if the test is failing, memory - // is not properly released. s := NewServer(cfg, pool) buf := NewStmtBuf() diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 7a5894ec8e22..88acfa7f02f5 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1625,7 +1625,7 @@ var crdbInternalLocalTxnsTable = virtualSchemaTable{ if err := p.RequireAdminRole(ctx, "read crdb_internal.node_transactions"); err != nil { return err } - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1644,7 +1644,7 @@ var crdbInternalClusterTxnsTable = virtualSchemaTable{ if err := p.RequireAdminRole(ctx, "read crdb_internal.cluster_transactions"); err != nil { return err } - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1719,8 +1719,13 @@ CREATE TABLE crdb_internal.%s ( phase STRING -- the current execution phase )` -func (p *planner) makeSessionsRequest(ctx context.Context) (serverpb.ListSessionsRequest, error) { - req := serverpb.ListSessionsRequest{Username: p.SessionData().User().Normalized()} +func (p *planner) makeSessionsRequest( + ctx context.Context, excludeClosed bool, +) (serverpb.ListSessionsRequest, error) { + req := serverpb.ListSessionsRequest{ + Username: p.SessionData().User().Normalized(), + ExcludeClosedSessions: excludeClosed, + } hasAdmin, err := p.HasAdminRole(ctx) if err != nil { return serverpb.ListSessionsRequest{}, err @@ -1770,7 +1775,7 @@ var crdbInternalLocalQueriesTable = virtualSchemaTable{ comment: "running queries visible by current user (RAM; local node only)", schema: fmt.Sprintf(queriesSchemaPattern, "node_queries"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1788,7 +1793,7 @@ var crdbInternalClusterQueriesTable = virtualSchemaTable{ comment: "running queries visible by current user (cluster RPC; expensive!)", schema: fmt.Sprintf(queriesSchemaPattern, "cluster_queries"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1889,7 +1894,9 @@ CREATE TABLE crdb_internal.%s ( oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started kv_txn STRING, -- the ID of the current KV transaction alloc_bytes INT, -- the number of bytes allocated by the session - max_alloc_bytes INT -- the high water mark of bytes allocated by the session + max_alloc_bytes INT, -- the high water mark of bytes allocated by the session + status STRING, -- the status of the session (open, closed) + session_end TIMESTAMP -- the time when the session was closed ) ` @@ -1899,7 +1906,7 @@ var crdbInternalLocalSessionsTable = virtualSchemaTable{ comment: "running sessions visible by current user (RAM; local node only)", schema: fmt.Sprintf(sessionsSchemaPattern, "node_sessions"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1917,7 +1924,7 @@ var crdbInternalClusterSessionsTable = virtualSchemaTable{ comment: "running sessions visible to current user (cluster RPC; expensive!)", schema: fmt.Sprintf(sessionsSchemaPattern, "cluster_sessions"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1969,6 +1976,13 @@ func populateSessionsTable( if err != nil { return err } + endTSDatum := tree.DNull + if session.End != nil { + endTSDatum, err = tree.MakeDTimestamp(*session.End, time.Microsecond) + if err != nil { + return err + } + } if err := addRow( tree.NewDInt(tree.DInt(session.NodeID)), sessionID, @@ -1982,6 +1996,8 @@ func populateSessionsTable( kvTxnIDDatum, tree.NewDInt(tree.DInt(session.AllocBytes)), tree.NewDInt(tree.DInt(session.MaxAllocBytes)), + tree.NewDString(session.Status.String()), + endTSDatum, ); err != nil { return err } @@ -2005,6 +2021,8 @@ func populateSessionsTable( tree.DNull, // kv_txn tree.DNull, // alloc_bytes tree.DNull, // max_alloc_bytes + tree.DNull, // status + tree.DNull, // session end ); err != nil { return err } diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go index be8dbe806f48..9dc08aba57c7 100644 --- a/pkg/sql/delegate/show_sessions.go +++ b/pkg/sql/delegate/show_sessions.go @@ -16,7 +16,7 @@ import ( ) func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) { - const query = `SELECT node_id, session_id, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` + const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` table := `node_sessions` if n.Cluster { table = `cluster_sessions` diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index df577bb5f355..cf25aafaceb4 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1177,6 +1177,7 @@ type ExecutorConfig struct { RegionsServer serverpb.RegionsServer MetricsRecorder nodeStatusGenerator SessionRegistry *SessionRegistry + ClosedSessionCache *ClosedSessionCache SQLLiveness sqlliveness.Liveness JobRegistry *jobs.Registry VirtualSchemas *VirtualSchemaHolder @@ -2031,6 +2032,11 @@ func (r *SessionRegistry) SerializeAll() []serverpb.Session { r.Lock() defer r.Unlock() + return r.SerializeAllLocked() +} + +// SerializeAllLocked is like SerializeAll but assumes SessionRegistry's mutex is locked. +func (r *SessionRegistry) SerializeAllLocked() []serverpb.Session { response := make([]serverpb.Session, 0, len(r.sessions)) for _, s := range r.sessions { diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 381929f40a45..e29788fbe7f1 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -363,15 +363,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 77d5cff749a9..ed7e14ad5c48 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -287,7 +287,9 @@ CREATE TABLE crdb_internal.cluster_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) CREATE TABLE crdb_internal.cluster_sessions ( node_id INT8 NOT NULL, session_id STRING NULL, @@ -300,7 +302,9 @@ CREATE TABLE crdb_internal.cluster_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) {} {} CREATE TABLE crdb_internal.cluster_settings ( variable STRING NOT NULL, @@ -906,7 +910,9 @@ CREATE TABLE crdb_internal.node_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) CREATE TABLE crdb_internal.node_sessions ( node_id INT8 NOT NULL, session_id STRING NULL, @@ -919,7 +925,9 @@ CREATE TABLE crdb_internal.node_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) {} {} CREATE TABLE crdb_internal.node_statement_statistics ( node_id INT8 NOT NULL, diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 7510da8b93ce..125329b73fd5 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -570,7 +570,9 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( var err error response, err = c.statusServer.ListSessions( ctx, - &serverpb.ListSessionsRequest{}, + &serverpb.ListSessionsRequest{ + ExcludeClosedSessions: true, + }, ) if response != nil && len(response.Errors) > 0 && err == nil { diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index c9f3c9b3010e..05140c7c720f 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -21,6 +21,8 @@ import { CancelSessionRequestMessage, } from "src/api/terminateQueryApi"; +import Status = cockroach.server.serverpb.Session.Status; + const history = createMemoryHistory({ initialEntries: ["/sessions"] }); const toUuid = function(s: string): Uint8Array { @@ -45,6 +47,7 @@ export const idleSession: SessionInfo = { alloc_bytes: Long.fromNumber(0), max_alloc_bytes: Long.fromNumber(10240), active_queries: [], + status: Status.ACTIVE, toJSON: () => ({}), }, }; @@ -82,6 +85,7 @@ export const idleTransactionSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", active_queries: [], + status: Status.ACTIVE, toJSON: () => ({}), }, }; @@ -133,6 +137,7 @@ export const activeSession: SessionInfo = { num_auto_retries: 3, }, last_active_query_no_constants: "SHOW database", + status: Status.ACTIVE, toJSON: () => ({}), }, };