diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index 688ad0eae957..f79716ca0fcc 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -2111,6 +2111,9 @@ Session represents one SQL session.
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |
+| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) |
+| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) |
+| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) |
@@ -2245,6 +2248,9 @@ Session represents one SQL session.
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |
+| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) |
+| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) |
+| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) |
diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index f80571cb0ee0..d2b693b7a30c 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -274,6 +274,7 @@ sql.ttl.default_range_concurrency integer 1 default amount of ranges to process
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
sql.ttl.job.enabled boolean true whether the TTL job is enabled
sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job
+sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.
timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 22bcdcdc5f2a..599f6c5714ff 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -205,6 +205,7 @@
sql.ttl.default_select_batch_size | integer | 500 | default amount of rows to select in a single query during a TTL job |
sql.ttl.job.enabled | boolean | true | whether the TTL job is enabled |
sql.ttl.range_batch_size | integer | 100 | amount of ranges to fetch at a time for a table during the TTL job |
+sql.txn_fingerprint_id_cache.capacity | integer | 100 | the maximum number of txn fingerprint IDs stored |
timeseries.storage.enabled | boolean | true | if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere |
timeseries.storage.resolution_10s.ttl | duration | 240h0m0s | the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. |
timeseries.storage.resolution_30m.ttl | duration | 2160h0m0s | the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. |
diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json
index 69ffddbb0389..1c7ffa572db3 100644
--- a/docs/generated/swagger/spec.json
+++ b/docs/generated/swagger/spec.json
@@ -768,6 +768,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
+ "Duration": {
+ "description": "A Duration represents the elapsed time between two instants\nas an int64 nanosecond count. The representation limits the\nlargest representable duration to approximately 290 years.",
+ "type": "integer",
+ "format": "int64",
+ "x-go-package": "time"
+ },
"EventsResponse": {
"description": "EventsResponse contains a set of event log entries. This is always limited\nto the latest N entries (N is enforced in the associated endpoint).",
"type": "object",
@@ -1220,6 +1226,12 @@
"node_id": {
"$ref": "#/definitions/NodeID"
},
+ "num_txns_executed": {
+ "description": "Count of the number of transactions that have been opened on this session.\nThis count includes transactions that are in progress.",
+ "type": "integer",
+ "format": "int32",
+ "x-go-name": "NumTxnsExecuted"
+ },
"start": {
"description": "Timestamp of session's start.",
"type": "string",
@@ -1229,6 +1241,17 @@
"status": {
"$ref": "#/definitions/Session_Status"
},
+ "total_active_time": {
+ "$ref": "#/definitions/Duration"
+ },
+ "txn_fingerprint_ids": {
+ "description": "List of transaction fingerprint IDs in this session.",
+ "type": "array",
+ "items": {
+ "$ref": "#/definitions/TransactionFingerprintID"
+ },
+ "x-go-name": "TxnFingerprintIDs"
+ },
"username": {
"description": "Username of the user for this session.",
"type": "string",
@@ -1493,6 +1516,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/util/hlc"
},
+ "TransactionFingerprintID": {
+ "description": "TransactionFingerprintID is the hashed string constructed using the\nindividual statement fingerprint IDs that comprise the transaction.",
+ "type": "integer",
+ "format": "uint64",
+ "x-go-package": "github.com/cockroachdb/cockroach/pkg/roachpb"
+ },
"TxnInfo": {
"type": "object",
"title": "TxnInfo represents an in flight user transaction on some Session.",
diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
index f6801210d3bd..cce558567301 100644
--- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
+++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
@@ -245,15 +245,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0
----
id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries
-query ITTTTTTTTTTTTT colnames
+query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
+node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
-query ITTTTTTTTTTTTT colnames
+query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
+node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto
index 180868b3df19..9e4ebf63faa8 100644
--- a/pkg/server/serverpb/status.proto
+++ b/pkg/server/serverpb/status.proto
@@ -941,9 +941,11 @@ message Session {
// Information about the txn in progress on this session. Nil if the
// session doesn't currently have a transaction.
TxnInfo active_txn = 12;
+
// The SQL statement fingerprint of the last query executed on this session,
// compatible with StatementStatisticsKey.
string last_active_query_no_constants = 13;
+
// Enum for sessions status.
enum Status {
ACTIVE = 0;
@@ -952,9 +954,23 @@ message Session {
}
// The session's status.
Status status = 14;
+
// Timestamp of session's end.
google.protobuf.Timestamp end = 15
[ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ];
+
+ // Count of the number of transactions that have been opened on this session.
+ // This count includes transactions that are in progress.
+ int32 num_txns_executed = 16;
+
+ // List of transaction fingerprint IDs in this session.
+ repeated uint64 txn_fingerprint_ids = 17 [(gogoproto.customname) = "TxnFingerprintIDs",
+ (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID",
+ (gogoproto.nullable) = false];
+
+ // The session's total active time.
+ google.protobuf.Duration total_active_time = 18 [(gogoproto.nullable) = false,
+ (gogoproto.stdduration) = true];
}
// An error wrapper object for ListSessionsResponse.
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index b3791deaef24..71ebc1878278 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -226,6 +226,7 @@ go_library(
"testutils.go",
"topk.go",
"truncate.go",
+ "txn_fingerprint_id_cache.go",
"txn_state.go",
"type_change.go",
"unary.go",
@@ -581,6 +582,7 @@ go_test(
"temporary_schema_test.go",
"tenant_test.go",
"trace_test.go",
+ "txn_fingerprint_id_cache_test.go",
"txn_restart_test.go",
"txn_state_test.go",
"type_change_test.go",
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 2a6dbaf4ee05..50f6a11a93d7 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -886,6 +886,8 @@ func (s *Server) newConnExecutor(
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
+ totalActiveTimeStopWatch: timeutil.NewStopWatch(),
+ txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings, sessionRootMon),
}
ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
@@ -1500,6 +1502,14 @@ type connExecutor struct {
// txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the
// Transaction ID Cache.
txnIDCacheWriter txnidcache.Writer
+
+ // txnFingerprintIDCache is used to track the most recent
+ // txnFingerprintIDs executed in this session.
+ txnFingerprintIDCache *TxnFingerprintIDCache
+
+ // totalActiveTimeStopWatch tracks the total active time of the session.
+ // This is defined as the time spent executing transactions and statements.
+ totalActiveTimeStopWatch *timeutil.StopWatch
}
// ctxHolder contains a connection's context and, while session tracing is
@@ -2890,6 +2900,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
// Start of the transaction, so no statements were executed earlier.
// Bump the txn counter for logging.
ex.extraTxnState.txnCounter++
+
+ // Session is considred active when executing a transaction.
+ ex.totalActiveTimeStopWatch.Start()
+
if !ex.server.cfg.Codec.ForSystemTenant() {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
@@ -3133,6 +3147,12 @@ func (ex *connExecutor) serialize() serverpb.Session {
remoteStr = sd.RemoteAddr.String()
}
+ txnFingerprintIDs := ex.txnFingerprintIDCache.GetAllTxnFingerprintIDs()
+ sessionActiveTime := ex.totalActiveTimeStopWatch.Elapsed()
+ if started, startedAt := ex.totalActiveTimeStopWatch.StartedAt(); started {
+ sessionActiveTime = time.Duration(sessionActiveTime.Nanoseconds() + timeutil.Since(startedAt).Nanoseconds())
+ }
+
return serverpb.Session{
Username: sd.SessionUser().Normalized(),
ClientAddress: remoteStr,
@@ -3140,12 +3160,15 @@ func (ex *connExecutor) serialize() serverpb.Session {
Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(),
ActiveQueries: activeQueries,
ActiveTxn: activeTxnInfo,
+ NumTxnsExecuted: int32(ex.extraTxnState.txnCounter),
+ TxnFingerprintIDs: txnFingerprintIDs,
LastActiveQuery: lastActiveQuery,
ID: ex.sessionID.GetBytes(),
AllocBytes: ex.mon.AllocBytes(),
MaxAllocBytes: ex.mon.MaximumBytes(),
LastActiveQueryNoConstants: lastActiveQueryNoConstants,
Status: status,
+ TotalActiveTime: sessionActiveTime,
}
}
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index c4833c8c936b..b90de30df038 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -2105,6 +2105,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now())
transactionFingerprintID :=
roachpb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum())
+
+ err := ex.txnFingerprintIDCache.Add(transactionFingerprintID)
+ if err != nil {
+ if log.V(1) {
+ log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err)
+ }
+ }
+
if !implicit {
ex.statsCollector.EndExplicitTransaction(
ctx,
@@ -2118,7 +2126,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
- err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
+ err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
@@ -2214,6 +2222,7 @@ func (ex *connExecutor) recordTransactionFinish(
txnEnd := timeutil.Now()
txnTime := txnEnd.Sub(txnStart)
+ ex.totalActiveTimeStopWatch.Stop()
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
}
diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go
index e9dc940b7529..c8ca11ccc409 100644
--- a/pkg/sql/conn_executor_test.go
+++ b/pkg/sql/conn_executor_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
+ "github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -1571,6 +1572,94 @@ func TestEmptyTxnIsBeingCorrectlyCounted(t *testing.T) {
"after executing empty transactions, but it was not")
}
+//TestSessionTotalActiveTime tests that a session's total active time is
+//correctly being recorded as transactions are executed.
+func TestSessionTotalActiveTime(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ params := base.TestServerArgs{}
+ s, mainDB, _ := serverutils.StartServer(t, params)
+ defer s.Stopper().Stop(ctx)
+
+ _, err := mainDB.Exec(fmt.Sprintf("CREATE USER %s", username.TestUser))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pgURL, cleanupDB := sqlutils.PGUrl(
+ t, s.ServingSQLAddr(), "TestSessionTotalActiveTime", url.User(username.TestUser))
+ defer cleanupDB()
+ rawSQL, err := gosql.Open("postgres", pgURL.String())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ defer func() {
+ err := rawSQL.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ getSessionWithTestUser := func() *serverpb.Session {
+ sessions := s.SQLServer().(*sql.Server).GetExecutorConfig().SessionRegistry.SerializeAll()
+ for _, s := range sessions {
+ if s.Username == username.TestUser {
+ return &s
+ }
+ }
+ t.Fatalf("expected session with username %s", username.TestUser)
+ return nil
+ }
+
+ sqlDB := sqlutils.MakeSQLRunner(rawSQL)
+ sqlDB.Exec(t, "SELECT 1")
+ session := getSessionWithTestUser()
+ activeTimeNanos := session.TotalActiveTime.Nanoseconds()
+
+ // We will execute different types of transactions.
+ // After each execution, verify the total active time has increased, but is no
+ // longer increasing after the transaction has completed.
+ testCases := []struct {
+ Query string
+ // SessionActiveAfterExecution signifies that the active time should still be active after this query.
+ SessionActiveAfterExecution bool
+ }{
+ {"SELECT 1", false},
+ // Test explicit transaction.
+ {"BEGIN", true},
+ {"SELECT 1", true},
+ {"SELECT 1, 2", true},
+ {"COMMIT", false},
+ {"BEGIN", true},
+ {"SELECT crdb_internal.force_retry('1s')", true},
+ {"COMMIT", false},
+ }
+
+ for _, tc := range testCases {
+ sqlDB.Exec(t, tc.Query)
+ if tc.Query == "crdb_internal.force_retry('1s'" {
+ continue
+ }
+ // Check that the total active time has increased.
+ session = getSessionWithTestUser()
+ require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos)
+
+ activeTimeNanos = session.TotalActiveTime.Nanoseconds()
+ session = getSessionWithTestUser()
+
+ if tc.SessionActiveAfterExecution {
+ require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos)
+ } else {
+ require.Equal(t, activeTimeNanos, session.TotalActiveTime.Nanoseconds())
+ }
+
+ activeTimeNanos = session.TotalActiveTime.Nanoseconds()
+ }
+}
+
// dynamicRequestFilter exposes a filter method which is a
// kvserverbase.ReplicaRequestFilter but can be set dynamically.
type dynamicRequestFilter struct {
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 8acc4b71d34e..2f422b885fa0 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -1619,7 +1619,7 @@ CREATE TABLE crdb_internal.%s (
node_id INT, -- the ID of the node running the transaction
session_id STRING, -- the ID of the session
start TIMESTAMP, -- the start time of the transaction
- txn_string STRING, -- the string representation of the transcation
+ txn_string STRING, -- the string representation of the transaction
application_name STRING, -- the name of the application as per SET application_name
num_stmts INT, -- the number of statements executed so far
num_retries INT, -- the number of times the transaction was restarted
@@ -1898,6 +1898,7 @@ CREATE TABLE crdb_internal.%s (
application_name STRING, -- the name of the application as per SET application_name
active_queries STRING, -- the currently running queries as SQL
last_active_query STRING, -- the query that finished last on this session as SQL
+ num_txns_executed INT, -- the number of transactions that were executed so far on this session
session_start TIMESTAMP, -- the time when the session was opened
oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started
kv_txn STRING, -- the ID of the current KV transaction
@@ -1999,6 +2000,7 @@ func populateSessionsTable(
tree.NewDString(session.ApplicationName),
tree.NewDString(activeQueries.String()),
tree.NewDString(session.LastActiveQuery),
+ tree.NewDInt(tree.DInt(session.NumTxnsExecuted)),
startTSDatum,
oldestStartDatum,
kvTxnIDDatum,
@@ -2024,6 +2026,7 @@ func populateSessionsTable(
tree.DNull, // application name
tree.NewDString("-- "+rpcErr.Message), // active queries
tree.DNull, // last active query
+ tree.DNull, // num txns executed
tree.DNull, // session start
tree.DNull, // oldest_query_start
tree.DNull, // kv_txn
diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go
index f4adcc97f7f8..2a31c1181074 100644
--- a/pkg/sql/delegate/show_sessions.go
+++ b/pkg/sql/delegate/show_sessions.go
@@ -16,7 +16,7 @@ import (
)
func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) {
- const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.`
+ const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, num_txns_executed, session_start, oldest_query_start FROM crdb_internal.`
table := `node_sessions`
if n.Cluster {
table = `cluster_sessions`
diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal
index ee3c90c0204c..ae2ea6a2bafe 100644
--- a/pkg/sql/logictest/testdata/logic_test/crdb_internal
+++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal
@@ -365,15 +365,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0
----
id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries
-query ITTTTTTTTTTTTT colnames
+query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
+node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
-query ITTTTTTTTTTTTT colnames
+query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
-node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
+node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements
index 3a102fda525c..628ecd43f037 100644
--- a/pkg/sql/logictest/testdata/logic_test/create_statements
+++ b/pkg/sql/logictest/testdata/logic_test/create_statements
@@ -324,6 +324,7 @@ CREATE TABLE crdb_internal.cluster_sessions (
application_name STRING NULL,
active_queries STRING NULL,
last_active_query STRING NULL,
+ num_txns_executed INT8 NULL,
session_start TIMESTAMP NULL,
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
@@ -339,6 +340,7 @@ CREATE TABLE crdb_internal.cluster_sessions (
application_name STRING NULL,
active_queries STRING NULL,
last_active_query STRING NULL,
+ num_txns_executed INT8 NULL,
session_start TIMESTAMP NULL,
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
@@ -958,6 +960,7 @@ CREATE TABLE crdb_internal.node_sessions (
application_name STRING NULL,
active_queries STRING NULL,
last_active_query STRING NULL,
+ num_txns_executed INT8 NULL,
session_start TIMESTAMP NULL,
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
@@ -973,6 +976,7 @@ CREATE TABLE crdb_internal.node_sessions (
application_name STRING NULL,
active_queries STRING NULL,
last_active_query STRING NULL,
+ num_txns_executed INT8 NULL,
session_start TIMESTAMP NULL,
oldest_query_start TIMESTAMP NULL,
kv_txn STRING NULL,
diff --git a/pkg/sql/testdata/txn_fingerprint_id_cache b/pkg/sql/testdata/txn_fingerprint_id_cache
new file mode 100644
index 000000000000..533be5d3e1bb
--- /dev/null
+++ b/pkg/sql/testdata/txn_fingerprint_id_cache
@@ -0,0 +1,77 @@
+# Initialize TxnFingerprintIDCache
+init capacity=10
+----
+size: 0
+
+# Add four TxnFingerprintIDs: 5, 6, 7, 8
+enqueue id=05
+----
+size: 1
+
+enqueue id=06
+----
+size: 2
+
+enqueue id=07
+----
+size: 3
+
+enqueue id=08
+----
+size: 4
+
+# There should be 4 valid TxnFingerprintIDs
+show
+----
+[8 7 6 5]
+
+enqueue id=09
+----
+size: 5
+
+enqueue id=10
+----
+size: 6
+
+show
+----
+[10 9 8 7 6 5]
+
+# Decrease the TxnFingerprintIDCacheCapacity cluster setting to below current size.
+override capacity=3
+----
+TxnFingerprintIDCacheCapacity: 3
+
+# Enqueue another id to
+enqueue id=11
+----
+size: 3
+
+# The cache should have the most recent 3 insertions.
+show
+----
+[11 10 9]
+
+# Check that retrieving IDs also properly truncates the cache when the capacity has
+# been changed.
+# Increase capacity back up to 5, insert some values, then decrease capacity to 2 and
+# retrieve all ids.
+override capacity=5
+----
+TxnFingerprintIDCacheCapacity: 5
+
+enqueue id=12
+----
+size: 4
+
+enqueue id=13
+----
+size: 5
+
+override capacity=2
+----
+TxnFingerprintIDCacheCapacity: 2
+
+show
+----
+[13 12]
diff --git a/pkg/sql/txn_fingerprint_id_cache.go b/pkg/sql/txn_fingerprint_id_cache.go
new file mode 100644
index 000000000000..f5341e01d929
--- /dev/null
+++ b/pkg/sql/txn_fingerprint_id_cache.go
@@ -0,0 +1,129 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/cache"
+ "github.com/cockroachdb/cockroach/pkg/util/mon"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+// TxnFingerprintIDCacheCapacity is the cluster setting that controls the
+// capacity of the txn fingerprint ID cache. The cache will be resized
+// on the next insert or get operation.
+var TxnFingerprintIDCacheCapacity = settings.RegisterIntSetting(
+ settings.TenantWritable,
+ "sql.txn_fingerprint_id_cache.capacity",
+ "the maximum number of txn fingerprint IDs stored",
+ 100,
+ settings.NonNegativeInt,
+).WithPublic()
+
+// TxnFingerprintIDCache is a thread-safe cache tracking transaction
+// fingerprint IDs at the session level.
+type TxnFingerprintIDCache struct {
+ st *cluster.Settings
+
+ mu struct {
+ syncutil.RWMutex
+ acc *mon.BoundAccount
+ cache *cache.UnorderedCache
+ }
+
+ mon *mon.BytesMonitor
+}
+
+// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache.
+func NewTxnFingerprintIDCache(
+ st *cluster.Settings, parentMon *mon.BytesMonitor,
+) *TxnFingerprintIDCache {
+ b := &TxnFingerprintIDCache{st: st}
+
+ b.mu.cache = cache.NewUnorderedCache(cache.Config{
+ Policy: cache.CacheFIFO,
+ ShouldEvict: func(size int, _, _ interface{}) bool {
+ // Note that because the cache evicts as many elements as possible
+ // when adding an element, the cache will appropriately truncate
+ // when the capacity cluster setting is changed on the addition
+ // of an entry.
+ capacity := TxnFingerprintIDCacheCapacity.Get(&st.SV)
+ return int64(size) > capacity
+ },
+ OnEvictedEntry: func(entry *cache.Entry) {
+ b.mu.acc.Shrink(context.Background(), 1)
+ },
+ })
+
+ monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-cache", 0 /* limit */, parentMon)
+ b.mon = monitor
+ b.mon.Start(context.Background(), parentMon, mon.BoundAccount{})
+
+ return b
+}
+
+// Add adds a TxnFingerprintID to the cache, truncating the cache to the
+func (b *TxnFingerprintIDCache) Add(value roachpb.TransactionFingerprintID) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if err := b.mu.acc.Grow(context.Background(), 1); err != nil {
+ return err
+ }
+
+ b.mu.cache.Add(value, value)
+
+ return nil
+}
+
+// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the cache.
+// The cache may be truncated if the capacity was updated to a smaller size.
+func (b *TxnFingerprintIDCache) GetAllTxnFingerprintIDs() []roachpb.TransactionFingerprintID {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ size := int64(b.mu.cache.Len())
+ capacity := TxnFingerprintIDCacheCapacity.Get(&b.st.SV)
+ if size > capacity {
+ size = capacity
+ }
+
+ txnFingerprintIDs := make([]roachpb.TransactionFingerprintID, 0, size)
+ txnFingerprintIDsRemoved := make([]roachpb.TransactionFingerprintID, 0)
+
+ b.mu.cache.Do(func(entry *cache.Entry) {
+ id := entry.Value.(roachpb.TransactionFingerprintID)
+
+ if int64(len(txnFingerprintIDs)) == size {
+ txnFingerprintIDsRemoved = append(txnFingerprintIDsRemoved, id)
+ return
+ }
+
+ txnFingerprintIDs = append(txnFingerprintIDs, id)
+ })
+
+ for _, id := range txnFingerprintIDsRemoved {
+ b.mu.cache.Del(id)
+ }
+
+ return txnFingerprintIDs
+}
+
+func (b *TxnFingerprintIDCache) size() int {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+
+ return b.mu.cache.Len()
+}
diff --git a/pkg/sql/txn_fingerprint_id_cache_test.go b/pkg/sql/txn_fingerprint_id_cache_test.go
new file mode 100644
index 000000000000..97de2eef0541
--- /dev/null
+++ b/pkg/sql/txn_fingerprint_id_cache_test.go
@@ -0,0 +1,159 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sort"
+ "strconv"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/server/serverpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/sql/tests"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/mon"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+ "github.com/cockroachdb/datadriven"
+ "github.com/stretchr/testify/require"
+)
+
+func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ var txnFingerprintIDCache *TxnFingerprintIDCache
+
+ datadriven.Walk(t, testutils.TestDataPath(t, "txn_fingerprint_id_cache"), func(t *testing.T, path string) {
+ datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+ ctx := context.Background()
+ switch d.Cmd {
+ case "init":
+ var capacity int
+ d.ScanArgs(t, "capacity", &capacity)
+
+ st := &cluster.Settings{}
+ monitor := mon.NewUnlimitedMonitor(
+ ctx,
+ "test",
+ mon.MemoryResource,
+ nil, /* currCount */
+ nil, /* maxHist */
+ math.MaxInt64,
+ st,
+ )
+ txnFingerprintIDCache = NewTxnFingerprintIDCache(st, monitor)
+
+ TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity))
+
+ return fmt.Sprintf("size: %d", txnFingerprintIDCache.size())
+
+ case "override":
+ var capacity int
+ d.ScanArgs(t, "capacity", &capacity)
+ TxnFingerprintIDCacheCapacity.Override(ctx, &txnFingerprintIDCache.st.SV, int64(capacity))
+ capacityClusterSetting := TxnFingerprintIDCacheCapacity.Get(&txnFingerprintIDCache.st.SV)
+ return fmt.Sprintf("TxnFingerprintIDCacheCapacity: %d", capacityClusterSetting)
+
+ case "enqueue":
+ var idStr string
+ d.ScanArgs(t, "id", &idStr)
+
+ id, err := strconv.ParseUint(idStr, 10, 64)
+ require.NoError(t, err)
+ txnFingerprintID := roachpb.TransactionFingerprintID(id)
+
+ err = txnFingerprintIDCache.Add(txnFingerprintID)
+ require.NoError(t, err)
+
+ return fmt.Sprintf("size: %d", txnFingerprintIDCache.size())
+
+ case "show":
+ return printTxnFingerprintIDCache(txnFingerprintIDCache)
+
+ default:
+ }
+ return ""
+
+ })
+ })
+}
+
+func printTxnFingerprintIDCache(txnFingerprintCache *TxnFingerprintIDCache) string {
+ txnFingerprintIDs := txnFingerprintCache.GetAllTxnFingerprintIDs()
+
+ return fmt.Sprintf("%d", txnFingerprintIDs)
+}
+
+func TestTxnFingerprintIDCache(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+
+ txnFingerprintIDsRecorded := make([]roachpb.TransactionFingerprintID, 0)
+ appName := "testTxnFingerprintIDCache"
+
+ params, _ := tests.CreateTestServerParams()
+ params.Knobs.SQLExecutor = &ExecutorTestingKnobs{
+ BeforeTxnStatsRecorded: func(
+ sessionData *sessiondata.SessionData,
+ _ uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ ) {
+ if !sessionData.Internal {
+ // Record every query we issue through our sql connection.
+ txnFingerprintIDsRecorded = append(txnFingerprintIDsRecorded, txnFingerprintID)
+ }
+ },
+ }
+
+ testServer, sqlConn, _ := serverutils.StartServer(t, params)
+
+ defer func() {
+ require.NoError(t, sqlConn.Close())
+ testServer.Stopper().Stop(ctx)
+ }()
+
+ testConn := sqlutils.MakeSQLRunner(sqlConn)
+
+ testConn.Exec(t, "SET application_name = $1", appName)
+ testConn.Exec(t, "CREATE TABLE test AS SELECT generate_series(1, 10)")
+ testConn.Exec(t, "SELECT * FROM test")
+ testConn.Exec(t, "BEGIN; SELECT 1; SELECT 1, 2, 3; COMMIT;")
+
+ sessions := testServer.SQLServer().(*Server).GetExecutorConfig().SessionRegistry.SerializeAll()
+
+ var session *serverpb.Session
+ for i, s := range sessions {
+ if s.ApplicationName == appName {
+ session = &sessions[i]
+ break
+ }
+ }
+ require.NotNil(t, session)
+
+ sort.Slice(session.TxnFingerprintIDs, func(i, j int) bool {
+ return session.TxnFingerprintIDs[i] < session.TxnFingerprintIDs[j]
+ })
+
+ sort.Slice(txnFingerprintIDsRecorded, func(i, j int) bool {
+ return txnFingerprintIDsRecorded[i] < txnFingerprintIDsRecorded[j]
+ })
+
+ require.Equal(t, txnFingerprintIDsRecorded, session.TxnFingerprintIDs)
+}
diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts
index bfc7aafa61f0..9b03f7414c13 100644
--- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts
@@ -47,6 +47,8 @@ export const idleSession: SessionInfo = {
alloc_bytes: Long.fromNumber(0),
max_alloc_bytes: Long.fromNumber(10240),
active_queries: [],
+ num_txns_executed: 1,
+ txn_fingerprint_ids: [],
status: Status.IDLE,
toJSON: () => ({}),
},
@@ -85,6 +87,8 @@ export const idleTransactionSession: SessionInfo = {
},
last_active_query_no_constants: "SHOW database",
active_queries: [],
+ num_txns_executed: 1,
+ txn_fingerprint_ids: [],
status: Status.IDLE,
toJSON: () => ({}),
},
@@ -138,6 +142,8 @@ export const activeSession: SessionInfo = {
},
last_active_query_no_constants: "SHOW database",
status: Status.ACTIVE,
+ num_txns_executed: 1,
+ txn_fingerprint_ids: [],
toJSON: () => ({}),
},
};
@@ -163,6 +169,8 @@ export const closedSession: SessionInfo = {
nanos: 369989000,
},
status: Status.CLOSED,
+ num_txns_executed: 1,
+ txn_fingerprint_ids: [],
toJSON: () => ({}),
},
};
diff --git a/pkg/util/timeutil/stopwatch.go b/pkg/util/timeutil/stopwatch.go
index ed3bf152b517..78b0f26d7709 100644
--- a/pkg/util/timeutil/stopwatch.go
+++ b/pkg/util/timeutil/stopwatch.go
@@ -81,6 +81,15 @@ func (w *StopWatch) Elapsed() time.Duration {
return w.mu.elapsed
}
+// StartedAt returns a bool indicating if the stopwatch has started, and the
+// time at which the stopwatch was started. If the stopwatch is stopped,
+// the time returned is the last time the stopwatch was started.
+func (w *StopWatch) StartedAt() (started bool, startedAt time.Time) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ return w.mu.started, w.mu.startedAt
+}
+
// TestTimeSource is a source of time that remembers when it was created (in
// terms of the real time) and returns the time based on its creation time and
// the number of "advances" it has had. It is used for testing only.