Skip to content

Commit

Permalink
sqlstats: always enable tracing first time fingerprint is seen
Browse files Browse the repository at this point in the history
Fixes: #89185

The first time a fingerprint is seen, tracing should be enabled.
This is currently broken when
sql.metrics.statement_details.plan_collection.enabled is set to false.
As a result, crdb_internal.transaction_contention_events can be
empty: tracing was never enabled, so the contention event was
never recorded.

To fix this properly, ShouldSample needs to return an additional value
indicating whether this is the first time a fingerprint is seen. This
removes the dependency on the plan_collection feature switch.

Release justification: Bug fixes and low-risk updates to new
functionality.

Release note: none
  • Loading branch information
j82w committed Sep 27, 2022
1 parent 4192db2 commit 4f1ad05
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 59 deletions.
117 changes: 117 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"net/url"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -937,6 +938,122 @@ func TestIsAtLeastVersion(t *testing.T) {
}
}

// TestTxnContentionEventsTable verifies that a contention event shows up in
// crdb_internal.transaction_contention_events even when
// sql.metrics.statement_details.plan_collection.enabled is set to false.
//
// The test provokes lock contention twice with the same statement
// fingerprints and then checks that at least one, and at most two, contention
// events were recorded for the waiting UPDATE statement.
func TestTxnContentionEventsTable(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Start a single-node cluster.
	ctx := context.Background()
	settings := cluster.MakeTestingClusterSettings()
	args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}}
	tc := testcluster.StartTestCluster(t, 1, args)
	defer tc.Stopper().Stop(ctx)
	conn := tc.ServerConn(0)
	sqlDB := sqlutils.MakeSQLRunner(conn)

	sqlDB.Exec(
		t,
		`SET CLUSTER SETTING sql.metrics.statement_details.plan_collection.enabled = false;`)

	// Reduce the resolution interval to speed up the test.
	sqlDB.Exec(
		t,
		`SET CLUSTER SETTING sql.contention.event_store.resolution_interval = '100ms'`)

	sqlDB.Exec(t, "CREATE TABLE t (id string, s string);")

	causeContention := func(insertValue string, updateValue string) {
		// In a separate goroutine, open a transaction, insert a row, sleep for
		// a while, and then commit. Meanwhile, the calling goroutine attempts
		// to update the same row; that update is blocked until the
		// transaction in the goroutine completes.
		var wgTxnStarted sync.WaitGroup
		wgTxnStarted.Add(1)

		// Wait for the txn to complete to avoid the test finishing before the
		// txn is committed.
		var wgTxnDone sync.WaitGroup
		wgTxnDone.Add(1)

		// blockingTxnErr is written by the goroutine before wgTxnDone.Done()
		// and only read after wgTxnDone.Wait(), so no extra locking is needed.
		// Errors are reported from the test goroutine because FailNow (used by
		// require) must not be called from any other goroutine.
		var blockingTxnErr error

		go func() {
			defer wgTxnDone.Done()
			blockingTxnErr = func() error {
				// Always release the waiter exactly once, even when the
				// transaction fails before it acquires the lock; otherwise the
				// test goroutine would block forever on wgTxnStarted.Wait().
				var startedOnce sync.Once
				signalStarted := func() { startedOnce.Do(wgTxnStarted.Done) }
				defer signalStarted()

				tx, err := conn.BeginTx(ctx, &gosql.TxOptions{})
				if err != nil {
					return err
				}
				if _, err := tx.ExecContext(ctx,
					"INSERT INTO t (id, s) VALUES ('test', $1);",
					insertValue); err != nil {
					// Best-effort cleanup; the original error is what matters.
					_ = tx.Rollback()
					return err
				}
				signalStarted()
				if _, err := tx.ExecContext(ctx, "select pg_sleep(.5);"); err != nil {
					// Best-effort cleanup; the original error is what matters.
					_ = tx.Rollback()
					return err
				}
				return tx.Commit()
			}()
		}()

		start := timeutil.Now()

		// Need to wait for the txn to start to ensure lock contention.
		wgTxnStarted.Wait()
		// This will be blocked until the transaction in the goroutine commits.
		_, errUpdate := conn.ExecContext(
			ctx, "UPDATE t SET s = $1 where id = 'test';", updateValue)
		end := timeutil.Now()

		// Join the goroutine before asserting so that a failed assertion never
		// leaves the blocking transaction running in the background.
		wgTxnDone.Wait()
		require.NoError(t, blockingTxnErr)
		require.NoError(t, errUpdate)
		require.GreaterOrEqual(t, end.Sub(start), 500*time.Millisecond)
	}

	causeContention("insert1", "update1")
	causeContention("insert2", "update2")

	rowCount := 0

	// Verify the table content is valid.
	// Filter the fingerprint id to only be the query in the test.
	// This ensures the event is the one caused in the test and not by some other
	// internal workflow.
	testutils.SucceedsWithin(t, func() error {
		// Each retry counts from scratch so that rows seen by a failed attempt
		// are not added to the final tally.
		rowCount = 0

		rows, errVerify := conn.QueryContext(ctx, `SELECT
			blocking_txn_id,
			waiting_txn_id
			FROM crdb_internal.transaction_contention_events tce
			inner join (
			select
			transaction_fingerprint_id,
			metadata->'query' as query
			from crdb_internal.statement_statistics t
			where metadata->>'query' like 'UPDATE t SET %') stats
			on stats.transaction_fingerprint_id = tce.waiting_txn_fingerprint_id`)
		if errVerify != nil {
			return errVerify
		}
		// Release the result set on every exit path, including scan errors.
		defer rows.Close()

		for rows.Next() {
			rowCount++

			var blocking, waiting string
			if errVerify = rows.Scan(&blocking, &waiting); errVerify != nil {
				return errVerify
			}
		}
		// rows.Next() returns false both at end of data and on error, so the
		// iteration error has to be checked explicitly.
		if errVerify = rows.Err(); errVerify != nil {
			return errVerify
		}

		if rowCount < 1 {
			return fmt.Errorf("transaction_contention_events did not return any rows")
		}
		return nil
	}, 5*time.Second)

	require.LessOrEqual(t, rowCount, 2, "transaction_contention_events "+
		"found %d rows. It should only record first, but there is a chance based "+
		"on sampling to get 2 rows.", rowCount)
}

