Skip to content

Commit

Permalink
server, sql: surface session txnCount, txn fingerprints, active time
Browse files Browse the repository at this point in the history
Partially addresses #74257.

Previously, the status server did not provide session details such as
total number of transactions executed, transaction fingerprint
IDs, and total active time. This change adds the aforementioned session
details to the `serverpb.Session` struct.

To track recently executed transaction fingerprint IDs, a FIFO cache
`TxnFingerprintIDCache` is introduced with its corresponding cluster
setting `TxnFingerprintIDBufferCapacity` to control the capacity. The
default capacity is set at 100 fingerprints.

The total number of transactions executed is filled using the existing
`txnCounter` from the `extraTxnState` in `connExecutor`. The total active
time is calculated by introducing a `timeutil.StopWatch` to the connection
executor, which is started and stopped when a transaction is started and
finished respectively.

Release note (api change): the `serverpb.Session` struct now has three
new fields: number of transactions executed, transaction fingerprint
IDs, and total active time.
  • Loading branch information
Gerardo Torres authored and xinhaoz committed Jun 21, 2022
1 parent 377b113 commit ceb5981
Show file tree
Hide file tree
Showing 19 changed files with 575 additions and 10 deletions.
6 changes: 6 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -2111,6 +2111,9 @@ Session represents one SQL session.
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |
| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) |
| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) |
| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) |



