From d9304ce6d527b38882cac4f370e63e88ea373310 Mon Sep 17 00:00:00 2001 From: Gerardo Torres Date: Thu, 28 Apr 2022 11:36:31 -0400 Subject: [PATCH] server, sql: collect additional session details Partially addresses #74257. Previously, the sql server did not provide additional session details such as total number of transactions executed, transaction fingerprint IDs, and total active time. This change adds the aforementioned session details to the `serverpb.Session` struct. To track transaction fingerprint IDs, a new circular buffer `TxnFingerprintIDBuffer` is introduced with its corresponding cluster setting `TxnFingerprintIDBufferCapacity` which controls the capacity. The total number of transactions executed is accumulated using the `txnCounter` from the `extraTxnState` in `connExecutor`. The total active time is calculated by a `timeutil.StopWatch` which is started and stopped when a transaction is started, restarted, and finished. Release note (api change): the `serverpb.Session` struct now has three new fields: number of transactions executed, transaction fingerprint IDs, and total active time. --- docs/generated/http/full.md | 6 + .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + docs/generated/swagger/spec.json | 29 +++ .../testdata/logic_test/crdb_internal_tenant | 8 +- pkg/server/serverpb/status.proto | 9 + pkg/sql/conn_executor.go | 21 ++ pkg/sql/conn_executor_exec.go | 13 +- pkg/sql/crdb_internal.go | 5 +- pkg/sql/delegate/show_sessions.go | 2 +- .../testdata/logic_test/crdb_internal | 8 +- .../testdata/logic_test/create_statements | 4 + pkg/sql/testdata/txn_fingerprint_id_buffer | 95 +++++++++ pkg/sql/txn_fingerprint_id_buffer.go | 182 ++++++++++++++++++ pkg/sql/txn_fingerprint_id_buffer_test.go | 104 ++++++++++ .../src/sessions/sessionsPage.fixture.ts | 8 + 16 files changed, 485 insertions(+), 11 deletions(-) create mode 100644 pkg/sql/testdata/txn_fingerprint_id_buffer create mode 100644 pkg/sql/txn_fingerprint_id_buffer.go create mode 100644 pkg/sql/txn_fingerprint_id_buffer_test.go diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index e318ab43dd71..b93c50b858c6 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2108,9 +2108,12 @@ Session represents one SQL session. | alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | Number of currently allocated bytes in the session memory monitor. | [reserved](#support-status) | | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Number of transactions that were executed so far on this session. | [reserved](#support-status) | +| txnFingerprintIDs | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| totalActiveTime | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | @@ -2242,9 +2245,12 @@ Session represents one SQL session. | alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | Number of currently allocated bytes in the session memory monitor. | [reserved](#support-status) | | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Number of transactions that were executed so far on this session. | [reserved](#support-status) | +| txnFingerprintIDs | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| totalActiveTime | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index ed9ad06da725..593b601d02c5 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -186,6 +186,7 @@ sql.ttl.default_range_concurrency integer 1 default amount of ranges to process sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job sql.ttl.job.enabled boolean true whether the TTL job is enabled sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_buffer.capacity integer 100 the maximum number of txn fingerprint IDs stored timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e444e83a3c6c..ebd50141da88 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -202,6 +202,7 @@ sql.ttl.default_select_batch_sizeinteger500default amount of rows to select in a single query during a TTL job sql.ttl.job.enabledbooleantruewhether the TTL job is enabled sql.ttl.range_batch_sizeinteger100amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_buffer.capacityinteger100the maximum number of txn fingerprint IDs stored timeseries.storage.enabledbooleantrueif set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttlduration240h0m0sthe maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttlduration2160h0m0sthe maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index 69ffddbb0389..65b96b6da65f 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -768,6 +768,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" }, + "Duration": { + "description": "A Duration represents the elapsed time between two instants\nas an int64 nanosecond count. The representation limits the\nlargest representable duration to approximately 290 years.", + "type": "integer", + "format": "int64", + "x-go-package": "time" + }, "EventsResponse": { "description": "EventsResponse contains a set of event log entries. This is always limited\nto the latest N entries (N is enforced in the associated endpoint).", "type": "object", @@ -1220,6 +1226,12 @@ "node_id": { "$ref": "#/definitions/NodeID" }, + "num_txns_executed": { + "description": "num_txns_executed is the number of transactions that were executed so\nfar on this session.", + "type": "integer", + "format": "int32", + "x-go-name": "NumTxnsExecuted" + }, "start": { "description": "Timestamp of session's start.", "type": "string", @@ -1229,6 +1241,17 @@ "status": { "$ref": "#/definitions/Session_Status" }, + "totalActiveTime": { + "$ref": "#/definitions/Duration" + }, + "txnFingerprintIDs": { + "description": "List of transaction fingerprint IDs in this session.", + "type": "array", + "items": { + "$ref": "#/definitions/TransactionFingerprintID" + }, + "x-go-name": "TxnFingerprintIDs" + }, "username": { "description": "Username of the user for this session.", "type": "string", @@ -1493,6 +1516,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/util/hlc" }, + "TransactionFingerprintID": { + "description": "TransactionFingerprintID is the hashed string constructed using the\nindividual statement fingerprint IDs that comprise the transaction.", + "type": "integer", + "format": "uint64", + "x-go-package": "github.com/cockroachdb/cockroach/pkg/roachpb" + }, "TxnInfo": { "type": "object", "title": "TxnInfo represents an in flight user transaction on some Session.", diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index daeaa524be6b..e92efa21cc3e 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -243,15 +243,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index f6c5ab35c2d2..c23400eb9e57 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -944,6 +944,12 @@ message Session { // Information about the txn in progress on this session. Nil if the // session doesn't currently have a transaction. TxnInfo active_txn = 12; + // Number of transactions that were executed so far on this session. + int32 num_txns_executed = 16; + // List of transaction fingerprint IDs in this session. + repeated uint64 txnFingerprintIDs = 17 [(gogoproto.customname) = "TxnFingerprintIDs", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", + (gogoproto.nullable) = false]; // The SQL statement fingerprint of the last query executed on this session, // compatible with StatementStatisticsKey. string last_active_query_no_constants = 13; @@ -958,6 +964,9 @@ message Session { // Timestamp of session's end. google.protobuf.Timestamp end = 15 [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ]; + // The session's total active time. + google.protobuf.Duration totalActiveTime = 18 [(gogoproto.nullable) = false, + (gogoproto.stdduration) = true]; } // An error wrapper object for ListSessionsResponse. diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 8f6e937266c4..df00ad12b80b 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -883,6 +883,8 @@ func (s *Server) newConnExecutor( stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, indexUsageStats: s.indexUsageStats, txnIDCacheWriter: s.txnIDCache, + totalActiveTimeStopWatch: timeutil.NewStopWatch(), + TxnFingerprintIDBuffer: NewTxnFingerprintIDBuffer(s.cfg.Settings, s.cfg.RootMemoryMonitor), } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -1497,6 +1499,17 @@ type connExecutor struct { // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the // Transaction ID Cache. txnIDCacheWriter txnidcache.Writer + + // totalTxnsExecuted tracks the total number of transactions executed during + // the lifetime of the session. + totalTxnsExecuted int + + // TxnFingerprintIDBuffer is a circular buffer keeping track of the + // txnFingerprintIDs in this session. + TxnFingerprintIDBuffer *TxnFingerprintIDBuffer + + // totalActiveTimeStopWatch tracks the total active time of the session. + totalActiveTimeStopWatch *timeutil.StopWatch } // ctxHolder contains a connection's context and, while session tracing is @@ -1661,6 +1674,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{}) + // Increment the totalTxnsExecuted count. + ex.totalTxnsExecuted += ex.extraTxnState.txnCounter + switch ev.eventType { case txnCommit, txnRollback: for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals { @@ -3115,6 +3131,8 @@ func (ex *connExecutor) serialize() serverpb.Session { remoteStr = sd.RemoteAddr.String() } + txnFingerprintIDs := ex.TxnFingerprintIDBuffer.GetAllTxnFingerprintIDs() + return serverpb.Session{ Username: sd.SessionUser().Normalized(), ClientAddress: remoteStr, @@ -3122,12 +3140,15 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(), ActiveQueries: activeQueries, ActiveTxn: activeTxnInfo, + NumTxnsExecuted: int32(ex.totalTxnsExecuted), + TxnFingerprintIDs: txnFingerprintIDs, LastActiveQuery: lastActiveQuery, ID: ex.sessionID.GetBytes(), AllocBytes: ex.mon.AllocBytes(), MaxAllocBytes: ex.mon.MaximumBytes(), LastActiveQueryNoConstants: lastActiveQueryNoConstants, Status: status, + TotalActiveTime: ex.totalActiveTimeStopWatch.Elapsed(), } } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 536429c99817..772d5fe22776 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2101,6 +2101,12 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now()) transactionFingerprintID := roachpb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum()) + err := ex.TxnFingerprintIDBuffer.Enqueue(transactionFingerprintID) + if err != nil { + if log.V(1) { + log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err) + } + } if !implicit { ex.statsCollector.EndExplicitTransaction( ctx, @@ -2114,7 +2120,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } - err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) + err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) @@ -2126,6 +2132,8 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { func (ex *connExecutor) onTxnRestart(ctx context.Context) { if ex.extraTxnState.shouldExecuteOnTxnRestart { + ex.totalActiveTimeStopWatch.Stop() + defer ex.totalActiveTimeStopWatch.Start() ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction, timeutil.Now()) ex.extraTxnState.transactionStatementFingerprintIDs = nil ex.extraTxnState.transactionStatementsHash = util.MakeFNV64() @@ -2157,6 +2165,8 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { ex.state.mu.RUnlock() implicit := ex.implicitTxn() + ex.totalActiveTimeStopWatch.Start() + // Transaction received time is the time at which the statement that prompted // the creation of this transaction was received. ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived, @@ -2210,6 +2220,7 @@ func (ex *connExecutor) recordTransactionFinish( txnEnd := timeutil.Now() txnTime := txnEnd.Sub(txnStart) + ex.totalActiveTimeStopWatch.Stop() if ex.executorType != executorTypeInternal { ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) } diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 53809d47f5d4..b601f2845ea5 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1615,7 +1615,7 @@ CREATE TABLE crdb_internal.%s ( node_id INT, -- the ID of the node running the transaction session_id STRING, -- the ID of the session start TIMESTAMP, -- the start time of the transaction - txn_string STRING, -- the string representation of the transcation + txn_string STRING, -- the string representation of the transaction application_name STRING, -- the name of the application as per SET application_name num_stmts INT, -- the number of statements executed so far num_retries INT, -- the number of times the transaction was restarted @@ -1894,6 +1894,7 @@ CREATE TABLE crdb_internal.%s ( application_name STRING, -- the name of the application as per SET application_name active_queries STRING, -- the currently running queries as SQL last_active_query STRING, -- the query that finished last on this session as SQL + num_txns_executed INT, -- the number of transactions that were executed so far on this session session_start TIMESTAMP, -- the time when the session was opened oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started kv_txn STRING, -- the ID of the current KV transaction @@ -1995,6 +1996,7 @@ func populateSessionsTable( tree.NewDString(session.ApplicationName), tree.NewDString(activeQueries.String()), tree.NewDString(session.LastActiveQuery), + tree.NewDInt(tree.DInt(session.NumTxnsExecuted)), startTSDatum, oldestStartDatum, kvTxnIDDatum, @@ -2020,6 +2022,7 @@ func populateSessionsTable( tree.DNull, // application name tree.NewDString("-- "+rpcErr.Message), // active queries tree.DNull, // last active query + tree.DNull, // num txns executed tree.DNull, // session start tree.DNull, // oldest_query_start tree.DNull, // kv_txn diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go index 9dc08aba57c7..638d0358550b 100644 --- a/pkg/sql/delegate/show_sessions.go +++ b/pkg/sql/delegate/show_sessions.go @@ -16,7 +16,7 @@ import ( ) func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) { - const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` + const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, num_txns_executed, session_start, oldest_query_start FROM crdb_internal.` table := `node_sessions` if n.Cluster { table = `cluster_sessions` diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 0f49f9c0c072..65ff7d342f19 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -364,15 +364,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 3c197f49016d..6c37ac511b1d 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -316,6 +316,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -331,6 +332,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -939,6 +941,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -954,6 +957,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, diff --git a/pkg/sql/testdata/txn_fingerprint_id_buffer b/pkg/sql/testdata/txn_fingerprint_id_buffer new file mode 100644 index 000000000000..918cf08f34e1 --- /dev/null +++ b/pkg/sql/testdata/txn_fingerprint_id_buffer @@ -0,0 +1,95 @@ +# Initialize TxnFingerprintIDBuffer +init capacity=10 +---- +buffer_size: 0 + +# Add four TxnFingerprintIDs: 5, 6, 7, 8 +enqueue id=05 +---- +buffer_size: 1 + +enqueue id=06 +---- +buffer_size: 2 + +enqueue id=07 +---- +buffer_size: 3 + +enqueue id=08 +---- +buffer_size: 4 + +dequeue +---- +txnFingerprintID: 5 + +dequeue +---- +txnFingerprintID: 6 + +# There should be 2 valid TxnFingerprintIDs +show +---- +0 -> 0 +1 -> 0 +2 -> 7 +3 -> 8 +4 -> 0 +5 -> 0 +6 -> 0 +7 -> 0 +8 -> 0 +9 -> 0 + +enqueue id=09 +---- +buffer_size: 3 + +getAllTxnFingerprintIDs +---- +[7 8 9] + +# Increase the TxnFingerprintIDBufferCapacity cluster setting +override capacity=12 +---- +TxnFingerprintIDBufferCapacity: 12 + +# Enqueue another txnFingerprintID so the override takes effect +enqueue id=10 +---- +buffer_size: 4 + +# The buffer should look like this now: +show +---- +0 -> 7 +1 -> 8 +2 -> 9 +3 -> 10 +4 -> 0 +5 -> 0 +6 -> 0 +7 -> 0 +8 -> 0 +9 -> 0 +10 -> 0 +11 -> 0 + +# Decrease the TxnFingerprintIDBufferCapacity cluster setting to one below the +# size of the buffer +override capacity=3 +---- +TxnFingerprintIDBufferCapacity: 3 + +# Enqueue another id to overwrite the 0th txnFingerprintID +enqueue id=11 +---- +buffer_size: 3 + +# The buffer should look like this now: +show +---- +0 -> 11 +1 -> 8 +2 -> 9 diff --git a/pkg/sql/txn_fingerprint_id_buffer.go b/pkg/sql/txn_fingerprint_id_buffer.go new file mode 100644 index 000000000000..0c54518b9e02 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_buffer.go @@ -0,0 +1,182 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// TxnFingerprintIDBufferCapacity is the cluster setting that controls the +// capacity of the txn fingerprint ID circular buffer. +var TxnFingerprintIDBufferCapacity = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.txn_fingerprint_id_buffer.capacity", + "the maximum number of txn fingerprint IDs stored", + 100, +).WithPublic() + +// TxnFingerprintIDBuffer is a thread-safe circular buffer tracking transaction +// fingerprint IDs at the session level. +type TxnFingerprintIDBuffer struct { + st *cluster.Settings + + mu struct { + syncutil.RWMutex + data []roachpb.TransactionFingerprintID + acc mon.BoundAccount + size int + capacity int64 + readPosition int + writePosition int + } + + mon *mon.BytesMonitor +} + +// NewTxnFingerprintIDBuffer returns a new TxnFingerprintIDBuffer. +func NewTxnFingerprintIDBuffer( + st *cluster.Settings, parentMon *mon.BytesMonitor, +) *TxnFingerprintIDBuffer { + b := &TxnFingerprintIDBuffer{st: st} + b.initializeBufferLocked(TxnFingerprintIDBufferCapacity.Get(&st.SV)) + + monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-buffer", 0 /* limit */, parentMon) + b.mu.acc = monitor.MakeBoundAccount() + b.mon = monitor + b.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + + return b +} + +// initializeBufferLocked initializes a new buffer with the given capacity. +func (b *TxnFingerprintIDBuffer) initializeBufferLocked(capacity int64) { + b.mu.capacity = capacity + b.mu.data = make([]roachpb.TransactionFingerprintID, capacity) + for i := range b.mu.data { + b.mu.data[i] = roachpb.InvalidTransactionFingerprintID + } +} + +// Enqueue adds a TxnFingerprintID to the circular buffer. +func (b *TxnFingerprintIDBuffer) Enqueue(value roachpb.TransactionFingerprintID) error { + b.mu.Lock() + defer b.mu.Unlock() + + b.checkForCapacityLocked() + if b.mu.size >= int(b.mu.capacity) { + b.dequeueLocked() + } + + if b.mu.writePosition >= int(b.mu.capacity) { + b.mu.writePosition = 0 + } + + size := value.Size() + err := b.mu.acc.Grow(context.Background(), size) + if err != nil { + return err + } + + b.mu.data[b.mu.writePosition] = value + b.mu.writePosition++ + b.mu.size++ + + return nil +} + +// checkForCapacityLocked checks if the TxnFingerprintIDBufferCapacity cluster +// setting has been updated. +func (b *TxnFingerprintIDBuffer) checkForCapacityLocked() { + capacityClusterSetting := TxnFingerprintIDBufferCapacity.Get(&b.st.SV) + if b.mu.capacity != capacityClusterSetting { + b.updateCapacityLocked(capacityClusterSetting) + } +} + +// updateCapacityLocked updates the capacity of the circular buffer and moves +// the data into a new slice with that capacity. +func (b *TxnFingerprintIDBuffer) updateCapacityLocked(newCapacity int64) { + newData := make([]roachpb.TransactionFingerprintID, newCapacity) + oldData := b.mu.data + + ptr := 0 + b.mu.size = 0 + b.initializeBufferLocked(newCapacity) + for _, txnFingerprintID := range oldData { + if txnFingerprintID != roachpb.InvalidTransactionFingerprintID { + if ptr >= int(newCapacity) { + break + } + newData[ptr] = txnFingerprintID + b.mu.size++ + ptr++ + } + } + + b.mu.data = newData + b.mu.readPosition = 0 + b.mu.writePosition = b.mu.size +} + +// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the +// circular buffer. +func (b *TxnFingerprintIDBuffer) GetAllTxnFingerprintIDs() []roachpb.TransactionFingerprintID { + b.mu.Lock() + defer b.mu.Unlock() + + var txnFingerprintIDs []roachpb.TransactionFingerprintID + if b.mu.data[b.mu.readPosition] != roachpb.InvalidTransactionFingerprintID { + txnFingerprintIDs = append(txnFingerprintIDs, b.mu.data[b.mu.readPosition]) + } + + ptr := b.mu.readPosition + 1 + for ptr != b.mu.readPosition { + if b.mu.data[ptr] != roachpb.InvalidTransactionFingerprintID { + txnFingerprintIDs = append(txnFingerprintIDs, b.mu.data[ptr]) + } + ptr = (ptr + 1) % int(b.mu.capacity) + } + + return txnFingerprintIDs +} + +// dequeue returns the oldest transaction fingerprint ID +func (b *TxnFingerprintIDBuffer) dequeue() roachpb.TransactionFingerprintID { + b.mu.Lock() + defer b.mu.Unlock() + + return b.dequeueLocked() +} + +func (b *TxnFingerprintIDBuffer) dequeueLocked() roachpb.TransactionFingerprintID { + txnFingerprintID := b.mu.data[b.mu.readPosition] + b.mu.data[b.mu.readPosition] = roachpb.InvalidTransactionFingerprintID + + size := txnFingerprintID.Size() + b.mu.acc.Shrink(context.Background(), size) + b.mu.size-- + b.mu.readPosition++ + + return txnFingerprintID +} + +func (b *TxnFingerprintIDBuffer) size() int { + b.mu.Lock() + defer b.mu.Unlock() + + return b.mu.size +} diff --git a/pkg/sql/txn_fingerprint_id_buffer_test.go b/pkg/sql/txn_fingerprint_id_buffer_test.go new file mode 100644 index 000000000000..36f18a25b196 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_buffer_test.go @@ -0,0 +1,104 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/stretchr/testify/require" + "math" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +func TestTxnFingerprintIDBuffer(t *testing.T) { + defer leaktest.AfterTest(t)() + var txnFingerprintIDBuffer *TxnFingerprintIDBuffer + + datadriven.Walk(t, testutils.TestDataPath(t, "txn_fingerprint_id_buffer"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() + switch d.Cmd { + case "init": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + + st := &cluster.Settings{} + monitor := mon.NewUnlimitedMonitor( + ctx, + "test", + mon.MemoryResource, + nil, /* currCount */ + nil, /* maxHist */ + math.MaxInt64, + st, + ) + txnFingerprintIDBuffer = NewTxnFingerprintIDBuffer(st, monitor) + + TxnFingerprintIDBufferCapacity.Override(ctx, &st.SV, int64(capacity)) + + return fmt.Sprintf("buffer_size: %d", txnFingerprintIDBuffer.size()) + case "enqueue": + var idStr string + d.ScanArgs(t, "id", &idStr) + + id, err := strconv.ParseUint(idStr, 10, 64) + require.NoError(t, err) + txnFingerprintID := roachpb.TransactionFingerprintID(id) + + err = txnFingerprintIDBuffer.Enqueue(txnFingerprintID) + require.NoError(t, err) + + return fmt.Sprintf("buffer_size: %d", txnFingerprintIDBuffer.size()) + case "dequeue": + txnFingerprintID := txnFingerprintIDBuffer.dequeue() + return fmt.Sprintf("txnFingerprintID: %d", txnFingerprintID) + case "override": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + TxnFingerprintIDBufferCapacity.Override(ctx, &txnFingerprintIDBuffer.st.SV, int64(capacity)) + capacityClusterSetting := TxnFingerprintIDBufferCapacity.Get(&txnFingerprintIDBuffer.st.SV) + return fmt.Sprintf("TxnFingerprintIDBufferCapacity: %d", capacityClusterSetting) + case "show": + return printTxnFingerprintIDBuffer(txnFingerprintIDBuffer) + case "getAllTxnFingerprintIDs": + txnFingerprintIDs := txnFingerprintIDBuffer.GetAllTxnFingerprintIDs() + + return fmt.Sprintf("%d\n", txnFingerprintIDs) + } + return "" + + }) + }) +} + +func printTxnFingerprintIDBuffer(buffer *TxnFingerprintIDBuffer) string { + buffer.mu.Lock() + defer buffer.mu.Unlock() + + var result []string + for i, txnFingerprintID := range buffer.mu.data { + result = append(result, fmt.Sprintf("%d -> %d", i, txnFingerprintID)) + } + if len(result) == 0 { + return "empty" + } + + return strings.Join(result, "\n") +} diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index 9ab8b2746f8e..e16010eb3091 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -47,6 +47,8 @@ export const idleSession: SessionInfo = { alloc_bytes: Long.fromNumber(0), max_alloc_bytes: Long.fromNumber(10240), active_queries: [], + num_txns_executed: 1, + txnFingerprintIDs: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -85,6 +87,8 @@ export const idleTransactionSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", active_queries: [], + num_txns_executed: 1, + txnFingerprintIDs: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -138,6 +142,8 @@ export const activeSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", status: Status.ACTIVE, + num_txns_executed: 1, + txnFingerprintIDs: [], toJSON: () => ({}), }, }; @@ -163,6 +169,8 @@ export const closedSession: SessionInfo = { nanos: 369989000, }, status: Status.CLOSED, + num_txns_executed: 1, + txnFingerprintIDs: [], toJSON: () => ({}), }, };