From 60160c8cf77957142f65f824e6099701704d48d6 Mon Sep 17 00:00:00 2001 From: Gerardo Torres Date: Thu, 28 Apr 2022 11:36:31 -0400 Subject: [PATCH] server, sql: surface session txnCount, txn fingerprints, active time Partially addresses #74257. Previously, the status server did not provide session details such as total number of transactions executed, transaction fingerprint IDs, and total active time. This change adds the aforementioned session details to the `serverpb.Session` struct. To track recently executed transaction fingerprint IDs, a FIFO cache `TxnFingerprintIDCache` is introduced with its corresponding cluster setting `TxnFingerprintIDBufferCapacity` to control the capacity. The default capacity is set at 100 fingerprints. The total number of transactions executed is filled using the existing `txnCounter` from the `extraTxnState` in `connExecutor`. The total active time is calculated by introducing a `timeutil.StopWatch` to the connection executor, which is started and stopped when a transaction is started and finished respectively. Release note (api change): the `serverpb.Session` struct now has three new fields: number of transactions executed, transaction fingerprint IDs, and total active time. --- docs/generated/http/full.md | 6 + .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + docs/generated/swagger/spec.json | 29 ++++ .../testdata/logic_test/crdb_internal_tenant | 8 +- pkg/server/serverpb/status.proto | 16 ++ pkg/sql/BUILD.bazel | 2 + pkg/sql/conn_executor.go | 23 +++ pkg/sql/conn_executor_exec.go | 11 +- pkg/sql/conn_executor_test.go | 89 ++++++++++ pkg/sql/crdb_internal.go | 5 +- pkg/sql/delegate/show_sessions.go | 2 +- .../testdata/logic_test/crdb_internal | 8 +- .../testdata/logic_test/create_statements | 4 + pkg/sql/testdata/txn_fingerprint_id_cache | 77 +++++++++ pkg/sql/txn_fingerprint_id_cache.go | 129 ++++++++++++++ pkg/sql/txn_fingerprint_id_cache_test.go | 158 ++++++++++++++++++ .../src/sessions/sessionsPage.fixture.ts | 8 + pkg/util/timeutil/stopwatch.go | 9 + 19 files changed, 575 insertions(+), 11 deletions(-) create mode 100644 pkg/sql/testdata/txn_fingerprint_id_cache create mode 100644 pkg/sql/txn_fingerprint_id_cache.go create mode 100644 pkg/sql/txn_fingerprint_id_cache_test.go diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 688ad0eae957..f79716ca0fcc 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2111,6 +2111,9 @@ Session represents one SQL session. | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) | +| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | +| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | @@ -2245,6 +2248,9 @@ Session represents one SQL session. | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) | +| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | +| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index f80571cb0ee0..d2b693b7a30c 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -274,6 +274,7 @@ sql.ttl.default_range_concurrency integer 1 default amount of ranges to process sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job sql.ttl.job.enabled boolean true whether the TTL job is enabled sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 22bcdcdc5f2a..599f6c5714ff 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -205,6 +205,7 @@ sql.ttl.default_select_batch_sizeinteger500default amount of rows to select in a single query during a TTL job sql.ttl.job.enabledbooleantruewhether the TTL job is enabled sql.ttl.range_batch_sizeinteger100amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_cache.capacityinteger100the maximum number of txn fingerprint IDs stored timeseries.storage.enabledbooleantrueif set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttlduration240h0m0sthe maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttlduration2160h0m0sthe maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index 69ffddbb0389..1c7ffa572db3 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -768,6 +768,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" }, + "Duration": { + "description": "A Duration represents the elapsed time between two instants\nas an int64 nanosecond count. The representation limits the\nlargest representable duration to approximately 290 years.", + "type": "integer", + "format": "int64", + "x-go-package": "time" + }, "EventsResponse": { "description": "EventsResponse contains a set of event log entries. This is always limited\nto the latest N entries (N is enforced in the associated endpoint).", "type": "object", @@ -1220,6 +1226,12 @@ "node_id": { "$ref": "#/definitions/NodeID" }, + "num_txns_executed": { + "description": "Count of the number of transactions that have been opened on this session.\nThis count includes transactions that are in progress.", + "type": "integer", + "format": "int32", + "x-go-name": "NumTxnsExecuted" + }, "start": { "description": "Timestamp of session's start.", "type": "string", @@ -1229,6 +1241,17 @@ "status": { "$ref": "#/definitions/Session_Status" }, + "total_active_time": { + "$ref": "#/definitions/Duration" + }, + "txn_fingerprint_ids": { + "description": "List of transaction fingerprint IDs in this session.", + "type": "array", + "items": { + "$ref": "#/definitions/TransactionFingerprintID" + }, + "x-go-name": "TxnFingerprintIDs" + }, "username": { "description": "Username of the user for this session.", "type": "string", @@ -1493,6 +1516,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/util/hlc" }, + "TransactionFingerprintID": { + "description": "TransactionFingerprintID is the hashed string constructed using the\nindividual statement fingerprint IDs that comprise the transaction.", + "type": "integer", + "format": "uint64", + "x-go-package": "github.com/cockroachdb/cockroach/pkg/roachpb" + }, "TxnInfo": { "type": "object", "title": "TxnInfo represents an in flight user transaction on some Session.", diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index f6801210d3bd..cce558567301 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -245,15 +245,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 180868b3df19..9e4ebf63faa8 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -941,9 +941,11 @@ message Session { // Information about the txn in progress on this session. Nil if the // session doesn't currently have a transaction. TxnInfo active_txn = 12; + // The SQL statement fingerprint of the last query executed on this session, // compatible with StatementStatisticsKey. string last_active_query_no_constants = 13; + // Enum for sessions status. enum Status { ACTIVE = 0; @@ -952,9 +954,23 @@ message Session { } // The session's status. Status status = 14; + // Timestamp of session's end. google.protobuf.Timestamp end = 15 [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ]; + + // Count of the number of transactions that have been opened on this session. + // This count includes transactions that are in progress. + int32 num_txns_executed = 16; + + // List of transaction fingerprint IDs in this session. + repeated uint64 txn_fingerprint_ids = 17 [(gogoproto.customname) = "TxnFingerprintIDs", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", + (gogoproto.nullable) = false]; + + // The session's total active time. + google.protobuf.Duration total_active_time = 18 [(gogoproto.nullable) = false, + (gogoproto.stdduration) = true]; } // An error wrapper object for ListSessionsResponse. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index b3791deaef24..71ebc1878278 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -226,6 +226,7 @@ go_library( "testutils.go", "topk.go", "truncate.go", + "txn_fingerprint_id_cache.go", "txn_state.go", "type_change.go", "unary.go", @@ -581,6 +582,7 @@ go_test( "temporary_schema_test.go", "tenant_test.go", "trace_test.go", + "txn_fingerprint_id_cache_test.go", "txn_restart_test.go", "txn_state_test.go", "type_change_test.go", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 2a6dbaf4ee05..50f6a11a93d7 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -886,6 +886,8 @@ func (s *Server) newConnExecutor( stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, indexUsageStats: s.indexUsageStats, txnIDCacheWriter: s.txnIDCache, + totalActiveTimeStopWatch: timeutil.NewStopWatch(), + txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings, sessionRootMon), } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -1500,6 +1502,14 @@ type connExecutor struct { // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the // Transaction ID Cache. txnIDCacheWriter txnidcache.Writer + + // txnFingerprintIDCache is used to track the most recent + // txnFingerprintIDs executed in this session. + txnFingerprintIDCache *TxnFingerprintIDCache + + // totalActiveTimeStopWatch tracks the total active time of the session. + // This is defined as the time spent executing transactions and statements. + totalActiveTimeStopWatch *timeutil.StopWatch } // ctxHolder contains a connection's context and, while session tracing is @@ -2890,6 +2900,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Start of the transaction, so no statements were executed earlier. // Bump the txn counter for logging. ex.extraTxnState.txnCounter++ + + // Session is considred active when executing a transaction. + ex.totalActiveTimeStopWatch.Start() + if !ex.server.cfg.Codec.ForSystemTenant() { // Update the leased descriptor collection with the current sqlliveness.Session. // This is required in the multi-tenant environment to update the transaction @@ -3133,6 +3147,12 @@ func (ex *connExecutor) serialize() serverpb.Session { remoteStr = sd.RemoteAddr.String() } + txnFingerprintIDs := ex.txnFingerprintIDCache.GetAllTxnFingerprintIDs() + sessionActiveTime := ex.totalActiveTimeStopWatch.Elapsed() + if started, startedAt := ex.totalActiveTimeStopWatch.StartedAt(); started { + sessionActiveTime = time.Duration(sessionActiveTime.Nanoseconds() + timeutil.Since(startedAt).Nanoseconds()) + } + return serverpb.Session{ Username: sd.SessionUser().Normalized(), ClientAddress: remoteStr, @@ -3140,12 +3160,15 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(), ActiveQueries: activeQueries, ActiveTxn: activeTxnInfo, + NumTxnsExecuted: int32(ex.extraTxnState.txnCounter), + TxnFingerprintIDs: txnFingerprintIDs, LastActiveQuery: lastActiveQuery, ID: ex.sessionID.GetBytes(), AllocBytes: ex.mon.AllocBytes(), MaxAllocBytes: ex.mon.MaximumBytes(), LastActiveQueryNoConstants: lastActiveQueryNoConstants, Status: status, + TotalActiveTime: sessionActiveTime, } } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index c4833c8c936b..b90de30df038 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2105,6 +2105,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now()) transactionFingerprintID := roachpb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum()) + + err := ex.txnFingerprintIDCache.Add(transactionFingerprintID) + if err != nil { + if log.V(1) { + log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err) + } + } + if !implicit { ex.statsCollector.EndExplicitTransaction( ctx, @@ -2118,7 +2126,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } - err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) + err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) @@ -2214,6 +2222,7 @@ func (ex *connExecutor) recordTransactionFinish( txnEnd := timeutil.Now() txnTime := txnEnd.Sub(txnStart) + ex.totalActiveTimeStopWatch.Stop() if ex.executorType != executorTypeInternal { ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) } diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index e9dc940b7529..c8ca11ccc409 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -1571,6 +1572,94 @@ func TestEmptyTxnIsBeingCorrectlyCounted(t *testing.T) { "after executing empty transactions, but it was not") } +//TestSessionTotalActiveTime tests that a session's total active time is +//correctly being recorded as transactions are executed. +func TestSessionTotalActiveTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params := base.TestServerArgs{} + s, mainDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + _, err := mainDB.Exec(fmt.Sprintf("CREATE USER %s", username.TestUser)) + if err != nil { + t.Fatal(err) + } + + pgURL, cleanupDB := sqlutils.PGUrl( + t, s.ServingSQLAddr(), "TestSessionTotalActiveTime", url.User(username.TestUser)) + defer cleanupDB() + rawSQL, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + + defer func() { + err := rawSQL.Close() + if err != nil { + t.Fatal(err) + } + }() + + getSessionWithTestUser := func() *serverpb.Session { + sessions := s.SQLServer().(*sql.Server).GetExecutorConfig().SessionRegistry.SerializeAll() + for _, s := range sessions { + if s.Username == username.TestUser { + return &s + } + } + t.Fatalf("expected session with username %s", username.TestUser) + return nil + } + + sqlDB := sqlutils.MakeSQLRunner(rawSQL) + sqlDB.Exec(t, "SELECT 1") + session := getSessionWithTestUser() + activeTimeNanos := session.TotalActiveTime.Nanoseconds() + + // We will execute different types of transactions. + // After each execution, verify the total active time has increased, but is no + // longer increasing after the transaction has completed. + testCases := []struct { + Query string + // SessionActiveAfterExecution signifies that the active time should still be active after this query. + SessionActiveAfterExecution bool + }{ + {"SELECT 1", false}, + // Test explicit transaction. + {"BEGIN", true}, + {"SELECT 1", true}, + {"SELECT 1, 2", true}, + {"COMMIT", false}, + {"BEGIN", true}, + {"SELECT crdb_internal.force_retry('1s')", true}, + {"COMMIT", false}, + } + + for _, tc := range testCases { + sqlDB.Exec(t, tc.Query) + if tc.Query == "crdb_internal.force_retry('1s'" { + continue + } + // Check that the total active time has increased. + session = getSessionWithTestUser() + require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos) + + activeTimeNanos = session.TotalActiveTime.Nanoseconds() + session = getSessionWithTestUser() + + if tc.SessionActiveAfterExecution { + require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos) + } else { + require.Equal(t, activeTimeNanos, session.TotalActiveTime.Nanoseconds()) + } + + activeTimeNanos = session.TotalActiveTime.Nanoseconds() + } +} + // dynamicRequestFilter exposes a filter method which is a // kvserverbase.ReplicaRequestFilter but can be set dynamically. type dynamicRequestFilter struct { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 8acc4b71d34e..2f422b885fa0 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1619,7 +1619,7 @@ CREATE TABLE crdb_internal.%s ( node_id INT, -- the ID of the node running the transaction session_id STRING, -- the ID of the session start TIMESTAMP, -- the start time of the transaction - txn_string STRING, -- the string representation of the transcation + txn_string STRING, -- the string representation of the transaction application_name STRING, -- the name of the application as per SET application_name num_stmts INT, -- the number of statements executed so far num_retries INT, -- the number of times the transaction was restarted @@ -1898,6 +1898,7 @@ CREATE TABLE crdb_internal.%s ( application_name STRING, -- the name of the application as per SET application_name active_queries STRING, -- the currently running queries as SQL last_active_query STRING, -- the query that finished last on this session as SQL + num_txns_executed INT, -- the number of transactions that were executed so far on this session session_start TIMESTAMP, -- the time when the session was opened oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started kv_txn STRING, -- the ID of the current KV transaction @@ -1999,6 +2000,7 @@ func populateSessionsTable( tree.NewDString(session.ApplicationName), tree.NewDString(activeQueries.String()), tree.NewDString(session.LastActiveQuery), + tree.NewDInt(tree.DInt(session.NumTxnsExecuted)), startTSDatum, oldestStartDatum, kvTxnIDDatum, @@ -2024,6 +2026,7 @@ func populateSessionsTable( tree.DNull, // application name tree.NewDString("-- "+rpcErr.Message), // active queries tree.DNull, // last active query + tree.DNull, // num txns executed tree.DNull, // session start tree.DNull, // oldest_query_start tree.DNull, // kv_txn diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go index f4adcc97f7f8..2a31c1181074 100644 --- a/pkg/sql/delegate/show_sessions.go +++ b/pkg/sql/delegate/show_sessions.go @@ -16,7 +16,7 @@ import ( ) func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) { - const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` + const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, num_txns_executed, session_start, oldest_query_start FROM crdb_internal.` table := `node_sessions` if n.Cluster { table = `cluster_sessions` diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index ee3c90c0204c..ae2ea6a2bafe 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -365,15 +365,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 3a102fda525c..628ecd43f037 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -324,6 +324,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -339,6 +340,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -958,6 +960,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -973,6 +976,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, diff --git a/pkg/sql/testdata/txn_fingerprint_id_cache b/pkg/sql/testdata/txn_fingerprint_id_cache new file mode 100644 index 000000000000..533be5d3e1bb --- /dev/null +++ b/pkg/sql/testdata/txn_fingerprint_id_cache @@ -0,0 +1,77 @@ +# Initialize TxnFingerprintIDCache +init capacity=10 +---- +size: 0 + +# Add four TxnFingerprintIDs: 5, 6, 7, 8 +enqueue id=05 +---- +size: 1 + +enqueue id=06 +---- +size: 2 + +enqueue id=07 +---- +size: 3 + +enqueue id=08 +---- +size: 4 + +# There should be 4 valid TxnFingerprintIDs +show +---- +[8 7 6 5] + +enqueue id=09 +---- +size: 5 + +enqueue id=10 +---- +size: 6 + +show +---- +[10 9 8 7 6 5] + +# Decrease the TxnFingerprintIDCacheCapacity cluster setting to below current size. +override capacity=3 +---- +TxnFingerprintIDCacheCapacity: 3 + +# Enqueue another id to +enqueue id=11 +---- +size: 3 + +# The cache should have the most recent 3 insertions. +show +---- +[11 10 9] + +# Check that retrieving IDs also properly truncates the cache when the capacity has +# been changed. +# Increase capacity back up to 5, insert some values, then decrease capacity to 2 and +# retrieve all ids. +override capacity=5 +---- +TxnFingerprintIDCacheCapacity: 5 + +enqueue id=12 +---- +size: 4 + +enqueue id=13 +---- +size: 5 + +override capacity=2 +---- +TxnFingerprintIDCacheCapacity: 2 + +show +---- +[13 12] diff --git a/pkg/sql/txn_fingerprint_id_cache.go b/pkg/sql/txn_fingerprint_id_cache.go new file mode 100644 index 000000000000..f5341e01d929 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_cache.go @@ -0,0 +1,129 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// TxnFingerprintIDCacheCapacity is the cluster setting that controls the +// capacity of the txn fingerprint ID cache. The cache will be resized +// on the next insert or get operation. +var TxnFingerprintIDCacheCapacity = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.txn_fingerprint_id_cache.capacity", + "the maximum number of txn fingerprint IDs stored", + 100, + settings.NonNegativeInt, +).WithPublic() + +// TxnFingerprintIDCache is a thread-safe cache tracking transaction +// fingerprint IDs at the session level. +type TxnFingerprintIDCache struct { + st *cluster.Settings + + mu struct { + syncutil.RWMutex + acc *mon.BoundAccount + cache *cache.UnorderedCache + } + + mon *mon.BytesMonitor +} + +// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache. +func NewTxnFingerprintIDCache( + st *cluster.Settings, parentMon *mon.BytesMonitor, +) *TxnFingerprintIDCache { + b := &TxnFingerprintIDCache{st: st} + + b.mu.cache = cache.NewUnorderedCache(cache.Config{ + Policy: cache.CacheFIFO, + ShouldEvict: func(size int, _, _ interface{}) bool { + // Note that because the cache evicts as many elements as possible + // when adding an element, the cache will appropriately truncate + // when the capacity cluster setting is changed on the addition + // of an entry. + capacity := TxnFingerprintIDCacheCapacity.Get(&st.SV) + return int64(size) > capacity + }, + OnEvictedEntry: func(entry *cache.Entry) { + b.mu.acc.Shrink(context.Background(), 1) + }, + }) + + monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-cache", 0 /* limit */, parentMon) + b.mon = monitor + b.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + + return b +} + +// Add adds a TxnFingerprintID to the cache, truncating the cache to the +func (b *TxnFingerprintIDCache) Add(value roachpb.TransactionFingerprintID) error { + b.mu.Lock() + defer b.mu.Unlock() + + if err := b.mu.acc.Grow(context.Background(), 1); err != nil { + return err + } + + b.mu.cache.Add(value, value) + + return nil +} + +// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the cache. +// The cache may be truncated if the capacity was updated to a smaller size. +func (b *TxnFingerprintIDCache) GetAllTxnFingerprintIDs() []roachpb.TransactionFingerprintID { + b.mu.Lock() + defer b.mu.Unlock() + + size := int64(b.mu.cache.Len()) + capacity := TxnFingerprintIDCacheCapacity.Get(&b.st.SV) + if size > capacity { + size = capacity + } + + txnFingerprintIDs := make([]roachpb.TransactionFingerprintID, 0, size) + txnFingerprintIDsRemoved := make([]roachpb.TransactionFingerprintID, 0) + + b.mu.cache.Do(func(entry *cache.Entry) { + id := entry.Value.(roachpb.TransactionFingerprintID) + + if int64(len(txnFingerprintIDs)) == size { + txnFingerprintIDsRemoved = append(txnFingerprintIDsRemoved, id) + return + } + + txnFingerprintIDs = append(txnFingerprintIDs, id) + }) + + for _, id := range txnFingerprintIDsRemoved { + b.mu.cache.Del(id) + } + + return txnFingerprintIDs +} + +func (b *TxnFingerprintIDCache) size() int { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.mu.cache.Len() +} diff --git a/pkg/sql/txn_fingerprint_id_cache_test.go b/pkg/sql/txn_fingerprint_id_cache_test.go new file mode 100644 index 000000000000..0180a4a5fea6 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_cache_test.go @@ -0,0 +1,158 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "math" + "sort" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestTxnFingerprintIDCacheDataDriven(t *testing.T) { + defer leaktest.AfterTest(t)() + var txnFingerprintIDCache *TxnFingerprintIDCache + + datadriven.Walk(t, testutils.TestDataPath(t, "txn_fingerprint_id_cache"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() + switch d.Cmd { + case "init": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + + st := &cluster.Settings{} + monitor := mon.NewUnlimitedMonitor( + ctx, + "test", + mon.MemoryResource, + nil, /* currCount */ + nil, /* maxHist */ + math.MaxInt64, + st, + ) + txnFingerprintIDCache = NewTxnFingerprintIDCache(st, monitor) + + TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity)) + + return fmt.Sprintf("size: %d", txnFingerprintIDCache.size()) + + case "override": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + TxnFingerprintIDCacheCapacity.Override(ctx, &txnFingerprintIDCache.st.SV, int64(capacity)) + capacityClusterSetting := TxnFingerprintIDCacheCapacity.Get(&txnFingerprintIDCache.st.SV) + return fmt.Sprintf("TxnFingerprintIDCacheCapacity: %d", capacityClusterSetting) + + case "enqueue": + var idStr string + d.ScanArgs(t, "id", &idStr) + + id, err := strconv.ParseUint(idStr, 10, 64) + require.NoError(t, err) + txnFingerprintID := roachpb.TransactionFingerprintID(id) + + err = txnFingerprintIDCache.Add(txnFingerprintID) + require.NoError(t, err) + + return fmt.Sprintf("size: %d", txnFingerprintIDCache.size()) + + case "show": + return printTxnFingerprintIDCache(txnFingerprintIDCache) + + default: + } + return "" + + }) + }) +} + +func printTxnFingerprintIDCache(txnFingerprintCache *TxnFingerprintIDCache) string { + txnFingerprintIDs := txnFingerprintCache.GetAllTxnFingerprintIDs() + + return fmt.Sprintf("%d", txnFingerprintIDs) +} + +func TestTxnFingerprintIDCache(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + txnFingerprintIDsRecorded := make([]roachpb.TransactionFingerprintID, 0) + appName := "testTxnFingerprintIDCache" + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLExecutor = &ExecutorTestingKnobs{ + BeforeTxnStatsRecorded: func( + sessionData *sessiondata.SessionData, + _ uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) { + if !sessionData.Internal { + // Record every query we issue through our sql connection. + txnFingerprintIDsRecorded = append(txnFingerprintIDsRecorded, txnFingerprintID) + } + }, + } + + testServer, sqlConn, _ := serverutils.StartServer(t, params) + + defer func() { + require.NoError(t, sqlConn.Close()) + testServer.Stopper().Stop(ctx) + }() + + testConn := sqlutils.MakeSQLRunner(sqlConn) + + testConn.Exec(t, "SET application_name = $1", appName) + testConn.Exec(t, "CREATE TABLE test AS SELECT generate_series(1, 10)") + testConn.Exec(t, "SELECT * FROM test") + testConn.Exec(t, "BEGIN; SELECT 1; SELECT 1, 2, 3; COMMIT;") + + sessions := testServer.SQLServer().(*Server).GetExecutorConfig().SessionRegistry.SerializeAll() + + var session *serverpb.Session + for i, s := range sessions { + if s.ApplicationName == appName { + session = &sessions[i] + } + } + require.NotNil(t, session) + + sort.Slice(session.TxnFingerprintIDs, func(i, j int) bool { + return session.TxnFingerprintIDs[i] < session.TxnFingerprintIDs[j] + }) + + sort.Slice(txnFingerprintIDsRecorded, func(i, j int) bool { + return txnFingerprintIDsRecorded[i] < txnFingerprintIDsRecorded[j] + }) + + require.Equal(t, txnFingerprintIDsRecorded, session.TxnFingerprintIDs) +} diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index bfc7aafa61f0..9b03f7414c13 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -47,6 +47,8 @@ export const idleSession: SessionInfo = { alloc_bytes: Long.fromNumber(0), max_alloc_bytes: Long.fromNumber(10240), active_queries: [], + num_txns_executed: 1, + txn_fingerprint_ids: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -85,6 +87,8 @@ export const idleTransactionSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", active_queries: [], + num_txns_executed: 1, + txn_fingerprint_ids: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -138,6 +142,8 @@ export const activeSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", status: Status.ACTIVE, + num_txns_executed: 1, + txn_fingerprint_ids: [], toJSON: () => ({}), }, }; @@ -163,6 +169,8 @@ export const closedSession: SessionInfo = { nanos: 369989000, }, status: Status.CLOSED, + num_txns_executed: 1, + txn_fingerprint_ids: [], toJSON: () => ({}), }, }; diff --git a/pkg/util/timeutil/stopwatch.go b/pkg/util/timeutil/stopwatch.go index ed3bf152b517..78b0f26d7709 100644 --- a/pkg/util/timeutil/stopwatch.go +++ b/pkg/util/timeutil/stopwatch.go @@ -81,6 +81,15 @@ func (w *StopWatch) Elapsed() time.Duration { return w.mu.elapsed } +// StartedAt returns a bool indicating if the stopwatch has started, and the +// time at which the stopwatch was started. If the stopwatch is stopped, +// the time returned is the last time the stopwatch was started. +func (w *StopWatch) StartedAt() (started bool, startedAt time.Time) { + w.mu.Lock() + defer w.mu.Unlock() + return w.mu.started, w.mu.startedAt +} + // TestTimeSource is a source of time that remembers when it was created (in // terms of the real time) and returns the time based on its creation time and // the number of "advances" it has had. It is used for testing only.