Expand Down Expand Up @@ -2247,6 +2250,9 @@ Session represents one SQL session.
| last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) |
| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) |
| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) |
| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) |
| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) |
| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) |



Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TT
sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
sql.ttl.job.enabled boolean true whether the TTL job is enabled
sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.
timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@
<tr><td><code>sql.ttl.default_range_concurrency</code></td><td>integer</td><td><code>1</code></td><td>default amount of ranges to process at once during a TTL delete</td></tr>
<tr><td><code>sql.ttl.default_select_batch_size</code></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.job.enabled</code></td><td>boolean</td><td><code>true</code></td><td>whether the TTL job is enabled</td></tr>
<tr><td><code>sql.txn_fingerprint_id_cache.capacity</code></td><td>integer</td><td><code>100</code></td><td>the maximum number of txn fingerprint IDs stored</td></tr>
<tr><td><code>timeseries.storage.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td></tr>
<tr><td><code>timeseries.storage.resolution_10s.ttl</code></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td></tr>
<tr><td><code>timeseries.storage.resolution_30m.ttl</code></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td></tr>
Expand Down
29 changes: 29 additions & 0 deletions docs/generated/swagger/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb"
},
"Duration": {
"description": "A Duration represents the elapsed time between two instants\nas an int64 nanosecond count. The representation limits the\nlargest representable duration to approximately 290 years.",
"type": "integer",
"format": "int64",
"x-go-package": "time"
},
"EventsResponse": {
"description": "EventsResponse contains a set of event log entries. This is always limited\nto the latest N entries (N is enforced in the associated endpoint).",
"type": "object",
Expand Down Expand Up @@ -1225,6 +1231,12 @@
"node_id": {
"$ref": "#/definitions/NodeID"
},
"num_txns_executed": {
"description": "Count of the number of transactions that have been opened on this session.\nThis count includes transactions that are in progress.",
"type": "integer",
"format": "int32",
"x-go-name": "NumTxnsExecuted"
},
"start": {
"description": "Timestamp of session's start.",
"type": "string",
Expand All @@ -1234,6 +1246,17 @@
"status": {
"$ref": "#/definitions/Session_Status"
},
"total_active_time": {
"$ref": "#/definitions/Duration"
},
"txn_fingerprint_ids": {
"description": "List of transaction fingerprint IDs in this session.",
"type": "array",
"items": {
"$ref": "#/definitions/TransactionFingerprintID"
},
"x-go-name": "TxnFingerprintIDs"
},
"username": {
"description": "Username of the user for this session.",
"type": "string",
Expand Down Expand Up @@ -1498,6 +1521,12 @@
},
"x-go-package": "github.com/cockroachdb/cockroach/pkg/util/hlc"
},
"TransactionFingerprintID": {
"description": "TransactionFingerprintID is the hashed string constructed using the\nindividual statement fingerprint IDs that comprise the transaction.",
"type": "integer",
"format": "uint64",
"x-go-package": "github.com/cockroachdb/cockroach/pkg/roachpb"
},
"TxnInfo": {
"type": "object",
"title": "TxnInfo represents an in flight user transaction on some Session.",
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0
----
id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries last_auto_retry_reason

query ITTTTTTTTTTTTT colnames
query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end

query ITTTTTTTTTTTTT colnames
query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end

query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
Expand Down
16 changes: 16 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -949,9 +949,11 @@ message Session {
// Information about the txn in progress on this session. Nil if the
// session doesn't currently have a transaction.
TxnInfo active_txn = 12;

// The SQL statement fingerprint of the last query executed on this session,
// compatible with StatementStatisticsKey.
string last_active_query_no_constants = 13;

// Enum for sessions status.
enum Status {
ACTIVE = 0;
Expand All @@ -960,9 +962,23 @@ message Session {
}
// The session's status.
Status status = 14;

// Timestamp of session's end.
google.protobuf.Timestamp end = 15
[ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ];

// Count of the number of transactions that have been opened on this session.
// This count includes transactions that are in progress.
int32 num_txns_executed = 16;

// List of transaction fingerprint IDs in this session.
repeated uint64 txn_fingerprint_ids = 17 [(gogoproto.customname) = "TxnFingerprintIDs",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID",
(gogoproto.nullable) = false];

// The session's total active time.
google.protobuf.Duration total_active_time = 18 [(gogoproto.nullable) = false,
(gogoproto.stdduration) = true];
}

// An error wrapper object for ListSessionsResponse.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ go_library(
"testutils.go",
"topk.go",
"truncate.go",
"txn_fingerprint_id_cache.go",
"txn_state.go",
"type_change.go",
"unary.go",
Expand Down Expand Up @@ -585,6 +586,7 @@ go_test(
"temporary_schema_test.go",
"tenant_test.go",
"trace_test.go",
"txn_fingerprint_id_cache_test.go",
"txn_restart_test.go",
"txn_state_test.go",
"type_change_test.go",
Expand Down
23 changes: 23 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,8 @@ func (s *Server) newConnExecutor(
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
totalActiveTimeStopWatch: timeutil.NewStopWatch(),
txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings, sessionRootMon),
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -1489,6 +1491,14 @@ type connExecutor struct {
// txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the
// Transaction ID Cache.
txnIDCacheWriter txnidcache.Writer

// txnFingerprintIDCache is used to track the most recent
// txnFingerprintIDs executed in this session.
txnFingerprintIDCache *TxnFingerprintIDCache

// totalActiveTimeStopWatch tracks the total active time of the session.
// This is defined as the time spent executing transactions and statements.
totalActiveTimeStopWatch *timeutil.StopWatch
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down Expand Up @@ -2865,6 +2875,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
// Start of the transaction, so no statements were executed earlier.
// Bump the txn counter for logging.
ex.extraTxnState.txnCounter++

// Session is considered active when executing a transaction.
ex.totalActiveTimeStopWatch.Start()

if !ex.server.cfg.Codec.ForSystemTenant() {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
Expand Down Expand Up @@ -3120,19 +3134,28 @@ func (ex *connExecutor) serialize() serverpb.Session {
remoteStr = sd.RemoteAddr.String()
}

txnFingerprintIDs := ex.txnFingerprintIDCache.GetAllTxnFingerprintIDs()
sessionActiveTime := ex.totalActiveTimeStopWatch.Elapsed()
if startedAt, started := ex.totalActiveTimeStopWatch.LastStartedAt(); started {
sessionActiveTime = time.Duration(sessionActiveTime.Nanoseconds() + timeutil.Since(startedAt).Nanoseconds())
}

return serverpb.Session{
Username: sd.SessionUser().Normalized(),
ClientAddress: remoteStr,
ApplicationName: ex.applicationName.Load().(string),
Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(),
ActiveQueries: activeQueries,
ActiveTxn: activeTxnInfo,
NumTxnsExecuted: int32(ex.extraTxnState.txnCounter),
TxnFingerprintIDs: txnFingerprintIDs,
LastActiveQuery: lastActiveQuery,
ID: ex.sessionID.GetBytes(),
AllocBytes: ex.mon.AllocBytes(),
MaxAllocBytes: ex.mon.MaximumBytes(),
LastActiveQueryNoConstants: lastActiveQueryNoConstants,
Status: status,
TotalActiveTime: sessionActiveTime,
}
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2110,6 +2110,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now())
transactionFingerprintID :=
roachpb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum())

err := ex.txnFingerprintIDCache.Add(transactionFingerprintID)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err)
}
}

if !implicit {
ex.statsCollector.EndExplicitTransaction(
ctx,
Expand All @@ -2123,7 +2131,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down Expand Up @@ -2219,6 +2227,7 @@ func (ex *connExecutor) recordTransactionFinish(

txnEnd := timeutil.Now()
txnTime := txnEnd.Sub(txnStart)
ex.totalActiveTimeStopWatch.Stop()
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -1623,6 +1624,94 @@ func TestEmptyTxnIsBeingCorrectlyCounted(t *testing.T) {
"after executing empty transactions, but it was not")
}

//TestSessionTotalActiveTime tests that a session's total active time is
//correctly being recorded as transactions are executed.
func TestSessionTotalActiveTime(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params := base.TestServerArgs{}
s, mainDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

_, err := mainDB.Exec(fmt.Sprintf("CREATE USER %s", username.TestUser))
if err != nil {
t.Fatal(err)
}

pgURL, cleanupDB := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "TestSessionTotalActiveTime", url.User(username.TestUser))
defer cleanupDB()
rawSQL, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}

defer func() {
err := rawSQL.Close()
if err != nil {
t.Fatal(err)
}
}()

getSessionWithTestUser := func() *serverpb.Session {
sessions := s.SQLServer().(*sql.Server).GetExecutorConfig().SessionRegistry.SerializeAll()
for _, s := range sessions {
if s.Username == username.TestUser {
return &s
}
}
t.Fatalf("expected session with username %s", username.TestUser)
return nil
}

sqlDB := sqlutils.MakeSQLRunner(rawSQL)
sqlDB.Exec(t, "SELECT 1")
session := getSessionWithTestUser()
activeTimeNanos := session.TotalActiveTime.Nanoseconds()

// We will execute different types of transactions.
// After each execution, verify the total active time has increased, but is no
// longer increasing after the transaction has completed.
testCases := []struct {
Query string
// SessionActiveAfterExecution signifies that the active time should still be active after this query.
SessionActiveAfterExecution bool
}{
{"SELECT 1", false},
// Test explicit transaction.
{"BEGIN", true},
{"SELECT 1", true},
{"SELECT 1, 2", true},
{"COMMIT", false},
{"BEGIN", true},
{"SELECT crdb_internal.force_retry('1s')", true},
{"COMMIT", false},
}

for _, tc := range testCases {
sqlDB.Exec(t, tc.Query)
if tc.Query == "crdb_internal.force_retry('1s'" {
continue
}
// Check that the total active time has increased.
session = getSessionWithTestUser()
require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos)

activeTimeNanos = session.TotalActiveTime.Nanoseconds()
session = getSessionWithTestUser()

if tc.SessionActiveAfterExecution {
require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos)
} else {
require.Equal(t, activeTimeNanos, session.TotalActiveTime.Nanoseconds())
}

activeTimeNanos = session.TotalActiveTime.Nanoseconds()
}
}

