Skip to content

Commit

Permalink
sql: fix and rename sql stats session transaction received time
Browse files Browse the repository at this point in the history
Resolves: #82894

Due to a change from #76792, implicit transactions can start before
`SessionQueryReceived` session phase time is set by the sqlstats system.
In turn, this caused the `SessionTransactionReceived` (now renamed as
`SessionTransactionStarted`) session phase time to be recorded
incorrectly, causing extremely large transactions times on the UI. This
change fixes this mistake by setting the actual transaction start time
as the `SessionTransactionStarted` session phase time, instead of
`SessionQueryReceived`.

Release note (bug fix): The `SessionTransactionReceived` session phase
time is no longer recorded incorrectly, fixing large transaction times
from appearing on the UI, also renamed to `SessionTransactionStarted`.
  • Loading branch information
Thomas Hardy committed Jul 6, 2022
1 parent 7cfa5d4 commit f8f7459
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 8 deletions.
9 changes: 5 additions & 4 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2182,10 +2182,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
ex.state.mu.RUnlock()
implicit := ex.implicitTxn()

// Transaction received time is the time at which the statement that prompted
// the creation of this transaction was received.
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived,
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived))
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionStarted, txnStart)
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now())
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction,
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction))
Expand Down Expand Up @@ -2281,6 +2278,10 @@ func (ex *connExecutor) recordTransactionFinish(
BytesRead: ex.extraTxnState.bytesRead,
}

if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil {
ex.server.cfg.TestingKnobs.OnRecordTxnFinish(ex.executorType == executorTypeInternal, ex.phaseTimes, ex.planner.stmt.SQL)
}

return ex.statsCollector.RecordTransaction(
ctx,
transactionFingerprintID,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand Down Expand Up @@ -1462,6 +1463,10 @@ type ExecutorTestingKnobs struct {
// AfterBackupCheckpoint if set will be called after a BACKUP-CHECKPOINT
// is written.
AfterBackupCheckpoint func()

// OnRecordTxnFinish, if set, will be called as we record a transaction
// finishing.
OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string)
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/sessionphase/session_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ const (
// have no execution, like SHOW TRANSACTION STATUS.
SessionQueryServiced

// SessionTransactionReceived is the SessionPhase when a transaction is
// received.
SessionTransactionReceived
// SessionTransactionStarted is the SessionPhase when a transaction is
// started.
SessionTransactionStarted

// SessionFirstStartExecTransaction is the SessionPhase when a transaction
// is started for the first time.
Expand Down Expand Up @@ -197,7 +197,7 @@ func (t *Times) GetTransactionRetryLatency() time.Duration {
// GetTransactionServiceLatency returns the total time to service the
// transaction.
func (t *Times) GetTransactionServiceLatency() time.Duration {
return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionReceived])
return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionStarted])
}

// GetCommitLatency returns the total time spent for the transaction to
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/sslocal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_stretchr_testify//require",
],
)
61 changes: 61 additions & 0 deletions pkg/sql/sqlstats/sslocal/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ package sslocal_test
import (
"context"
"math"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -672,3 +675,61 @@ func TestUnprivilegedUserReset(t *testing.T) {

require.Contains(t, err.Error(), "requires admin privilege")
}

func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

testData := []*struct {
query string
placeholders []interface{}
phaseTimes *sessionphase.Times
}{
{
query: "SELECT $1::INT8",
placeholders: []interface{}{1},
phaseTimes: nil,
},
}

waitTxnFinish := make(chan struct{})
currentTestCaseIdx := 0
const latencyThreshold = time.Second * 5

params, _ := tests.CreateTestServerParams()
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
OnRecordTxnFinish: func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) {
if !isInternal && testData[currentTestCaseIdx].query == stmt {
testData[currentTestCaseIdx].phaseTimes = phaseTimes.Clone()
go func() {
waitTxnFinish <- struct{}{}
}()
}
},
}
s, _, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

pgURL, cleanupGoDB := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "StartServer", url.User(username.RootUser))
defer cleanupGoDB()
c, err := pgx.Connect(ctx, pgURL.String())
require.NoError(t, err, "error connecting with pg url")

for currentTestCaseIdx < len(testData) {
tc := testData[currentTestCaseIdx]
// Make extended protocol query
_ = c.QueryRow(ctx, tc.query, tc.placeholders...)
require.NoError(t, err, "error scanning row")
<-waitTxnFinish

// Ensure test case phase times are populated by query txn.
require.True(t, tc.phaseTimes != nil)
// Ensure SessionTransactionStarted variable is populated.
require.True(t, !tc.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted).IsZero())
// Ensure compute transaction service latency is within a reasonable threshold.
require.True(t, tc.phaseTimes.GetTransactionServiceLatency() < latencyThreshold)
currentTestCaseIdx++
}
}

0 comments on commit f8f7459

Please sign in to comment.