Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
78650: server, sql: expose closed sessions to ListSessions endpoint r=gtr a=gtr

This is the second phase of cockroachdb#67888 which is to expose closed sessions to
the ListSessions endpoint.

Previously, the ListSessions endpoint only returned open sessions. This
change builds on top of the previous PR to add the `ClosedSessionsCache`
and now allows the ListSessions endpoint to also return closed sessions
in its response. The `ListSessionsRequest` object now includes a boolean flag,
`exclude_closed_sessions`, which excludes closed sessions from the response. If
unspecified, it defaults to false and closed sessions are included in the
response.

Additionally, the `serverpb.Session` object was updated
to include new `end` and `status` fields, which specify the time the
session ended and the status (active, idle, or closed) of the session,
respectively.

To test the changes, I made a test `TestListClosedSessions` which creates three
users. For each user, we create 10 closed sessions, 5 open sessions, and 3 idle
sessions. 

- The closed sessions are created by opening 10 DB connections and then
closing them right after:
	```go
	// Open 10 sessions for each user and then close them.
	for _, user := range users {
	    for i := 0; i < 10; i++ {
	        targetDB := getUserConn(t, user, testCluster.Server(0))
	        dbs = append(dbs, targetDB)
	        sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`)
	    }
	}

	for _, db := range dbs {
	    err := db.Close()
	    require.NoError(t, err)
	}
	```

- The open sessions (reported as `ACTIVE`) are created by opening 5 DB
connections and running `pg_sleep(30)` concurrently:
	```go
	// Open 5 sessions for each user and leave them open by running pg_sleep(30).
	for _, user := range users {
		for i := 0; i < 5; i++ {
			wg.Add(1)
			go func(user string) {
				// Open a session for the target user.
				targetDB := getUserConn(t, user, testCluster.Server(0))
				defer targetDB.Close()
				defer wg.Done()
				sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT pg_sleep(30)`)
			}(user)
		}
	}
	```

- The idle sessions are created by opening up 3 DB connections and running
`SELECT version()` (which finishes executing almost instantaneously) but 
deferring the `db.Close()` call until the end of the test:
	```go
	// Open 3 sessions for each user and leave them idle by running version().
	for _, user := range users {
	    for i := 0; i < 3; i++ {
	        targetDB := getUserConn(t, user, testCluster.Server(0))
	        defer targetDB.Close()
	        sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`)
	    }
	}
	```

These are the results for the three users: 
```
username: test_user_a
-------------------------------------
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: IDLE
Status: IDLE
Status: IDLE
username: test_user_b
-------------------------------------
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: IDLE
Status: IDLE
Status: IDLE
username: test_user_c
-------------------------------------
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: CLOSED
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: ACTIVE
Status: IDLE
Status: IDLE
Status: IDLE
```

Release note (api change): `ListSessions` now returns closed sessions in
addition to open sessions; `ListSessionsRequest` now has an
`exclude_closed_sessions` flag; `serverpb.Session` now has `end` and `status`
fields.

Co-authored-by: Gerardo Torres <[email protected]>
  • Loading branch information
craig[bot] and Gerardo Torres committed Apr 22, 2022
2 parents 11a0a9f + 1c36b6e commit 2f8938f
Show file tree
Hide file tree
Showing 26 changed files with 398 additions and 72 deletions.
6 changes: 6 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,7 @@ Request object for ListSessions and ListLocalSessions.
| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) |
| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) |



Expand Down Expand Up @@ -2108,6 +2109,8 @@ Session represents one SQL session.
| max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) |
| active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) |
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |



Expand Down Expand Up @@ -2194,6 +2197,7 @@ Request object for ListSessions and ListLocalSessions.
| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) |
| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) |



Expand Down Expand Up @@ -2239,6 +2243,8 @@ Session represents one SQL session.
| max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) |
| active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) |
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |



Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan
sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache
sql.closed_session_cache.capacity integer 1000 the maximum number of sessions in the cache
sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds
sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store
sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.auth.resolve_membership_single_scan.enabled</code></td><td>boolean</td><td><code>true</code></td><td>determines whether to populate the role membership cache with a single scan</td></tr>
<tr><td><code>sql.closed_session_cache.capacity</code></td><td>integer</td><td><code>100</code></td><td>the maximum number of sessions in the cache</td></tr>
<tr><td><code>sql.closed_session_cache.capacity</code></td><td>integer</td><td><code>1000</code></td><td>the maximum number of sessions in the cache</td></tr>
<tr><td><code>sql.closed_session_cache.time_to_live</code></td><td>integer</td><td><code>3600</code></td><td>the maximum time to live, in seconds</td></tr>
<tr><td><code>sql.contention.event_store.capacity</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the in-memory storage capacity per-node of contention event store</td></tr>
<tr><td><code>sql.contention.event_store.duration_threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events</td></tr>
Expand Down
21 changes: 21 additions & 0 deletions docs/generated/swagger/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,12 @@
"name": "username",
"in": "query"
},
{
"type": "bool",
"description": "Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response.",
"name": "exclude_closed_sessions",
"in": "query"
},
{
"type": "integer",
"description": "Maximum number of results to return in this call.",
Expand Down Expand Up @@ -1180,6 +1186,12 @@
"type": "string",
"x-go-name": "ClientAddress"
},
"end": {
"description": "Timestamp of session's end.",
"type": "string",
"format": "date-time",
"x-go-name": "End"
},
"id": {
"description": "ID of the session (uint128 represented as raw bytes).",
"type": "array",
Expand Down Expand Up @@ -1214,6 +1226,9 @@
"format": "date-time",
"x-go-name": "Start"
},
"status": {
"$ref": "#/definitions/Session_Status"
},
"username": {
"description": "Username of the user for this session.",
"type": "string",
Expand All @@ -1222,6 +1237,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
"Session_Status": {
"type": "integer",
"format": "int32",
"title": "Enum for sessions status.",
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
"StoreID": {
"type": "integer",
"format": "int32",
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0
----
id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries

query ITTTTTTTTTTT colnames
query ITTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes
node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end

query ITTTTTTTTTTT colnames
query ITTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes
node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end

query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,8 @@ WHERE

func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string {
var sessionIDs []string
rows := conn.QueryStr(t, "SELECT session_id FROM crdb_internal.cluster_sessions")
rows := conn.QueryStr(t,
"SELECT session_id FROM crdb_internal.cluster_sessions WHERE status = 'ACTIVE' OR status = 'IDLE'")
for _, row := range rows {
sessionIDs = append(sessionIDs, row[0])
}
Expand All @@ -822,7 +823,7 @@ func testTenantStatusCancelSession(t *testing.T, helper *tenantTestHelper) {
httpPod1 := helper.testCluster().tenantAdminHTTPClient(t, 1)
defer httpPod1.Close()
listSessionsResp := serverpb.ListSessionsResponse{}
httpPod1.GetJSON("/_status/sessions", &listSessionsResp)
httpPod1.GetJSON("/_status/sessions?exclude_closed_sessions=true", &listSessionsResp)
var session serverpb.Session
for _, s := range listSessionsResp.Sessions {
if s.LastActiveQuery == "SELECT 1" {
Expand Down
9 changes: 8 additions & 1 deletion pkg/server/api_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ type listSessionsResponse struct {
// description: Username of user to return sessions for; if unspecified,
// sessions from all users are returned.
// required: false
// - name: exclude_closed_sessions
// type: bool
// in: query
// description: Boolean to exclude closed sessions; if unspecified, defaults
// to false and closed sessions are included in the response.
// required: false
// - name: limit
// type: integer
// in: query
Expand All @@ -250,7 +256,8 @@ func (a *apiV2Server) listSessions(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
limit, start := getRPCPaginationValues(r)
reqUsername := r.URL.Query().Get("username")
req := &serverpb.ListSessionsRequest{Username: reqUsername}
reqExcludeClosed := r.URL.Query().Get("exclude_closed_sessions") == "true"
req := &serverpb.ListSessionsRequest{Username: reqUsername, ExcludeClosedSessions: reqExcludeClosed}
response := &listSessionsResponse{}
outgoingCtx := apiToOutgoingGatewayCtx(ctx, r)

Expand Down
1 change: 1 addition & 0 deletions pkg/server/api_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestListSessionsV2(t *testing.T) {
req, err := http.NewRequest("GET", ts1.AdminURL()+apiV2Path+"sessions/", nil)
require.NoError(t, err)
query := req.URL.Query()
query.Add("exclude_closed_sessions", "true")
if limit > 0 {
query.Add("limit", strconv.Itoa(limit))
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sHTTP := newHTTPServer(cfg.BaseConfig, rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn)

sessionRegistry := sql.NewSessionRegistry()
closedSessionCache := sql.NewClosedSessionCache(cfg.Settings, sqlMonitorAndMetrics.rootSQLMemoryMonitor, time.Now)
flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st)

sStatus := newStatusServer(
Expand All @@ -706,6 +707,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
node.stores,
stopper,
sessionRegistry,
closedSessionCache,
flowScheduler,
internalExecutor,
)
Expand Down Expand Up @@ -756,6 +758,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
registry: registry,
recorder: recorder,
sessionRegistry: sessionRegistry,
closedSessionCache: closedSessionCache,
flowScheduler: flowScheduler,
circularInternalExecutor: internalExecutor,
circularJobRegistry: jobRegistry,
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type SQLServer struct {
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
sessionRegistry *sql.SessionRegistry
closedSessionCache *sql.ClosedSessionCache
jobRegistry *jobs.Registry
startupMigrationsMgr *startupmigrations.Manager
statsRefresher *stats.Refresher
Expand Down Expand Up @@ -273,6 +274,9 @@ type sqlServerArgs struct {
// Used for SHOW/CANCEL QUERIE(S)/SESSION(S).
sessionRegistry *sql.SessionRegistry

// Used to store closed sessions.
closedSessionCache *sql.ClosedSessionCache

// Used to track the DistSQL flows scheduled on this node but initiated on
// behalf of other nodes.
flowScheduler *flowinfra.FlowScheduler
Expand Down Expand Up @@ -728,6 +732,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
SQLStatusServer: cfg.sqlStatusServer,
RegionsServer: cfg.regionsServer,
SessionRegistry: cfg.sessionRegistry,
ClosedSessionCache: cfg.closedSessionCache,
ContentionRegistry: contentionRegistry,
SQLLiveness: cfg.sqlLivenessProvider,
JobRegistry: jobRegistry,
Expand Down Expand Up @@ -1061,6 +1066,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
tracingService: tracingService,
tenantConnect: cfg.tenantConnect,
sessionRegistry: cfg.sessionRegistry,
closedSessionCache: cfg.closedSessionCache,
jobRegistry: jobRegistry,
statsRefresher: statsRefresher,
temporaryObjectCleaner: temporaryObjectCleaner,
Expand Down
14 changes: 14 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ message ListSessionsRequest {
// The caller is responsible to normalize the username
// (= case fold and perform unicode NFC normalization).
string username = 1;
// Boolean to exclude closed sessions; if unspecified, defaults
// to false and closed sessions are included in the response.
bool exclude_closed_sessions = 2;
}

// Session represents one SQL session.
Expand Down Expand Up @@ -944,6 +947,17 @@ message Session {
// The SQL statement fingerprint of the last query executed on this session,
// compatible with StatementStatisticsKey.
string last_active_query_no_constants = 13;
// Enum for sessions status.
enum Status {
ACTIVE = 0;
CLOSED = 1;
IDLE = 2;
}
// The session's status.
Status status = 14;
// Timestamp of session's end.
google.protobuf.Timestamp end = 15
[ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ];
}

// An error wrapper object for ListSessionsResponse.
Expand Down
58 changes: 41 additions & 17 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,14 @@ type baseStatusServer struct {
serverpb.UnimplementedStatusServer

log.AmbientContext
privilegeChecker *adminPrivilegeChecker
sessionRegistry *sql.SessionRegistry
flowScheduler *flowinfra.FlowScheduler
st *cluster.Settings
sqlServer *SQLServer
rpcCtx *rpc.Context
stopper *stop.Stopper
privilegeChecker *adminPrivilegeChecker
sessionRegistry *sql.SessionRegistry
closedSessionCache *sql.ClosedSessionCache
flowScheduler *flowinfra.FlowScheduler
st *cluster.Settings
sqlServer *SQLServer
rpcCtx *rpc.Context
stopper *stop.Stopper
}

// getLocalSessions returns a list of local sessions on this node. Note that the
Expand Down Expand Up @@ -199,9 +200,22 @@ func (b *baseStatusServer) getLocalSessions(
// The empty username means "all sessions".
showAll := reqUsername.Undefined()

registry := b.sessionRegistry
sessions := registry.SerializeAll()
userSessions := make([]serverpb.Session, 0, len(sessions))
// In order to avoid duplicate sessions showing up as both open and closed,
// we lock the session registry to prevent any changes to it while we
// serialize the sessions from the session registry and the closed session
// cache.
b.sessionRegistry.Lock()
sessions := b.sessionRegistry.SerializeAllLocked()

var closedSessions []serverpb.Session
if !req.ExcludeClosedSessions {
closedSessions = b.closedSessionCache.GetSerializedSessions()
}

b.sessionRegistry.Unlock()

userSessions := make([]serverpb.Session, 0, len(sessions)+len(closedSessions))
sessions = append(sessions, closedSessions...)

for _, session := range sessions {
if reqUsername.Normalized() != session.Username && !showAll {
Expand All @@ -220,6 +234,11 @@ func (b *baseStatusServer) getLocalSessions(

userSessions = append(userSessions, session)
}

sort.Slice(userSessions, func(i, j int) bool {
return userSessions[i].Start.Before(userSessions[j].Start)
})

return userSessions, nil
}

Expand Down Expand Up @@ -501,19 +520,21 @@ func newStatusServer(
stores *kvserver.Stores,
stopper *stop.Stopper,
sessionRegistry *sql.SessionRegistry,
closedSessionCache *sql.ClosedSessionCache,
flowScheduler *flowinfra.FlowScheduler,
internalExecutor *sql.InternalExecutor,
) *statusServer {
ambient.AddLogTag("status", nil)
server := &statusServer{
baseStatusServer: &baseStatusServer{
AmbientContext: ambient,
privilegeChecker: adminAuthzCheck,
sessionRegistry: sessionRegistry,
flowScheduler: flowScheduler,
st: st,
rpcCtx: rpcCtx,
stopper: stopper,
AmbientContext: ambient,
privilegeChecker: adminAuthzCheck,
sessionRegistry: sessionRegistry,
closedSessionCache: closedSessionCache,
flowScheduler: flowScheduler,
st: st,
rpcCtx: rpcCtx,
stopper: stopper,
},
cfg: cfg,
admin: adminServer,
Expand Down Expand Up @@ -2744,6 +2765,9 @@ func (s *statusServer) listSessionsHelper(
}
sessions := nodeResp.([]serverpb.Session)
response.Sessions = append(response.Sessions, sessions...)
sort.Slice(response.Sessions, func(i, j int) bool {
return response.Sessions[i].Start.Before(response.Sessions[j].Start)
})
}
errorFn := func(nodeID roachpb.NodeID, err error) {
errResponse := serverpb.ListSessionsError{NodeID: nodeID, Message: err.Error()}
Expand Down
Loading

0 comments on commit 2f8938f

Please sign in to comment.