Skip to content

Commit

Permalink
sqlstats: reuse the temporary statement stats container
Browse files Browse the repository at this point in the history
This commit cleans up the confusing swapping and usages of 2 fields on
the StatsCollector, `ApplicationStats` and `flushTarget`.

Originally the field `flushTarget`was only used for explicit transactions.
In order to provide each statement with the correct txn fingerprint id,
statements for explicit txns were stored in a temp stats container and
flushed to the app container at the end of transaction execution.
The assumption for implicit transactions was that they only had 1
statement and so such statements could be  written directly  to the parent
`ApplicationStats` container rather than waiting for the entire transaction
to finish.

Now that we create temporary containers for both explicit and implicit
txns we should reuse the temporary container between transactions instead
of discarding and allocating a new one for each.

Summary:
- `ApplicationStats` field is on sslocal.StatsCollector is renamed
to `currentTransactionStatementStats`
- `FlushTarget` is always defined for a stats collector. It represents
the current application's sql stats
- Instead of allocating a new container for each new txn, we'll clear and
reuse `currentTxnStatementStats` between transactions
- StatsCollector no longer  implementats the `sqlstats.ApplicationStats`
interface. There isn't a need for StatsCollector to be used as an
interface and doing so  makes working with the various containers
within StatsCollector more difficult.

An exception is made for stats collectors belonging to an internal executor
for an outer transaction. Such executors do not start or end the transaction,
and so no temporary containers will be used in this case.

Fixes: #94650
Release note: None
  • Loading branch information
xinhaoz committed Jun 3, 2024
1 parent 04ecf34 commit 630d77b
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 84 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ func (s *Server) newConnExecutor(
ex.server.insights.Writer(ex.sessionData().Internal),
ex.phaseTimes,
s.sqlStats.GetCounters(),
ex.extraTxnState.fromOuterTxn,
s.cfg.SQLStatsTestingKnobs,
)
ex.dataMutatorIterator.onApplicationNameChange = func(newName string) {
Expand Down Expand Up @@ -1259,6 +1260,9 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
log.Warningf(ctx, "error closing cursors: %v", err)
}

// Free any memory used by the stats collector.
ex.statsCollector.Free(ctx)

var payloadErr error
if closeType == normalClose {
// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/sslocal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/appstatspb",
"//pkg/sql/execstats",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sessionphase",
"//pkg/sql/sqlstats",
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlstats/sslocal/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) {
insightsProvider.Writer(false /* internal */),
sessionphase.NewTimes(),
sqlStats.GetCounters(),
false,
nil, /* knobs */
)

Expand Down Expand Up @@ -591,6 +592,7 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) {
insightsProvider.Writer(false /* internal */),
sessionphase.NewTimes(),
sqlStats.GetCounters(),
false,
nil, /* knobs */
)

