Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contention: refactor to collect in sql stats #94750

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func distBackup(
noTxn, /* txn - the flow does not read or write the database */
nil, /* clockUpdater */
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func distRestore(
noTxn, /* txn - the flow does not read or write the database */
nil, /* clockUpdater */
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (e *Evaluator) executePlan(
nil,
nil, /* clockUpdater */
&sql.SessionTracing{},
e.execCfg.ContentionRegistry,
)

// Start execution.
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ func startDistChangefeed(
noTxn,
nil, /* clockUpdater */
evalCtx.Tracing,
execCtx.ExecCfg().ContentionRegistry,
)
defer recv.Release()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func distStreamIngest(
noTxn,
nil, /* clockUpdater */
evalCtx.Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func runPlanInsidePlan(
params.p.Txn(),
params.ExecCfg().Clock,
params.p.extendedEvalCtx.Tracing,
params.p.ExecCfg().ContentionRegistry,
)
defer recv.Release()

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ func (sc *SchemaChanger) distIndexBackfill(
nil, /* txn - the flow does not run wholly in a txn */
sc.clock,
evalCtx.Tracing,
sc.execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down Expand Up @@ -1330,7 +1329,6 @@ func (sc *SchemaChanger) distColumnBackfill(
nil, /* txn - the flow does not run wholly in a txn */
sc.clock,
evalCtx.Tracing,
sc.execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,6 @@ func (ex *connExecutor) execWithDistSQLEngine(
planner.txn,
ex.server.cfg.Clock,
&ex.sessionTracing,
ex.server.cfg.ContentionRegistry,
)
recv.progressAtomic = progressAtomic
if ex.server.cfg.TestingKnobs.DistSQLReceiverPushCallbackFactory != nil {
Expand Down Expand Up @@ -2445,6 +2444,7 @@ func (ex *connExecutor) recordTransactionFinish(

if contentionDuration := ex.extraTxnState.accumulatedStats.ContentionTime.Nanoseconds(); contentionDuration > 0 {
ex.metrics.EngineMetrics.SQLContendedTxns.Inc(1)
ex.planner.DistSQLPlanner().distSQLSrv.Metrics.ContendedQueriesCount.Inc(1)
}

ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/contentionpb/contention.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package cockroach.sql.contentionpb;
option go_package = "contentionpb";

import "roachpb/api.proto";

import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
Expand Down Expand Up @@ -167,8 +166,8 @@ message ExtendedContentionEvent {
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID"
];

google.protobuf.Timestamp collection_ts = 5 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true
];
google.protobuf.Timestamp collection_ts = 5 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true
];
}
1 change: 0 additions & 1 deletion pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) {
nil, /* txn */
nil, /* clockUpdater */
&SessionTracing{},
nil, /* contentionRegistry */
)

replicas := []roachpb.ReplicaDescriptor{{ReplicaID: 1}, {ReplicaID: 2}, {ReplicaID: 3}}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsql_plan_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ func TestCdcExpressionExecution(t *testing.T) {
nil,
nil, /* clockUpdater */
planner.extendedEvalCtx.Tracing,
planner.execCfg.ContentionRegistry,
)
defer r.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats(
txn,
evalCtx.ExecCfg.Clock,
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
70 changes: 16 additions & 54 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand All @@ -49,15 +47,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pbtypes "github.com/gogo/protobuf/types"
)

var settingDistSQLNumRunners = settings.RegisterIntSetting(
Expand Down Expand Up @@ -761,7 +757,6 @@ func (dsp *DistSQLPlanner) Run(
defer dsp.distSQLSrv.ServerConfig.Metrics.QueryStop()

recv.outputTypes = plan.GetResultTypes()
recv.contendedQueryMetric = dsp.distSQLSrv.Metrics.ContendedQueriesCount
if multitenant.TenantRUEstimateEnabled.Get(&dsp.st.SV) &&
dsp.distSQLSrv.TenantCostController != nil && planCtx.planner != nil {
if instrumentation := planCtx.planner.curPlan.instrumentation; instrumentation != nil {
Expand Down Expand Up @@ -909,12 +904,6 @@ type DistSQLReceiver struct {
expectedRowsRead int64
progressAtomic *uint64

// contendedQueryMetric is a Counter that is incremented at most once if the
// query produces at least one contention event.
contendedQueryMetric *metric.Counter
// contentionRegistry is a Registry that contention events are added to.
contentionRegistry *contention.Registry

testingKnobs struct {
// pushCallback, if set, will be called every time DistSQLReceiver.Push
// is called, with the same arguments.
Expand Down Expand Up @@ -1129,7 +1118,6 @@ func MakeDistSQLReceiver(
txn *kv.Txn,
clockUpdater clockUpdater,
tracing *SessionTracing,
contentionRegistry *contention.Registry,
) *DistSQLReceiver {
consumeCtx, cleanup := tracing.TraceExecConsume(ctx)
r := receiverSyncPool.Get().(*DistSQLReceiver)
Expand All @@ -1142,15 +1130,14 @@ func MakeDistSQLReceiver(
}
}
*r = DistSQLReceiver{
ctx: consumeCtx,
cleanup: cleanup,
rangeCache: rangeCache,
txn: txn,
clockUpdater: clockUpdater,
stats: &topLevelQueryStats{},
stmtType: stmtType,
tracing: tracing,
contentionRegistry: contentionRegistry,
ctx: consumeCtx,
cleanup: cleanup,
rangeCache: rangeCache,
txn: txn,
clockUpdater: clockUpdater,
stats: &topLevelQueryStats{},
stmtType: stmtType,
tracing: tracing,
}
r.resultWriterMu.row = resultWriter
r.resultWriterMu.batch = batchWriter
Expand All @@ -1169,15 +1156,14 @@ func (r *DistSQLReceiver) Release() {
func (r *DistSQLReceiver) clone() *DistSQLReceiver {
ret := receiverSyncPool.Get().(*DistSQLReceiver)
*ret = DistSQLReceiver{
ctx: r.ctx,
cleanup: func() {},
rangeCache: r.rangeCache,
txn: r.txn,
clockUpdater: r.clockUpdater,
stats: r.stats,
stmtType: tree.Rows,
tracing: r.tracing,
contentionRegistry: r.contentionRegistry,
ctx: r.ctx,
cleanup: func() {},
rangeCache: r.rangeCache,
txn: r.txn,
clockUpdater: r.clockUpdater,
stats: r.stats,
stmtType: tree.Rows,
tracing: r.tracing,
}
return ret
}
Expand Down Expand Up @@ -1291,30 +1277,6 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
if span := tracing.SpanFromContext(r.ctx); span != nil {
span.ImportRemoteRecording(meta.TraceData)
}
var ev roachpb.ContentionEvent
for i := range meta.TraceData {
meta.TraceData[i].Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &ev) {
return
}
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
return
}
if r.contendedQueryMetric != nil {
// Increment the contended query metric at most once
// if the query sees at least one contention event.
r.contendedQueryMetric.Inc(1)
r.contendedQueryMetric = nil
}
contentionEvent := contentionpb.ExtendedContentionEvent{
BlockingEvent: ev,
}
if r.txn != nil {
contentionEvent.WaitingTxnID = r.txn.ID()
}
r.contentionRegistry.AddContentionEvent(contentionEvent)
})
}
}
if meta.Metrics != nil {
r.stats.bytesRead += meta.Metrics.BytesRead
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
execCfg.ContentionRegistry,
)

// We need to re-plan every time, since the plan is closed automatically
Expand Down Expand Up @@ -223,7 +222,6 @@ func TestDistSQLReceiverErrorRanking(t *testing.T) {
txn,
nil, /* clockUpdater */
&SessionTracing{},
nil, /* contentionRegistry */
)

retryErr := roachpb.NewErrorWithTxn(
Expand Down Expand Up @@ -367,7 +365,6 @@ func TestDistSQLReceiverDrainsOnError(t *testing.T) {
nil, /* txn */
nil, /* clockUpdater */
&SessionTracing{},
nil, /* contentionRegistry */
)
status := recv.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: errors.New("some error")})
require.Equal(t, execinfra.DrainRequested, status)
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -185,7 +186,7 @@ func (ex *connExecutor) recordStatementSummary(
}
recordedStmtStats := sqlstats.RecordedStmtStats{
SessionID: ex.sessionID,
StatementID: planner.stmt.QueryID,
StatementID: stmt.QueryID,
AutoRetryCount: automaticRetryCount,
AutoRetryReason: ex.state.mu.autoRetryReason,
RowsAffected: rowsAffected,
Expand Down Expand Up @@ -226,6 +227,15 @@ func (ex *connExecutor) recordStatementSummary(
// Record statement execution statistics if span is recorded and no error was
// encountered while collecting query-level statistics.
if queryLevelStatsOk {
for _, ev := range queryLevelStats.ContentionEvents {
contentionEvent := contentionpb.ExtendedContentionEvent{
BlockingEvent: ev,
WaitingTxnID: planner.txn.ID(),
}

ex.server.cfg.ContentionRegistry.AddContentionEvent(contentionEvent)
}

err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, *queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/importer/import_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func distImport(
nil, /* txn - the flow does not read or write the database */
nil, /* clockUpdater */
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (ib *IndexBackfillPlanner) plan(
nil, /* txn - the flow does not run wholly in a txn */
ib.execCfg.Clock,
evalCtx.Tracing,
ib.execCfg.ContentionRegistry,
)
defer recv.Release()
evalCtxCopy := evalCtx
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (im *IndexBackfillerMergePlanner) plan(
nil, /* txn - the flow does not run wholly in a txn */
im.execCfg.Clock,
evalCtx.Tracing,
im.execCfg.ContentionRegistry,
)
defer recv.Release()
evalCtxCopy := evalCtx
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
// because it sets "enabled: false" and thus none of the
// other fields are used.
&SessionTracing{},
sc.execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func (dsp *DistSQLPlanner) Exec(
p.txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down Expand Up @@ -174,7 +173,6 @@ func (dsp *DistSQLPlanner) ExecLocalAll(
p.txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
nil, /* txn */
nil, /* clockUpdater */
evalCtx.Tracing,
execCfg.ContentionRegistry,
)
defer distSQLReceiver.Release()

Expand Down