Skip to content

Commit

Permalink
sql: optimize persistedsqlstats flush size check
Browse files Browse the repository at this point in the history
Problem:
The `persistedsqlstats` size check to make sure the table is not 1.5x
the max size is done on every flush which is done on every node every
10 minutes by default. This can cause serialization issues as it is over
the entire table. The check is unnecessary most of the time, because it
should only fail if the compaction job is failing.

Solution:
1. Reduce the check interval to only be done once an hour by default,
   and make it configurable.
2. The system table is split in to 8 shards. Instead of checking the
   entire table count limit it to only one shard. This reduces the scope
   of the check and reduces the chance of serialization issues.

Fixes: #109619

Release note (sql change): The persistedsqlstats table max size check
is now done once an hour instead of every 10 minutes. This reduces the
risk of serialization errors on the statistics tables.
  • Loading branch information
j82w committed Aug 29, 2023
1 parent e6e624c commit 51c8444
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 17 deletions.
12 changes: 12 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,15 @@ var sqlStatsLimitTableSizeEnabled = settings.RegisterBoolSetting(
"to grow past sql.stats.persisted_rows.max",
true,
)

// sqlStatsLimitTableCheckInterval is the cluster setting that controls the
// sql stats system tables to grow past the number of rows set by
// sql.stats.persisted_row.max.
var sqlStatsLimitTableCheckInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"sql.stats.limit_table_size.check_interval",
"controls what interval the check is done on if the statement and "+
"transaction statistics tables have grown past sql.stats.persisted_rows.max",
1*time.Hour,
settings.NonNegativeDuration,
)
43 changes: 34 additions & 9 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ package persistedsqlstats
import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -104,28 +106,51 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
}

func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) {
maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))
// Doing a count check on every flush for every node adds a lot of overhead.
// To reduce the overhead only do the check once an hour by default.
intervalToCheck := sqlStatsLimitTableCheckInterval.Get(&s.cfg.Settings.SV)
if !s.lastSizeCheck.IsZero() && s.lastSizeCheck.Add(intervalToCheck).After(timeutil.Now()) {
log.Infof(ctx, "PersistedSQLStats.StmtsLimitSizeReached skipped with last check at: %s and check interval: %s", s.lastSizeCheck, intervalToCheck)
return false, nil
}

maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.cfg.Settings.SV))

// The statistics table is split into 8 shards. Instead of counting all the
// rows across all the shards the count can be limited to a single shard.
// Then check the size off that one shard. This reduces the risk of causing
// contention or serialization issues. The cleanup is done by the shard, so
// it should prevent the data from being skewed to a single shard.
randomShard := rand.Intn(systemschema.SQLStatsHashShardBucketCount)
readStmt := fmt.Sprintf(`SELECT count(*)
FROM system.statement_statistics
%s
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
`, s.cfg.Knobs.GetAOSTClause())

readStmt := `
SELECT
count(*)
FROM
system.statement_statistics
`
readStmt += s.cfg.Knobs.GetAOSTClause()
row, err := s.cfg.DB.Executor().QueryRowEx(
ctx,
"fetch-stmt-count",
nil,
sessiondata.NodeUserSessionDataOverride,
readStmt,
randomShard,
)

if err != nil {
return false, err
}
actualSize := float64(tree.MustBeDInt(row[0]))
return actualSize > (maxPersistedRows * 1.5), nil
maxPersistedRowsByShard := maxPersistedRows / systemschema.SQLStatsHashShardBucketCount
isSizeLimitReached := actualSize > (maxPersistedRowsByShard * 1.5)
// If the table is over the limit do the check for every flush. This allows
// the flush to start again as soon as the data is within limits instead of
// needing to wait an hour.
if !isSizeLimitReached {
s.lastSizeCheck = timeutil.Now()
}

return isSizeLimitReached, nil
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
Expand Down
61 changes: 53 additions & 8 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,18 @@ func TestSQLStatsPersistedLimitReached(t *testing.T) {
return nil
})

// Set table size check interval to 1 second.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size.check_interval='0.00001ms'")
testutils.SucceedsSoon(t, func() error {
var appliedSetting string
row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.limit_table_size.check_interval")
row.Scan(&appliedSetting)
if appliedSetting != "00:00:00" {
return errors.Newf("waiting for sql.stats.limit_table_size.check_interval to be applied: %s", appliedSetting)
}
return nil
})

sqlConn.Exec(t, "SELECT 1, 2, 3, 4")
sqlConn.Exec(t, "SELECT 1, 2, 3, 4, 5")
sqlConn.Exec(t, "SELECT 1, 2, 3, 4, 6, 7")
Expand Down Expand Up @@ -556,6 +568,11 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
waitForFollowerReadTimestamp(t, sqlConn)
pss := s.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)

// It should be false since nothing has flushed. The table will be empty.
limitReached, err := pss.StmtsLimitSizeReached(ctx)
require.NoError(t, err)
require.False(t, limitReached)

const minNumExpectedStmts = int64(3)
// Maximum number of persisted rows less than minNumExpectedStmts/1.5
const maxNumPersistedRows = 1
Expand All @@ -581,22 +598,50 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
return nil
})

// We need SucceedsSoon here for the follower read timestamp to catch up
// enough for this state to be reached.
testutils.SucceedsSoon(t, func() error {
row := sqlConn.QueryRow(t, "SELECT count_rows() FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()")
var rowCount int
row.Scan(&rowCount)
if rowCount < 3 {
return errors.Newf("waiting for AOST query to return results")
}
return nil
})

// It should still return false because it only checks once an hour by default
// unless the previous run was over the limit.
limitReached, err = pss.StmtsLimitSizeReached(ctx)
require.NoError(t, err)
require.False(t, limitReached)

// Set table size check interval to 1 second.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size.check_interval='1s'")
testutils.SucceedsSoon(t, func() error {
var appliedSetting string
row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.limit_table_size.check_interval")
row.Scan(&appliedSetting)
if appliedSetting != "00:00:01" {
return errors.Newf("waiting for sql.stats.limit_table_size.check_interval to be applied: %s", appliedSetting)
}
return nil
})

// Begin a transaction.
sqlConn.Exec(t, "BEGIN")
// Lock the table. Create a state of contention.
sqlConn.Exec(t, "SELECT * FROM system.statement_statistics FOR UPDATE")

// Ensure that we can read from the table despite it being locked, due to the follower read (AOST).
// Expect that the number of statements in the table exceeds sql.stats.persisted_rows.max * 1.5
// (meaning that the limit will be reached) and no error. We need SucceedsSoon here for the follower
// read timestamp to catch up enough for this state to be reached.
testutils.SucceedsSoon(t, func() error {
// (meaning that the limit will be reached) and no error. Loop to make sure that
// checking it multiple times still returns the correct value.
for i := 0; i < 3; i++ {
limitReached, err := pss.StmtsLimitSizeReached(ctx)
if limitReached != true {
return errors.New("waiting for limit reached to be true")
}
return err
})
require.NoError(t, err)
require.True(t, limitReached)
}

// Close the transaction.
sqlConn.Exec(t, "COMMIT")
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type PersistedSQLStats struct {
setDraining sync.Once
// tasksDoneWG is used to wait for all background tasks to finish.
tasksDoneWG sync.WaitGroup

// The last time the size was checked before doing a flush.
lastSizeCheck time.Time
}

var _ sqlstats.Provider = &PersistedSQLStats{}
Expand Down

0 comments on commit 51c8444

Please sign in to comment.