Expand Down
109 changes: 76 additions & 33 deletions pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
Expand All @@ -27,7 +28,13 @@ import (
// StatsCollector is used to collect statistics for transactions and
// statements for the entire lifetime of a session.
type StatsCollector struct {
sqlstats.ApplicationStats

// currentTransactionStatementStats contains the current transaction's statement
// statistics. They will be flushed to flushTarget when the transaction is done
// so that we can include the transaction fingerprint ID as part of the
// statement's key. This container is local per stats collector and
// is cleared for reuse after every transaction.
currentTransactionStatementStats sqlstats.ApplicationStats

// stmtFingerprintID is the fingerprint ID of the current statement we are
// recording. Note that we don't observe sql stats for all statements (e.g. COMMIT).
Expand All @@ -53,32 +60,49 @@ type StatsCollector struct {
// statement insights.
sendInsights bool

// flushTarget is the sql stats container for the current application.
// This is the target where the statement stats are flushed to upon
// transaction completion. Note that these are the global stats for the
// application.
flushTarget sqlstats.ApplicationStats

// uniqueServerCounts is a pointer to the statement and transaction
// fingerprint counters tracked per server.
uniqueServerCounts *ssmemstorage.SQLStatsAtomicCounters

st *cluster.Settings
knobs *sqlstats.TestingKnobs
}

var _ sqlstats.ApplicationStats = &StatsCollector{}

// NewStatsCollector returns an instance of StatsCollector.
func NewStatsCollector(
st *cluster.Settings,
appStats sqlstats.ApplicationStats,
insights insights.Writer,
phaseTime *sessionphase.Times,
uniqueServerCounts *ssmemstorage.SQLStatsAtomicCounters,
fromOuterTxn bool,
knobs *sqlstats.TestingKnobs,
) *StatsCollector {
// See #124935 for more details. If fromOuterTxn is true, the
// executor owning the stats collector is not responsible for
// starting or committing the transaction. Since the statements
// are merged into flushTarget on EndTransaction, in this case the
// container would never be merged into the flushTarget. Instead
// we'll write directly to the flushTarget when we're collecting
// stats for a conn exec belonging to an outer transaction.
currentTransactionStatementStats := appStats
if !fromOuterTxn {
currentTransactionStatementStats = appStats.NewApplicationStatsWithInheritedOptions()
}
return &StatsCollector{
ApplicationStats: appStats,
insightsWriter: insights,
phaseTimes: phaseTime.Clone(),
uniqueServerCounts: uniqueServerCounts,
st: st,
knobs: knobs,
flushTarget: appStats,
currentTransactionStatementStats: currentTransactionStatementStats,
insightsWriter: insights,
phaseTimes: phaseTime.Clone(),
uniqueServerCounts: uniqueServerCounts,
st: st,
knobs: knobs,
}
}

Expand All @@ -103,7 +127,7 @@ func (s *StatsCollector) PreviousPhaseTimes() *sessionphase.Times {
return s.previousPhaseTimes
}

// Reset resets the StatsCollector with a new ApplicationStats and a new copy
// Reset resets the StatsCollector with a new flushTarget and a new copy
// of the sessionphase.Times.
func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *sessionphase.Times) {
previousPhaseTime := s.phaseTimes
Expand All @@ -114,12 +138,20 @@ func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *se
s.stmtFingerprintID = 0
}

// Free frees any local memory used by the stats collector.
func (s *StatsCollector) Free(ctx context.Context) {
// For stats collectors for executors with outer transactions,
// the currentTransactionStatementStats is the flush target.
// We should make sure we're never freeing the flush target,
// since that container exists beyond the stats collector.
if s.currentTransactionStatementStats != s.flushTarget {
s.currentTransactionStatementStats.Free(ctx)
}
}

// StartTransaction sets up the StatsCollector for a new transaction.
// The current application stats are reset for the new transaction.
func (s *StatsCollector) StartTransaction() {
s.sendInsights = s.shouldObserveInsights()
s.flushTarget = s.ApplicationStats
s.ApplicationStats = s.flushTarget.NewApplicationStatsWithInheritedOptions()
}

// EndTransaction informs the StatsCollector that the current txn has
Expand All @@ -140,22 +172,15 @@ func (s *StatsCollector) EndTransaction(

var discardedStats uint64
discardedStats += s.flushTarget.MergeApplicationStatementStats(
ctx, s.ApplicationStats, transactionFingerprintID,
)

discardedStats += s.flushTarget.MergeApplicationTransactionStats(
ctx,
s.ApplicationStats,
ctx, s.currentTransactionStatementStats, transactionFingerprintID,
)

// Avoid taking locks if no stats are discarded.
if discardedStats > 0 {
s.flushTarget.MaybeLogDiscardMessage(ctx)
}

s.ApplicationStats.Free(ctx)
s.ApplicationStats = s.flushTarget
s.flushTarget = nil
s.currentTransactionStatementStats.Clear(ctx)
}

// ShouldSample returns two booleans, the first one indicates whether we
Expand All @@ -165,24 +190,15 @@ func (s *StatsCollector) EndTransaction(
func (s *StatsCollector) ShouldSample(
fingerprint string, implicitTxn bool, database string,
) (previouslySampled bool, savePlanForStats bool) {
sampledInFlushTarget := false
savePlanForStatsInFlushTarget := true

if s.flushTarget != nil {
sampledInFlushTarget, savePlanForStatsInFlushTarget = s.flushTarget.ShouldSample(fingerprint, implicitTxn, database)
}

sampledInAppStats, savePlanForStatsInAppStats := s.ApplicationStats.ShouldSample(fingerprint, implicitTxn, database)
previouslySampled = sampledInFlushTarget || sampledInAppStats
savePlanForStats = savePlanForStatsInFlushTarget && savePlanForStatsInAppStats
return previouslySampled, savePlanForStats
return s.flushTarget.ShouldSample(fingerprint, implicitTxn, database)
}

// UpgradeImplicitTxn informs the StatsCollector that the current txn has been
// upgraded to an explicit transaction, thus all previously recorded statements
// should be updated accordingly.
func (s *StatsCollector) UpgradeImplicitTxn(ctx context.Context) error {
err := s.ApplicationStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{},
err := s.currentTransactionStatementStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{},
func(_ context.Context, statistics *appstatspb.CollectedStatementStatistics) error {
statistics.Key.ImplicitTxn = false
return nil
Expand Down Expand Up @@ -324,3 +340,30 @@ func (s *StatsCollector) ObserveTransaction(
func (s *StatsCollector) StatementsContainerFull() bool {
return s.uniqueServerCounts.GetStatementCount() >= s.uniqueServerCounts.UniqueStmtFingerprintLimit.Get(&s.st.SV)
}

// RecordStatement records the statistics of a statement.
func (s *StatsCollector) RecordStatement(
ctx context.Context, key appstatspb.StatementStatisticsKey, value sqlstats.RecordedStmtStats,
) (appstatspb.StmtFingerprintID, error) {
return s.currentTransactionStatementStats.RecordStatement(ctx, key, value)
}

// RecordTransaction records the statistics of a transaction.
// Transaction stats are always recorded directly on the flushTarget.
func (s *StatsCollector) RecordTransaction(
ctx context.Context, key appstatspb.TransactionFingerprintID, value sqlstats.RecordedTxnStats,
) error {
return s.flushTarget.RecordTransaction(ctx, key, value)
}

func (s *StatsCollector) RecordStatementExecStats(
key appstatspb.StatementStatisticsKey, stats execstats.QueryLevelStats,
) error {
return s.currentTransactionStatementStats.RecordStatementExecStats(key, stats)
}

func (s *StatsCollector) IterateStatementStats(
ctx context.Context, opts sqlstats.IteratorOptions, f sqlstats.StatementVisitor,
) error {
return s.flushTarget.IterateStatementStats(ctx, opts, f)
}
45 changes: 2 additions & 43 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,6 @@ func (t *txnStats) sizeUnsafeLocked() int64 {
return txnStatsShallowSize + stmtFingerprintIDsSize + dataSize
}

func (t *txnStats) mergeStats(stats *appstatspb.TransactionStatistics) {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.data.Add(stats)
}

// stmtStats holds per-statement statistics.
type stmtStats struct {
// ID is the statementFingerprintID constructed using the stmtKey fields.
Expand Down Expand Up @@ -671,10 +665,8 @@ func (s *Container) Clear(ctx context.Context) {
}

func (s *Container) clearLocked(ctx context.Context) {
// freeLocked needs to be called before we clear the containers as
// it uses the size of each container to decrements counters that
// track the node-wide unique in-memory fingerprint counts for stmts
// and txns.
// We must call freeLocked before clearing the containers as freeLocked
// reads the size of each container to reset the counters.
s.freeLocked(ctx)

// Clear the map, to release the memory; make the new map somewhat already
Expand Down Expand Up @@ -755,39 +747,6 @@ func (s *Container) MergeApplicationStatementStats(
return discardedStats
}

// MergeApplicationTransactionStats implements the sqlstats.ApplicationStats interface.
func (s *Container) MergeApplicationTransactionStats(
ctx context.Context, other sqlstats.ApplicationStats,
) (discardedStats uint64) {
if err := other.IterateTransactionStats(
ctx,
sqlstats.IteratorOptions{},
func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error {
txnStats, _, throttled :=
s.getStatsForTxnWithKey(
statistics.TransactionFingerprintID,
statistics.StatementFingerprintIDs,
true, /* createIfNonexistent */
)

if throttled {
discardedStats++
return nil
}

txnStats.mergeStats(&statistics.Stats)
return nil
}); err != nil {
// Calling Iterate.*Stats() function with a visitor function that does not
// return error should not cause any error.
panic(
errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error returned when iterating through application stats"),
)
}

return discardedStats
}

// Add combines one Container into another. Add manages locks on a, so taking
// a lock on a will cause a deadlock.
func (s *Container) Add(ctx context.Context, other *Container) (err error) {
Expand Down
11 changes: 3 additions & 8 deletions pkg/sql/sqlstats/ssprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ type ApplicationStats interface {
transactionFingerprintID appstatspb.TransactionFingerprintID,
) uint64

// MergeApplicationTransactionStats merges the other application's transaction
// statistics into the current ApplicationStats. It returns how many number
// of statistics were being discarded due to memory constraint.
MergeApplicationTransactionStats(
ctx context.Context,
other ApplicationStats,
) uint64

// MaybeLogDiscardMessage is used to possibly log a message when statistics
// are being discarded because of memory limits.
MaybeLogDiscardMessage(ctx context.Context)
Expand All @@ -103,6 +95,9 @@ type ApplicationStats interface {
// Free frees the current ApplicationStats and zeros out the memory counts
// and fingerprint counts.
Free(context.Context)

// Clear is like Free but also prepares the container for reuse.
Clear(context.Context)
}

// IteratorOptions provides the ability to the caller to change how it iterates
Expand Down

0 comments on commit 630d77b

Please sign in to comment.