// dynamicRequestFilter exposes a filter method which is a
// kvserverbase.ReplicaRequestFilter but can be set dynamically.
type dynamicRequestFilter struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,7 @@ CREATE TABLE crdb_internal.%s (
application_name STRING, -- the name of the application as per SET application_name
active_queries STRING, -- the currently running queries as SQL
last_active_query STRING, -- the query that finished last on this session as SQL
num_txns_executed INT, -- the number of transactions that were executed so far on this session
session_start TIMESTAMP, -- the time when the session was opened
oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started
kv_txn STRING, -- the ID of the current KV transaction
Expand Down Expand Up @@ -2011,6 +2012,7 @@ func populateSessionsTable(
tree.NewDString(session.ApplicationName),
tree.NewDString(activeQueries.String()),
tree.NewDString(session.LastActiveQuery),
tree.NewDInt(tree.DInt(session.NumTxnsExecuted)),
startTSDatum,
oldestStartDatum,
kvTxnIDDatum,
Expand All @@ -2036,6 +2038,7 @@ func populateSessionsTable(
tree.DNull, // application name
tree.NewDString("-- "+rpcErr.Message), // active queries
tree.DNull, // last active query
tree.DNull, // num txns executed
tree.DNull, // session start
tree.DNull, // oldest_query_start
tree.DNull, // kv_txn
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) {
const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.`
const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start, num_txns_executed FROM crdb_internal.`
table := `node_sessions`
if n.Cluster {
table = `cluster_sessions`
Expand Down
Loading

0 comments on commit ceb5981

Please sign in to comment.