// This test doesn't care about the contents of these virtual tables;
// other places (the insights integration tests) do that for us.
// What we look at here is the role-option-checking we need to make sure
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ type instrumentationHelper struct {

queryLevelStatsWithErr *execstats.QueryLevelStatsWithErr

// If savePlanForStats is true, the explainPlan will be collected and returned
// via PlanForStats().
// If savePlanForStats is true and the explainPlan was collected, the
// serialized version of the plan will be returned via PlanForStats().
savePlanForStats bool

explainPlan *explain.Plan
Expand Down Expand Up @@ -254,8 +254,8 @@ func (ih *instrumentationHelper) Setup(
ih.stmtDiagnosticsRecorder = stmtDiagnosticsRecorder
ih.withStatementTrace = cfg.TestingKnobs.WithStatementTrace

ih.savePlanForStats =
statsCollector.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, p.SessionData().Database)
var previouslySampled bool
previouslySampled, ih.savePlanForStats = statsCollector.ShouldSample(fingerprint, implicitTxn, p.SessionData().Database)

defer func() {
if ih.ShouldBuildExplainPlan() {
Expand Down Expand Up @@ -286,7 +286,7 @@ func (ih *instrumentationHelper) Setup(

ih.collectExecStats = collectTxnExecStats

if !collectTxnExecStats && ih.savePlanForStats {
if !collectTxnExecStats && !previouslySampled {
// We don't collect the execution stats for statements in this txn, but
// this is the first time we see this statement ever, so we'll collect
// its execution stats anyway (unless the user disabled txn stats
Expand Down Expand Up @@ -437,7 +437,7 @@ func (ih *instrumentationHelper) ShouldUseJobForCreateStats() bool {
// ShouldBuildExplainPlan returns true if we should build an explain plan and
// call RecordExplainPlan.
func (ih *instrumentationHelper) ShouldBuildExplainPlan() bool {
return ih.collectBundle || ih.collectExecStats || ih.savePlanForStats ||
return ih.collectBundle || ih.collectExecStats ||
ih.outputMode == explainAnalyzePlanOutput ||
ih.outputMode == explainAnalyzeDistSQLOutput
}
Expand Down Expand Up @@ -470,7 +470,7 @@ func (ih *instrumentationHelper) RecordPlanInfo(
// collected (nil otherwise). It should be called after RecordExplainPlan() and
// RecordPlanInfo().
func (ih *instrumentationHelper) PlanForStats(ctx context.Context) *roachpb.ExplainTreePlanNode {
if ih.explainPlan == nil {
if ih.explainPlan == nil || !ih.savePlanForStats {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/sqlstats/persistedsqlstats/appStats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func (s *ApplicationStats) RecordStatement(
return fingerprintID, err
}

// ShouldSaveLogicalPlanDesc implements sqlstats.ApplicationStats interface.
func (s *ApplicationStats) ShouldSaveLogicalPlanDesc(
// ShouldSample implements sqlstats.ApplicationStats interface.
func (s *ApplicationStats) ShouldSample(
fingerprint string, implicitTxn bool, database string,
) bool {
return s.ApplicationStats.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database)
) (bool, bool) {
return s.ApplicationStats.ShouldSample(fingerprint, implicitTxn, database)
}

// RecordTransaction implements sqlstats.ApplicationStats interface and saves
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const (
// system table.
// - set-time: this changes the clock time perceived by SQL Stats subsystem.
// This is useful when unit tests need to manipulate times.
// - should-sample-logical-plan: this checks if the given tuple of
// - should-sample: this checks if the given tuple of
// (db, implicitTxn, fingerprint) will be sampled
// next time it is being executed.
func TestSQLStatsDataDriven(t *testing.T) {
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestSQLStatsDataDriven(t *testing.T) {
}
stubTime.setTime(tm)
return stubTime.Now().String()
case "should-sample-logical-plan":
case "should-sample":
mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs)

var dbName string
Expand All @@ -145,13 +145,12 @@ func TestSQLStatsDataDriven(t *testing.T) {
// them.
fingerprint = strings.Replace(fingerprint, "%", " ", -1)

return fmt.Sprintf("%t",
appStats.ShouldSaveLogicalPlanDesc(
fingerprint,
implicitTxn,
dbName,
),
previouslySampled, savePlanForStats := appStats.ShouldSample(
fingerprint,
implicitTxn,
dbName,
)
return fmt.Sprintf("%t, %t", previouslySampled, savePlanForStats)
}

return ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ set-time time=2021-09-20T15:00:00Z

# Logical plan should be sampled here, since we have not collected logical plan
# at all.
should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_
----
true
false, true

# Execute the query to trigger a collection of logical plan.
# (db_name=defaultdb implicitTxn=true fingerprint=SELECT _)
Expand All @@ -26,15 +26,15 @@ SELECT 1

# Ensure that if a query is to be subsequently executed, it will not cause
# logical plan sampling.
should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_
----
false
true, false

# However, if we are to execute the same statement but under explicit
# transaction, the plan will still need to be sampled.
should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_
----
true
false, true

# Execute the statement under explicit transaction.
# (db_name=defaultdb implicitTxn=false fingerprint=SELECT _)
Expand All @@ -46,32 +46,32 @@ COMMIT

# Ensure that the subsequent execution of the query will not cause logical plan
# collection.
should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_
----
false
true, false

# Set the time to the future and ensure we will resample the logical plan.
set-time time=2021-09-20T15:05:01Z
----
2021-09-20 15:05:01 +0000 UTC


should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_
----
true
true, true

# implicit txn
exec-sql
SELECT 1
----

should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_
----
false
true, true

should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_
----
true
true, true

# explicit txn
exec-sql
Expand All @@ -80,6 +80,6 @@ SELECT 1
COMMIT
----

should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_
should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_
----
false
true, true
17 changes: 10 additions & 7 deletions pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,21 @@ func (s *StatsCollector) EndTransaction(
s.flushTarget = nil
}

// ShouldSaveLogicalPlanDesc implements sqlstats.StatsCollector interface.
func (s *StatsCollector) ShouldSaveLogicalPlanDesc(
// ShouldSample implements sqlstats.StatsCollector interface.
func (s *StatsCollector) ShouldSample(
fingerprint string, implicitTxn bool, database string,
) bool {
foundInFlushTarget := true
) (previouslySampled bool, savePlanForStats bool) {
sampledInFlushTarget := false
savePlanForStatsInFlushTarget := true

if s.flushTarget != nil {
foundInFlushTarget = s.flushTarget.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database)
sampledInFlushTarget, savePlanForStatsInFlushTarget = s.flushTarget.ShouldSample(fingerprint, implicitTxn, database)
}

return foundInFlushTarget &&
s.ApplicationStats.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database)
sampledInAppStats, savePlanForStatsInAppStats := s.ApplicationStats.ShouldSample(fingerprint, implicitTxn, database)
previouslySampled = sampledInFlushTarget || sampledInAppStats
savePlanForStats = savePlanForStatsInFlushTarget && savePlanForStatsInAppStats
return previouslySampled, savePlanForStats
}

// UpgradeImplicitTxn implements sqlstats.StatsCollector interface.
Expand Down
15 changes: 6 additions & 9 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (s *Container) MergeApplicationStatementStats(
defer stmtStats.mu.Unlock()

stmtStats.mergeStatsLocked(statistics)
planLastSampled := s.getLogicalPlanLastSampled(key.sampledPlanKey)
planLastSampled, _ := s.getLogicalPlanLastSampled(key.sampledPlanKey)
if planLastSampled.Before(stmtStats.mu.data.SensitiveInfo.MostRecentPlanTimestamp) {
s.setLogicalPlanLastSampled(key.sampledPlanKey, stmtStats.mu.data.SensitiveInfo.MostRecentPlanTimestamp)
}
Expand Down Expand Up @@ -922,16 +922,13 @@ func (s *transactionCounts) recordTransactionCounts(
}
}

func (s *Container) getLogicalPlanLastSampled(key sampledPlanKey) time.Time {
func (s *Container) getLogicalPlanLastSampled(
key sampledPlanKey,
) (lastSampled time.Time, found bool) {
s.mu.Lock()
defer s.mu.Unlock()

lastSampled, found := s.mu.sampledPlanMetadataCache[key]
if !found {
return time.Time{}
}

return lastSampled
lastSampled, found = s.mu.sampledPlanMetadataCache[key]
return lastSampled, found
}

func (s *Container) setLogicalPlanLastSampled(key sampledPlanKey, time time.Time) {
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,17 @@ func (s *Container) RecordStatementExecStats(
return nil
}

// ShouldSaveLogicalPlanDesc implements sqlstats.Writer interface.
func (s *Container) ShouldSaveLogicalPlanDesc(
// ShouldSample implements sqlstats.Writer interface.
func (s *Container) ShouldSample(
fingerprint string, implicitTxn bool, database string,
) bool {
lastSampled := s.getLogicalPlanLastSampled(sampledPlanKey{
) (previouslySampled, savePlanForStats bool) {
lastSampled, previouslySampled := s.getLogicalPlanLastSampled(sampledPlanKey{
stmtNoConstants: fingerprint,
implicitTxn: implicitTxn,
database: database,
})
return s.shouldSaveLogicalPlanDescription(lastSampled)
savePlanForStats = s.shouldSaveLogicalPlanDescription(lastSampled)
return previouslySampled, savePlanForStats
}

// RecordTransaction implements sqlstats.Writer interface and saves
Expand Down
Loading

0 comments on commit 4f1ad05

Please sign in to comment.