Skip to content

Commit

Permalink
Merge pull request #109707 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-109696

release-23.1: sql: optimize persistedsqlstats flush size check
  • Loading branch information
j82w authored Aug 31, 2023
2 parents d4c6cd3 + 51c8444 commit fc3fe21
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 17 deletions.
12 changes: 12 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,15 @@ var sqlStatsLimitTableSizeEnabled = settings.RegisterBoolSetting(
"to grow past sql.stats.persisted_rows.max",
true,
)

// sqlStatsLimitTableCheckInterval is the cluster setting that controls the
// sql stats system tables to grow past the number of rows set by
// sql.stats.persisted_row.max.
var sqlStatsLimitTableCheckInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"sql.stats.limit_table_size.check_interval",
"controls what interval the check is done on if the statement and "+
"transaction statistics tables have grown past sql.stats.persisted_rows.max",
1*time.Hour,
settings.NonNegativeDuration,
)
43 changes: 34 additions & 9 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ package persistedsqlstats
import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -104,28 +106,51 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
}

func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) {
maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))
// Doing a count check on every flush for every node adds a lot of overhead.
// To reduce the overhead only do the check once an hour by default.
intervalToCheck := sqlStatsLimitTableCheckInterval.Get(&s.cfg.Settings.SV)
if !s.lastSizeCheck.IsZero() && s.lastSizeCheck.Add(intervalToCheck).After(timeutil.Now()) {
log.Infof(ctx, "PersistedSQLStats.StmtsLimitSizeReached skipped with last check at: %s and check interval: %s", s.lastSizeCheck, intervalToCheck)
return false, nil
}

maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.cfg.Settings.SV))

// The statistics table is split into 8 shards. Instead of counting all the
// rows across all the shards the count can be limited to a single shard.
// Then check the size off that one shard. This reduces the risk of causing
// contention or serialization issues. The cleanup is done by the shard, so
// it should prevent the data from being skewed to a single shard.
randomShard := rand.Intn(systemschema.SQLStatsHashShardBucketCount)
readStmt := fmt.Sprintf(`SELECT count(*)
FROM system.statement_statistics
%s
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
`, s.cfg.Knobs.GetAOSTClause())

readStmt := `
SELECT
count(*)
FROM
system.statement_statistics
`
readStmt += s.cfg.Knobs.GetAOSTClause()
row, err := s.cfg.DB.Executor().QueryRowEx(
ctx,
"fetch-stmt-count",
nil,
sessiondata.NodeUserSessionDataOverride,
readStmt,
randomShard,
)

if err != nil {
return false, err
}
actualSize := float64(tree.MustBeDInt(row[0]))
return actualSize > (maxPersistedRows * 1.5), nil
maxPersistedRowsByShard := maxPersistedRows / systemschema.SQLStatsHashShardBucketCount
isSizeLimitReached := actualSize > (maxPersistedRowsByShard * 1.5)
// If the table is over the limit do the check for every flush. This allows
// the flush to start again as soon as the data is within limits instead of
// needing to wait an hour.
if !isSizeLimitReached {
s.lastSizeCheck = timeutil.Now()
}

return isSizeLimitReached, nil
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
Expand Down
61 changes: 53 additions & 8 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,18 @@ func TestSQLStatsPersistedLimitReached(t *testing.T) {
return nil
})

// Set table size check interval to 1 second.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size.check_interval='0.00001ms'")
testutils.SucceedsSoon(t, func() error {
var appliedSetting string
row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.limit_table_size.check_interval")
row.Scan(&appliedSetting)
if appliedSetting != "00:00:00" {
return errors.Newf("waiting for sql.stats.limit_table_size.check_interval to be applied: %s", appliedSetting)
}
return nil
})

sqlConn.Exec(t, "SELECT 1, 2, 3, 4")
sqlConn.Exec(t, "SELECT 1, 2, 3, 4, 5")
sqlConn.Exec(t, "SELECT 1, 2, 3, 4, 6, 7")
Expand Down Expand Up @@ -556,6 +568,11 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
waitForFollowerReadTimestamp(t, sqlConn)
pss := s.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)

// It should be false since nothing has flushed. The table will be empty.
limitReached, err := pss.StmtsLimitSizeReached(ctx)
require.NoError(t, err)
require.False(t, limitReached)

const minNumExpectedStmts = int64(3)
// Maximum number of persisted rows less than minNumExpectedStmts/1.5
const maxNumPersistedRows = 1
Expand All @@ -581,22 +598,50 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
return nil
})

// We need SucceedsSoon here for the follower read timestamp to catch up
// enough for this state to be reached.
testutils.SucceedsSoon(t, func() error {
row := sqlConn.QueryRow(t, "SELECT count_rows() FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()")
var rowCount int
row.Scan(&rowCount)
if rowCount < 3 {
return errors.Newf("waiting for AOST query to return results")
}
return nil
})

// It should still return false because it only checks once an hour by default
// unless the previous run was over the limit.
limitReached, err = pss.StmtsLimitSizeReached(ctx)
require.NoError(t, err)
require.False(t, limitReached)

// Set table size check interval to 1 second.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size.check_interval='1s'")
testutils.SucceedsSoon(t, func() error {
var appliedSetting string
row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.limit_table_size.check_interval")
row.Scan(&appliedSetting)
if appliedSetting != "00:00:01" {
return errors.Newf("waiting for sql.stats.limit_table_size.check_interval to be applied: %s", appliedSetting)
}
return nil
})

// Begin a transaction.
sqlConn.Exec(t, "BEGIN")
// Lock the table. Create a state of contention.
sqlConn.Exec(t, "SELECT * FROM system.statement_statistics FOR UPDATE")

// Ensure that we can read from the table despite it being locked, due to the follower read (AOST).
// Expect that the number of statements in the table exceeds sql.stats.persisted_rows.max * 1.5
// (meaning that the limit will be reached) and no error. We need SucceedsSoon here for the follower
// read timestamp to catch up enough for this state to be reached.
testutils.SucceedsSoon(t, func() error {
// (meaning that the limit will be reached) and no error. Loop to make sure that
// checking it multiple times still returns the correct value.
for i := 0; i < 3; i++ {
limitReached, err := pss.StmtsLimitSizeReached(ctx)
if limitReached != true {
return errors.New("waiting for limit reached to be true")
}
return err
})
require.NoError(t, err)
require.True(t, limitReached)
}

// Close the transaction.
sqlConn.Exec(t, "COMMIT")
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type PersistedSQLStats struct {
setDraining sync.Once
// tasksDoneWG is used to wait for all background tasks to finish.
tasksDoneWG sync.WaitGroup

// The last time the size was checked before doing a flush.
lastSizeCheck time.Time
}

var _ sqlstats.Provider = &PersistedSQLStats{}
Expand Down

0 comments on commit fc3fe21

Please sign in to comment.