From 630d77b939df2d138294c11d1d2a50de6012a7ef Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Fri, 3 May 2024 18:22:57 -0400 Subject: [PATCH] sqlstats: reuse the temporary statement stats container This commit cleans up the confusing swapping and usages of 2 fields on the StatsCollector, `ApplicationStats` and `flushTarget`. Originally the field `flushTarget` was only used for explicit transactions. In order to provide each statement with the correct txn fingerprint id, statements for explicit txns were stored in a temp stats container and flushed to the app container at the end of transaction execution. The assumption for implicit transactions was that they only had 1 statement and so such statements could be written directly to the parent `ApplicationStats` container rather than waiting for the entire transaction to finish. Now that we create temporary containers for both explicit and implicit txns we should reuse the temporary container between transactions instead of discarding and allocating a new one for each. Summary: - The `ApplicationStats` field on sslocal.StatsCollector is renamed to `currentTransactionStatementStats` - `flushTarget` is always defined for a stats collector. It represents the current application's sql stats - Instead of allocating a new container for each new txn, we'll clear and reuse `currentTransactionStatementStats` between transactions - StatsCollector no longer implements the `sqlstats.ApplicationStats` interface. There isn't a need for StatsCollector to be used as an interface and doing so makes working with the various containers within StatsCollector more difficult. An exception is made for stats collectors belonging to an internal executor for an outer transaction. Such executors do not start or end the transaction, and so no temporary containers will be used in this case. 
Fixes: #94650 Release note: None --- pkg/sql/conn_executor.go | 4 + pkg/sql/sqlstats/sslocal/BUILD.bazel | 1 + pkg/sql/sqlstats/sslocal/sql_stats_test.go | 2 + .../sslocal/sslocal_stats_collector.go | 109 ++++++++++++------ .../sqlstats/ssmemstorage/ss_mem_storage.go | 45 +------- pkg/sql/sqlstats/ssprovider.go | 11 +- 6 files changed, 88 insertions(+), 84 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 6b89c990e94a..3692eff28035 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1145,6 +1145,7 @@ func (s *Server) newConnExecutor( ex.server.insights.Writer(ex.sessionData().Internal), ex.phaseTimes, s.sqlStats.GetCounters(), + ex.extraTxnState.fromOuterTxn, s.cfg.SQLStatsTestingKnobs, ) ex.dataMutatorIterator.onApplicationNameChange = func(newName string) { @@ -1259,6 +1260,9 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) { log.Warningf(ctx, "error closing cursors: %v", err) } + // Free any memory used by the stats collector. + ex.statsCollector.Free(ctx) + var payloadErr error if closeType == normalClose { // We'll cleanup the SQL txn by creating a non-retriable (commit:true) event. 
diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index 4fe6a6e6f51f..168c196ebccc 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/appstatspb", + "//pkg/sql/execstats", "//pkg/sql/pgwire/pgerror", "//pkg/sql/sessionphase", "//pkg/sql/sqlstats", diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index b1788da68de9..b4015030e915 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -464,6 +464,7 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) { insightsProvider.Writer(false /* internal */), sessionphase.NewTimes(), sqlStats.GetCounters(), + false, nil, /* knobs */ ) @@ -591,6 +592,7 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) { insightsProvider.Writer(false /* internal */), sessionphase.NewTimes(), sqlStats.GetCounters(), + false, nil, /* knobs */ ) diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index 5a6301ab38ef..8e8cfd080bd7 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" @@ -27,7 +28,13 @@ import ( // StatsCollector is used to collect statistics for transactions and // statements for the entire lifetime of a session. 
type StatsCollector struct { - sqlstats.ApplicationStats + + // currentTransactionStatementStats contains the current transaction's statement + // statistics. They will be flushed to flushTarget when the transaction is done + // so that we can include the transaction fingerprint ID as part of the + // statement's key. This container is local per stats collector and + // is cleared for reuse after every transaction. + currentTransactionStatementStats sqlstats.ApplicationStats // stmtFingerprintID is the fingerprint ID of the current statement we are // recording. Note that we don't observe sql stats for all statements (e.g. COMMIT). @@ -53,16 +60,20 @@ type StatsCollector struct { // statement insights. sendInsights bool + // flushTarget is the sql stats container for the current application. + // This is the target where the statement stats are flushed to upon + // transaction completion. Note that these are the global stats for the + // application. flushTarget sqlstats.ApplicationStats + // uniqueServerCounts is a pointer to the statement and transaction + // fingerprint counters tracked per server. uniqueServerCounts *ssmemstorage.SQLStatsAtomicCounters st *cluster.Settings knobs *sqlstats.TestingKnobs } -var _ sqlstats.ApplicationStats = &StatsCollector{} - // NewStatsCollector returns an instance of StatsCollector. func NewStatsCollector( st *cluster.Settings, @@ -70,15 +81,28 @@ func NewStatsCollector( insights insights.Writer, phaseTime *sessionphase.Times, uniqueServerCounts *ssmemstorage.SQLStatsAtomicCounters, + fromOuterTxn bool, knobs *sqlstats.TestingKnobs, ) *StatsCollector { + // See #124935 for more details. If fromOuterTxn is true, the + // executor owning the stats collector is not responsible for + // starting or committing the transaction. Since the statements + // are merged into flushTarget on EndTransaction, in this case the + // container would never be merged into the flushTarget. 
Instead + // we'll write directly to the flushTarget when we're collecting + // stats for a conn exec belonging to an outer transaction. + currentTransactionStatementStats := appStats + if !fromOuterTxn { + currentTransactionStatementStats = appStats.NewApplicationStatsWithInheritedOptions() + } return &StatsCollector{ - ApplicationStats: appStats, - insightsWriter: insights, - phaseTimes: phaseTime.Clone(), - uniqueServerCounts: uniqueServerCounts, - st: st, - knobs: knobs, + flushTarget: appStats, + currentTransactionStatementStats: currentTransactionStatementStats, + insightsWriter: insights, + phaseTimes: phaseTime.Clone(), + uniqueServerCounts: uniqueServerCounts, + st: st, + knobs: knobs, } } @@ -103,7 +127,7 @@ func (s *StatsCollector) PreviousPhaseTimes() *sessionphase.Times { return s.previousPhaseTimes } -// Reset resets the StatsCollector with a new ApplicationStats and a new copy +// Reset resets the StatsCollector with a new flushTarget and a new copy // of the sessionphase.Times. func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *sessionphase.Times) { previousPhaseTime := s.phaseTimes @@ -114,12 +138,20 @@ func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *se s.stmtFingerprintID = 0 } +// Free frees any local memory used by the stats collector. +func (s *StatsCollector) Free(ctx context.Context) { + // For stats collectors for executors with outer transactions, + // the currentTransactionStatementStats is the flush target. + // We should make sure we're never freeing the flush target, + // since that container exists beyond the stats collector. + if s.currentTransactionStatementStats != s.flushTarget { + s.currentTransactionStatementStats.Free(ctx) + } +} + // StartTransaction sets up the StatsCollector for a new transaction. -// The current application stats are reset for the new transaction. 
func (s *StatsCollector) StartTransaction() { s.sendInsights = s.shouldObserveInsights() - s.flushTarget = s.ApplicationStats - s.ApplicationStats = s.flushTarget.NewApplicationStatsWithInheritedOptions() } // EndTransaction informs the StatsCollector that the current txn has @@ -140,12 +172,7 @@ func (s *StatsCollector) EndTransaction( var discardedStats uint64 discardedStats += s.flushTarget.MergeApplicationStatementStats( - ctx, s.ApplicationStats, transactionFingerprintID, - ) - - discardedStats += s.flushTarget.MergeApplicationTransactionStats( - ctx, - s.ApplicationStats, + ctx, s.currentTransactionStatementStats, transactionFingerprintID, ) // Avoid taking locks if no stats are discarded. @@ -153,9 +180,7 @@ func (s *StatsCollector) EndTransaction( s.flushTarget.MaybeLogDiscardMessage(ctx) } - s.ApplicationStats.Free(ctx) - s.ApplicationStats = s.flushTarget - s.flushTarget = nil + s.currentTransactionStatementStats.Clear(ctx) } // ShouldSample returns two booleans, the first one indicates whether we @@ -165,24 +190,15 @@ func (s *StatsCollector) EndTransaction( func (s *StatsCollector) ShouldSample( fingerprint string, implicitTxn bool, database string, ) (previouslySampled bool, savePlanForStats bool) { - sampledInFlushTarget := false - savePlanForStatsInFlushTarget := true - - if s.flushTarget != nil { - sampledInFlushTarget, savePlanForStatsInFlushTarget = s.flushTarget.ShouldSample(fingerprint, implicitTxn, database) - } - sampledInAppStats, savePlanForStatsInAppStats := s.ApplicationStats.ShouldSample(fingerprint, implicitTxn, database) - previouslySampled = sampledInFlushTarget || sampledInAppStats - savePlanForStats = savePlanForStatsInFlushTarget && savePlanForStatsInAppStats - return previouslySampled, savePlanForStats + return s.flushTarget.ShouldSample(fingerprint, implicitTxn, database) } // UpgradeImplicitTxn informs the StatsCollector that the current txn has been // upgraded to an explicit transaction, thus all previously recorded statements 
// should be updated accordingly. func (s *StatsCollector) UpgradeImplicitTxn(ctx context.Context) error { - err := s.ApplicationStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{}, + err := s.currentTransactionStatementStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{}, func(_ context.Context, statistics *appstatspb.CollectedStatementStatistics) error { statistics.Key.ImplicitTxn = false return nil @@ -324,3 +340,30 @@ func (s *StatsCollector) ObserveTransaction( func (s *StatsCollector) StatementsContainerFull() bool { return s.uniqueServerCounts.GetStatementCount() >= s.uniqueServerCounts.UniqueStmtFingerprintLimit.Get(&s.st.SV) } + +// RecordStatement records the statistics of a statement. +func (s *StatsCollector) RecordStatement( + ctx context.Context, key appstatspb.StatementStatisticsKey, value sqlstats.RecordedStmtStats, +) (appstatspb.StmtFingerprintID, error) { + return s.currentTransactionStatementStats.RecordStatement(ctx, key, value) +} + +// RecordTransaction records the statistics of a transaction. +// Transaction stats are always recorded directly on the flushTarget. 
+func (s *StatsCollector) RecordTransaction( + ctx context.Context, key appstatspb.TransactionFingerprintID, value sqlstats.RecordedTxnStats, +) error { + return s.flushTarget.RecordTransaction(ctx, key, value) +} + +func (s *StatsCollector) RecordStatementExecStats( + key appstatspb.StatementStatisticsKey, stats execstats.QueryLevelStats, +) error { + return s.currentTransactionStatementStats.RecordStatementExecStats(key, stats) +} + +func (s *StatsCollector) IterateStatementStats( + ctx context.Context, opts sqlstats.IteratorOptions, f sqlstats.StatementVisitor, +) error { + return s.flushTarget.IterateStatementStats(ctx, opts, f) +} diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 689446aa699c..c655f57b874e 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -361,12 +361,6 @@ func (t *txnStats) sizeUnsafeLocked() int64 { return txnStatsShallowSize + stmtFingerprintIDsSize + dataSize } -func (t *txnStats) mergeStats(stats *appstatspb.TransactionStatistics) { - t.mu.Lock() - defer t.mu.Unlock() - t.mu.data.Add(stats) -} - // stmtStats holds per-statement statistics. type stmtStats struct { // ID is the statementFingerprintID constructed using the stmtKey fields. @@ -671,10 +665,8 @@ func (s *Container) Clear(ctx context.Context) { } func (s *Container) clearLocked(ctx context.Context) { - // freeLocked needs to be called before we clear the containers as - // it uses the size of each container to decrements counters that - // track the node-wide unique in-memory fingerprint counts for stmts - // and txns. + // We must call freeLocked before clearing the containers as freeLocked + // reads the size of each container to reset the counters. 
s.freeLocked(ctx) // Clear the map, to release the memory; make the new map somewhat already @@ -755,39 +747,6 @@ func (s *Container) MergeApplicationStatementStats( return discardedStats } -// MergeApplicationTransactionStats implements the sqlstats.ApplicationStats interface. -func (s *Container) MergeApplicationTransactionStats( - ctx context.Context, other sqlstats.ApplicationStats, -) (discardedStats uint64) { - if err := other.IterateTransactionStats( - ctx, - sqlstats.IteratorOptions{}, - func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error { - txnStats, _, throttled := - s.getStatsForTxnWithKey( - statistics.TransactionFingerprintID, - statistics.StatementFingerprintIDs, - true, /* createIfNonexistent */ - ) - - if throttled { - discardedStats++ - return nil - } - - txnStats.mergeStats(&statistics.Stats) - return nil - }); err != nil { - // Calling Iterate.*Stats() function with a visitor function that does not - // return error should not cause any error. - panic( - errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error returned when iterating through application stats"), - ) - } - - return discardedStats -} - // Add combines one Container into another. Add manages locks on a, so taking // a lock on a will cause a deadlock. func (s *Container) Add(ctx context.Context, other *Container) (err error) { diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index 1668fd1fba4b..f04c59c12a7a 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -84,14 +84,6 @@ type ApplicationStats interface { transactionFingerprintID appstatspb.TransactionFingerprintID, ) uint64 - // MergeApplicationTransactionStats merges the other application's transaction - // statistics into the current ApplicationStats. It returns how many number - // of statistics were being discarded due to memory constraint. 
- MergeApplicationTransactionStats( - ctx context.Context, - other ApplicationStats, - ) uint64 - // MaybeLogDiscardMessage is used to possibly log a message when statistics // are being discarded because of memory limits. MaybeLogDiscardMessage(ctx context.Context) @@ -103,6 +95,9 @@ type ApplicationStats interface { // Free frees the current ApplicationStats and zeros out the memory counts // and fingerprint counts. Free(context.Context) + + // Clear is like Free but also prepares the container for reuse. + Clear(context.Context) } // IteratorOptions provides the ability to the caller to change how it iterates