diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index de506720120c..5f4deacf061f 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2160,10 +2160,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { ex.state.mu.RUnlock() implicit := ex.implicitTxn() - // Transaction received time is the time at which the statement that prompted - // the creation of this transaction was received. - ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived, - ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived)) + ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionStarted, txnStart) ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now()) ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction, ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction)) @@ -2256,6 +2253,10 @@ func (ex *connExecutor) recordTransactionFinish( BytesRead: ex.extraTxnState.bytesRead, } + if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil { + ex.server.cfg.TestingKnobs.OnRecordTxnFinish(ex.executorType == executorTypeInternal, ex.phaseTimes, ex.planner.stmt.SQL) + } + return ex.statsCollector.RecordTransaction( ctx, transactionFingerprintID, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 134311462b5e..172c28f86035 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -84,6 +84,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" + "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -1471,6 +1472,10 @@ type ExecutorTestingKnobs struct { // AfterBackupCheckpoint if set will be called after a BACKUP-CHECKPOINT // is written. AfterBackupCheckpoint func() + + // OnRecordTxnFinish, if set, will be called as we record a transaction + // finishing. + OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) } // PGWireTestingKnobs contains knobs for the pgwire module. diff --git a/pkg/sql/sessionphase/session_phase.go b/pkg/sql/sessionphase/session_phase.go index daf5d78e3920..0218003d164a 100644 --- a/pkg/sql/sessionphase/session_phase.go +++ b/pkg/sql/sessionphase/session_phase.go @@ -59,9 +59,9 @@ const ( // have no execution, like SHOW TRANSACTION STATUS. SessionQueryServiced - // SessionTransactionReceived is the SessionPhase when a transaction is - // received. - SessionTransactionReceived + // SessionTransactionStarted is the SessionPhase when a transaction is + // started. + SessionTransactionStarted // SessionFirstStartExecTransaction is the SessionPhase when a transaction // is started for the first time. @@ -197,7 +197,7 @@ func (t *Times) GetTransactionRetryLatency() time.Duration { // GetTransactionServiceLatency returns the total time to service the // transaction. func (t *Times) GetTransactionServiceLatency() time.Duration { - return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionReceived]) + return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionStarted]) } // GetCommitLatency returns the total time spent for the transaction to diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index 8fb12789d4ab..37ba450364dc 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -66,6 +66,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", + "@com_github_jackc_pgx_v4//:pgx", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index 08b57e8a1d69..00d5e908dae7 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -13,7 +13,9 @@ package sslocal_test import ( "context" "math" + "net/url" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -36,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/jackc/pgx/v4" "github.com/stretchr/testify/require" ) @@ -669,3 +672,61 @@ func TestUnprivilegedUserReset(t *testing.T) { require.Contains(t, err.Error(), "requires admin privilege") } + +func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + testData := []*struct { + query string + placeholders []interface{} + phaseTimes *sessionphase.Times + }{ + { + query: "SELECT $1::INT8", + placeholders: []interface{}{1}, + phaseTimes: nil, + }, + } + + waitTxnFinish := make(chan struct{}) + currentTestCaseIdx := 0 + const latencyThreshold = time.Second * 5 + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + OnRecordTxnFinish: func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) { + if !isInternal && testData[currentTestCaseIdx].query == stmt { + testData[currentTestCaseIdx].phaseTimes = phaseTimes.Clone() + go func() { + waitTxnFinish <- struct{}{} + }() + } + }, + } + s, _, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + pgURL, cleanupGoDB := sqlutils.PGUrl( + t, s.ServingSQLAddr(), "StartServer", url.User(security.RootUser)) + defer cleanupGoDB() + c, err := pgx.Connect(ctx, pgURL.String()) + require.NoError(t, err, "error connecting with pg url") + + for currentTestCaseIdx < len(testData) { + tc := testData[currentTestCaseIdx] + // Make extended protocol query + _ = c.QueryRow(ctx, tc.query, tc.placeholders...) + require.NoError(t, err, "error scanning row") + <-waitTxnFinish + + // Ensure test case phase times are populated by query txn. + require.True(t, tc.phaseTimes != nil) + // Ensure SessionTransactionStarted variable is populated. + require.True(t, !tc.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted).IsZero()) + // Ensure compute transaction service latency is within a reasonable threshold. + require.True(t, tc.phaseTimes.GetTransactionServiceLatency() < latencyThreshold) + currentTestCaseIdx++ + } +}