diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index c37a9c940511..68e7762422cf 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -2817,6 +2817,11 @@ contains common SQL event/execution details. | `KVRowsRead` | The number of rows read at the KV layer for this query. | no | | `NetworkMessages` | The number of network messages sent by nodes for this query. | no | | `IndexRecommendations` | Generated index recommendations for this query. | no | +| `ScanCount` | The number of scans in the query plan. | no | +| `ScanWithStatsCount` | The number of scans using statistics (including forecasted statistics) in the query plan. | no | +| `ScanWithStatsForecastCount` | The number of scans using forecasted statistics in the query plan. | no | +| `TotalScanRowsWithoutForecastsEstimate` | Total number of rows read by all scans in the query, as estimated by the optimizer without using forecasts. | no | +| `NanosSinceStatsForecasted` | The greatest quantity of nanoseconds that have passed since the forecast time (or until the forecast time, if it is in the future, in which case it will be negative) for any table with forecasted stats scanned by this query. | no | #### Common fields diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 300a2db20f65..5ed5def0739a 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -402,7 +402,7 @@ func (c *parallelEventConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Eve startTime := timeutil.Now().UnixNano() defer func() { time := timeutil.Now().UnixNano() - c.metrics.ParallelConsumerConsumeNanos.Inc(time - startTime) + c.metrics.ParallelConsumerConsumeNanos.RecordValue(time - startTime) }() bucket := c.getBucketForEvent(ev) @@ -486,14 +486,13 @@ func (c *parallelEventConsumer) workerLoop( func (c *parallelEventConsumer) incInFlight() { c.mu.Lock() c.mu.inFlight++ + c.metrics.ParallelConsumerInFlightEvents.Update(int64(c.mu.inFlight)) c.mu.Unlock() - c.metrics.ParallelConsumerInFlightEvents.Inc(1) } func (c *parallelEventConsumer) decInFlight() { c.mu.Lock() c.mu.inFlight-- - c.metrics.ParallelConsumerInFlightEvents.Dec(1) notifyFlush := c.mu.waiting && c.mu.inFlight == 0 c.mu.Unlock() @@ -521,7 +520,7 @@ func (c *parallelEventConsumer) Flush(ctx context.Context) error { startTime := timeutil.Now().UnixNano() defer func() { time := timeutil.Now().UnixNano() - c.metrics.ParallelConsumerFlushNanos.Inc(time - startTime) + c.metrics.ParallelConsumerFlushNanos.RecordValue(time - startTime) }() needFlush := func() bool { diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index cd625b0f60fd..3496573e5a61 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -576,8 +576,8 @@ type Metrics struct { FrontierUpdates *metric.Counter ThrottleMetrics cdcutils.Metrics ReplanCount *metric.Counter - ParallelConsumerFlushNanos *metric.Counter - ParallelConsumerConsumeNanos *metric.Counter + ParallelConsumerFlushNanos *metric.Histogram + ParallelConsumerConsumeNanos *metric.Histogram ParallelConsumerInFlightEvents *metric.Gauge mu struct { @@ -609,8 +609,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates), ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow), ReplanCount: metric.NewCounter(metaChangefeedReplanCount), - ParallelConsumerFlushNanos: 
metric.NewCounter(metaChangefeedEventConsumerFlushNanos), - ParallelConsumerConsumeNanos: metric.NewCounter(metaChangefeedEventConsumerConsumeNanos), + ParallelConsumerFlushNanos: metric.NewHistogram(metaChangefeedEventConsumerFlushNanos, histogramWindow, metric.IOLatencyBuckets), + ParallelConsumerConsumeNanos: metric.NewHistogram(metaChangefeedEventConsumerConsumeNanos, histogramWindow, metric.IOLatencyBuckets), ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents), } diff --git a/pkg/kv/kvserver/api.proto b/pkg/kv/kvserver/api.proto index 18453f87763c..9cc67b95a2bb 100644 --- a/pkg/kv/kvserver/api.proto +++ b/pkg/kv/kvserver/api.proto @@ -36,22 +36,14 @@ message CollectChecksumRequest { bytes checksum_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ChecksumID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; - reserved 4; - // If true then the response must include the snapshot of the data from which - // the checksum is computed. - bool with_snapshot = 5; + reserved 4, 5; } message CollectChecksumResponse { // The checksum is the sha512 hash of the requested computation. It is empty // if the computation failed. bytes checksum = 1; - // snapshot is set if the with_snapshot in CollectChecksumRequest is true. For - // example, it can be set by the caller when it has detected an inconsistency. - // - // TODO(tschottdorf): with larger ranges, this is no longer tenable. - // See https://github.com/cockroachdb/cockroach/issues/21128. - roachpb.RaftSnapshotData snapshot = 2; + reserved 2; // delta carries the stats of the range minus the recomputed stats. storage.enginepb.MVCCStatsDelta delta = 3 [(gogoproto.nullable) = false]; // persisted carries the persisted stats of the replica. diff --git a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go index 563d74cedfba..48a1a7236259 100644 --- a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go +++ b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go @@ -63,12 +63,11 @@ func ComputeChecksum( var pd result.Result pd.Replicated.ComputeChecksum = &kvserverpb.ComputeChecksum{ - Version: args.Version, - ChecksumID: reply.ChecksumID, - SaveSnapshot: args.Snapshot, - Mode: args.Mode, - Checkpoint: args.Checkpoint, - Terminate: args.Terminate, + Version: args.Version, + ChecksumID: reply.ChecksumID, + Mode: args.Mode, + Checkpoint: args.Checkpoint, + Terminate: args.Terminate, } return pd, nil } diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index 3f6a2c4e6aaf..095ab2fcc0d6 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -222,10 +222,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(pavelkalinnikov): not if we remove TestingSetRedactable below? skip.UnderRaceWithIssue(t, 81819, "slow test, and TestingSetRedactable triggers race detector") // This test prints a consistency checker diff, so it's // good to make sure we're overly redacting said diff. + // TODO(pavelkalinnikov): remove this since we don't print diffs anymore? defer log.TestingSetRedactable(true)() // Test expects simple MVCC value encoding. 
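
A minimal, self-contained sketch of the timing pattern the event_processing.go and metrics.go hunks above switch to: a counter's `Inc` collapses every sample into a single running sum, while a histogram's `RecordValue` preserves the per-call latency distribution (hence the `IOLatencyBuckets`). The `recorder` interface and `timeNanos` helper below are hypothetical stand-ins for `metric.Histogram` and the consumer methods; only the defer-based shape is taken from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// recorder stands in for metric.Histogram's RecordValue; a plain counter's
// Inc would collapse every sample into one sum, losing percentiles.
type recorder interface {
	RecordValue(v int64)
}

type printRecorder struct{}

func (printRecorder) RecordValue(v int64) { fmt.Printf("recorded %d ns\n", v) }

// timeNanos mirrors the defer-based pattern in ConsumeEvent and Flush:
// capture a start time, then record the elapsed nanoseconds on exit.
func timeNanos(r recorder, fn func()) {
	start := time.Now().UnixNano()
	defer func() { r.RecordValue(time.Now().UnixNano() - start) }()
	fn()
}

func main() {
	timeNanos(printRecorder{}, func() { time.Sleep(10 * time.Millisecond) })
}
```

The related incInFlight/decInFlight change follows the same theme of accuracy: updating the gauge while still holding the mutex keeps the exported value consistent with `mu.inFlight`, instead of maintaining a separate Inc/Dec count outside the lock.
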
diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index d7e51d211cf8..eb9e7cbd4c73 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -75,9 +75,7 @@ message ComputeChecksum { // that hardcoded in the binary will a computation be carried out. uint32 version = 5; - // SaveSnapshot indicates that the snapshot used to compute the checksum - // should be saved so that a diff of divergent replicas can later be computed. - bool save_snapshot = 2; + reserved 2; roachpb.ChecksumMode mode = 3; // If set, a checkpoint (i.e. cheap backup) of the engine will be taken. This // is expected to be set only if we already know that there is an diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 8b65e58ab7c7..76793390cff3 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -76,8 +76,8 @@ type replicaChecksum struct { // other replicas. These are inspected and a CheckConsistencyResponse is assembled. // // When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the -// consistency check will be re-run to collect a diff, which is then printed -// before calling `log.Fatal`. This behavior should be lifted to the consistency +// consistency check will be re-run to save storage engine checkpoints and +// terminate suspicious nodes. This behavior should be lifted to the consistency // checker queue in the future. func (r *Replica) CheckConsistency( ctx context.Context, req roachpb.CheckConsistencyRequest, @@ -146,12 +146,6 @@ func (r *Replica) checkConsistencyImpl( &results[idx].Response.Delta, ) } - minoritySnap := results[shaToIdxs[minoritySHA][0]].Response.Snapshot - curSnap := results[shaToIdxs[sha][0]].Response.Snapshot - if sha != minoritySHA && minoritySnap != nil && curSnap != nil { - diff := DiffRange(curSnap, minoritySnap) - buf.Printf("====== diff(%x, [minority]) ======\n%v", redact.Safe(sha), diff) - } } if isQueue { @@ -246,18 +240,17 @@ func (r *Replica) checkConsistencyImpl( return resp, roachpb.NewError(err) } - if args.Snapshot { - // A diff was already printed. Return because all the code below will do - // is request another consistency check, with a diff and with - // instructions to terminate the minority nodes. + if args.Checkpoint { + // A checkpoint/termination request has already been sent. Return because + // all the code below will do is request another consistency check, with + // instructions to make a checkpoint and to terminate the minority nodes. log.Errorf(ctx, "consistency check failed") return resp, nil } - // No diff was printed, so we want to re-run the check with snapshots - // requested, to build the diff. Note that this recursive call will be - // terminated in the `args.Snapshot` branch above. - args.Snapshot = true + // No checkpoint was requested, so we want to re-run the check with + // checkpoints and termination of suspicious nodes. Note that this recursive + // call will be terminated in the `args.Checkpoint` branch above. args.Checkpoint = true for _, idxs := range shaToIdxs[minoritySHA] { args.Terminate = append(args.Terminate, results[idxs].Replica) @@ -279,10 +272,11 @@ func (r *Replica) checkConsistencyImpl( // // See: // https://github.com/cockroachdb/cockroach/issues/36861 + // TODO(pavelkalinnikov): remove this now that diffs are not printed? 
defer log.TemporarilyDisableFileGCForMainLogger()() if _, pErr := r.checkConsistencyImpl(ctx, args); pErr != nil { - log.Errorf(ctx, "replica inconsistency detected; could not obtain actual diff: %s", pErr) + log.Errorf(ctx, "replica inconsistency detected; second round failed: %s", pErr) } return resp, nil @@ -296,7 +290,7 @@ type ConsistencyCheckResult struct { } func (r *Replica) collectChecksumFromReplica( - ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool, + ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, ) (CollectChecksumResponse, error) { conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass) if err != nil { @@ -308,7 +302,6 @@ func (r *Replica) collectChecksumFromReplica( StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID}, RangeID: r.RangeID, ChecksumID: id, - WithSnapshot: withSnap, } resp, err := client.CollectChecksum(ctx, req) if err != nil { @@ -350,7 +343,7 @@ func (r *Replica) runConsistencyCheck( if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency", func(ctx context.Context) { defer wg.Done() - resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot) + resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID) resultCh <- ConsistencyCheckResult{ Replica: replica, Response: resp, @@ -467,10 +460,8 @@ func (*Replica) checksumInitialWait(ctx context.Context) time.Duration { } // computeChecksumDone sends the checksum computation result to the receiver. -func (*Replica) computeChecksumDone( - rc *replicaChecksum, result *replicaHash, snapshot *roachpb.RaftSnapshotData, -) { - c := CollectChecksumResponse{Snapshot: snapshot} +func (*Replica) computeChecksumDone(rc *replicaChecksum, result *replicaHash) { + var c CollectChecksumResponse if result != nil { c.Checksum = result.SHA512[:] delta := result.PersistedMS @@ -649,7 +640,7 @@ func (*Replica) sha512( return nil, err } if snapshot != nil { - // Add LeaseAppliedState to the diff. + // Add LeaseAppliedState to the snapshot. kv := roachpb.RaftSnapshotData_KeyValue{ Timestamp: hlc.Timestamp{}, } @@ -752,16 +743,12 @@ func (r *Replica) computeChecksumPostApply( ); err != nil { log.Errorf(ctx, "checksum collection did not join: %v", err) } else { - var snapshot *roachpb.RaftSnapshotData - if cc.SaveSnapshot { - snapshot = &roachpb.RaftSnapshotData{} - } - result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) + result, err := r.sha512(ctx, desc, snap, nil /* snapshot */, cc.Mode, r.store.consistencyLimiter) if err != nil { log.Errorf(ctx, "checksum computation failed: %v", err) result = nil } - r.computeChecksumDone(c, result, snapshot) + r.computeChecksumDone(c, result) } var shouldFatal bool @@ -776,10 +763,10 @@ func (r *Replica) computeChecksumPostApply( } // This node should fatal as a result of a previous consistency check (i.e. - // this round is carried out only to obtain a diff). If we fatal too early, - // the diff won't make it back to the leaseholder and thus won't be printed - // to the logs. Since we're already in a goroutine that's about to end, - // simply sleep for a few seconds and then terminate. + // this round only saves checkpoints and kills some nodes). If we fatal too + // early, the reply won't make it back to the leaseholder, so it will not be + // certain of completing the check. 
Since we're already in a goroutine + that's about to end, just sleep for a few seconds and then terminate. auxDir := r.store.engine.GetAuxiliaryDir() _ = r.store.engine.MkdirAll(auxDir) path := base.PreventedStartupFile(auxDir) diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index f3154aca6842..9b44389264b7 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -64,9 +64,6 @@ func (is Server) CollectChecksum( if err != nil { return err } - if !req.WithSnapshot { - ccr.Snapshot = nil - } resp = &ccr return nil }) diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index eda6d039b803..c3b09cbe881d 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1435,10 +1435,7 @@ message ComputeChecksumRequest { // The version used to pick the checksum method. It allows us to use a // consistent checksumming method across replicas. uint32 version = 2; - reserved 3; - // Compute a checksum along with a snapshot of the entire range, that will be - // used in logging a diff during checksum verification. - bool snapshot = 4; + reserved 3, 4; // The type of checksum to compute. See ChecksumMode. ChecksumMode mode = 5; // If set, a checkpoint (i.e. cheap backup) of the storage engine will be diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 3f7f42e807a4..9f004cc1ca2a 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -968,7 +968,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { jobRegistry.SetInternalExecutorFactory(ieFactory) execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg) execCfg.IndexMerger = sql.NewIndexBackfillerMergePlanner(execCfg) - execCfg.IndexValidator = scdeps.NewIndexValidator( + execCfg.Validator = scdeps.NewValidator( execCfg.DB, execCfg.Codec, execCfg.Settings, diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index 5b819b253701..f456ffbd8aaa 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -16,6 +16,7 @@ import ( "fmt" "net/url" "strings" + "sync" "sync/atomic" "testing" "time" @@ -937,6 +938,122 @@ func TestIsAtLeastVersion(t *testing.T) { } } +func TestTxnContentionEventsTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Start the cluster. (One node is sufficient; the outliers system + // is currently in-memory only.) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}} + tc := testcluster.StartTestCluster(t, 1, args) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + sqlDB.Exec( + t, + `SET CLUSTER SETTING sql.metrics.statement_details.plan_collection.enabled = false;`) + + // Reduce the resolution interval to speed up the test. + sqlDB.Exec( + t, + `SET CLUSTER SETTING sql.contention.event_store.resolution_interval = '100ms'`) + + sqlDB.Exec(t, "CREATE TABLE t (id string, s string);") + + causeContention := func(insertValue string, updateValue string) { + // Create a new connection, and then in a goroutine have it start a + // transaction, insert a row, sleep for a time, and then complete the + // transaction. With the original connection, attempt to update the same + // row being written concurrently in the separate goroutine; this update + // will be blocked until the original transaction completes.
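
Before the test body that follows, here is a distilled, database-free sketch of the synchronization skeleton the comment above describes: one WaitGroup signals that the blocking transaction holds the lock, another that it has committed, so the contending statement is guaranteed to overlap with it. The prints and sleep are stand-ins for the real test's SQL transaction and `pg_sleep(.5)`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wgTxnStarted sync.WaitGroup // signals that the row is locked
	var wgTxnDone sync.WaitGroup    // signals that the txn has committed
	wgTxnStarted.Add(1)
	wgTxnDone.Add(1)

	go func() {
		defer wgTxnDone.Done()
		// Stand-in for BEGIN; INSERT INTO t ... ('test', $1).
		fmt.Println("txn: wrote row, holding lock")
		wgTxnStarted.Done()
		time.Sleep(500 * time.Millisecond) // stand-in for pg_sleep(.5)
		fmt.Println("txn: committed")
	}()

	wgTxnStarted.Wait() // ensure the lock is held before contending
	start := time.Now()
	wgTxnDone.Wait() // stand-in for the blocked UPDATE on the same row
	fmt.Printf("contending update waited %v\n", time.Since(start))
}
```
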
+ var wgTxnStarted sync.WaitGroup + wgTxnStarted.Add(1) + + // Used to wait for the txn to complete, to avoid the test finishing + // before the txn is committed. + var wgTxnDone sync.WaitGroup + wgTxnDone.Add(1) + + go func() { + defer wgTxnDone.Done() + tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{}) + require.NoError(t, errTxn) + _, errTxn = tx.ExecContext(ctx, + "INSERT INTO t (id, s) VALUES ('test', $1);", + insertValue) + require.NoError(t, errTxn) + wgTxnStarted.Done() + _, errTxn = tx.ExecContext(ctx, "select pg_sleep(.5);") + require.NoError(t, errTxn) + errTxn = tx.Commit() + require.NoError(t, errTxn) + }() + + start := timeutil.Now() + + // Need to wait for the txn to start to ensure lock contention. + wgTxnStarted.Wait() + // This will be blocked until the transaction in the goroutine finishes. + _, errUpdate := conn.ExecContext( + ctx, "UPDATE t SET s = $1 where id = 'test';", updateValue) + require.NoError(t, errUpdate) + end := timeutil.Now() + require.GreaterOrEqual(t, end.Sub(start), 500*time.Millisecond) + + wgTxnDone.Wait() + } + + causeContention("insert1", "update1") + causeContention("insert2", "update2") + + rowCount := 0 + + // Verify the table content is valid. + // Filter the fingerprint id to only be the query in the test. + // This ensures the event is the one caused in the test and not by some other + // internal workflow. + testutils.SucceedsWithin(t, func() error { + rows, errVerify := conn.QueryContext(ctx, `SELECT + blocking_txn_id, + waiting_txn_id + FROM crdb_internal.transaction_contention_events tce + inner join ( + select + transaction_fingerprint_id, + metadata->'query' as query + from crdb_internal.statement_statistics t + where metadata->>'query' like 'UPDATE t SET %') stats + on stats.transaction_fingerprint_id = tce.waiting_txn_fingerprint_id`) + if errVerify != nil { + return errVerify + } + + for rows.Next() { + rowCount++ + + var blocking, waiting string + errVerify = rows.Scan(&blocking, &waiting) + if errVerify != nil { + return errVerify + } + + } + + if rowCount < 1 { + return fmt.Errorf("transaction_contention_events did not return any rows") + } + return nil + }, 5*time.Second) + + require.LessOrEqual(t, rowCount, 2, "transaction_contention_events "+ "found more than 2 rows. It should only record the first event, but "+ "there is a chance based on sampling to get 2 rows.") + +} + // This test doesn't care about the contents of these virtual tables; // other places (the insights integration tests) do that for us.
// What we look at here is the role-option-checking we need to make sure diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 87eaa24a5250..4ce38aa6d11a 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -457,6 +457,12 @@ func (p *planner) maybeLogStatementInternal( KVRowsRead: stats.KVRowsRead, NetworkMessages: stats.NetworkMessages, IndexRecommendations: indexRecs, + + ScanCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanCount]), + ScanWithStatsCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsCount]), + ScanWithStatsForecastCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsForecastCount]), + TotalScanRowsWithoutForecastsEstimate: p.curPlan.instrumentation.totalScanRowsWithoutForecasts, + NanosSinceStatsForecasted: int64(p.curPlan.instrumentation.nanosSinceStatsForecasted), } p.logOperationalEventsOnlyExternally(ctx, isCopy, &sampledQuery) } else { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index cb46543c74ab..d5f190814a7b 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1257,8 +1257,8 @@ type ExecutorConfig struct { // IndexMerger is also used to backfill indexes and is also rather circular. IndexMerger *IndexBackfillerMergePlanner - // IndexValidator is used to validate indexes. - IndexValidator scexec.IndexValidator + // Validator is used to validate indexes and check constraints. + Validator scexec.Validator // ContentionRegistry is a node-level registry of contention events used for // contention observability. diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 333503155710..7c34be1db875 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -121,8 +121,8 @@ type instrumentationHelper struct { queryLevelStatsWithErr *execstats.QueryLevelStatsWithErr - // If savePlanForStats is true, the explainPlan will be collected and returned - // via PlanForStats(). + // If savePlanForStats is true and the explainPlan was collected, the + // serialized version of the plan will be returned via PlanForStats(). savePlanForStats bool explainPlan *explain.Plan @@ -155,6 +155,11 @@ type instrumentationHelper struct { // as estimated by the optimizer. totalScanRows float64 + // totalScanRowsWithoutForecasts is the total number of rows read by all scans + // in the query, as estimated by the optimizer without using forecasts. (If + // forecasts were not used, this should be the same as totalScanRows.) + totalScanRowsWithoutForecasts float64 + // outputRows is the number of rows output by the query, as estimated by the // optimizer. outputRows float64 @@ -167,6 +172,12 @@ type instrumentationHelper struct { // passed since stats were collected on any table scanned by this query. nanosSinceStatsCollected time.Duration + // nanosSinceStatsForecasted is the greatest quantity of nanoseconds that have + // passed since the forecast time (or until the forecast time, if it is in the + // future, in which case it will be negative) for any table with forecasted + // stats scanned by this query. + nanosSinceStatsForecasted time.Duration + // joinTypeCounts records the number of times each type of logical join was // used in the query. joinTypeCounts map[descpb.JoinType]int @@ -174,6 +185,9 @@ type instrumentationHelper struct { // joinAlgorithmCounts records the number of times each type of join algorithm // was used in the query. joinAlgorithmCounts map[exec.JoinAlgorithm]int + + // scanCounts records the number of times scans were used in the query. 
+ scanCounts [exec.NumScanCountTypes]int } // outputMode indicates how the statement output needs to be populated (for @@ -255,8 +269,8 @@ func (ih *instrumentationHelper) Setup( ih.stmtDiagnosticsRecorder = stmtDiagnosticsRecorder ih.withStatementTrace = cfg.TestingKnobs.WithStatementTrace - ih.savePlanForStats = - statsCollector.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, p.SessionData().Database) + var previouslySampled bool + previouslySampled, ih.savePlanForStats = statsCollector.ShouldSample(fingerprint, implicitTxn, p.SessionData().Database) defer func() { if ih.ShouldBuildExplainPlan() { @@ -289,7 +303,7 @@ func (ih *instrumentationHelper) Setup( ih.collectExecStats = collectTxnExecStats - if !collectTxnExecStats && ih.savePlanForStats { + if !collectTxnExecStats && !previouslySampled { // We don't collect the execution stats for statements in this txn, but // this is the first time we see this statement ever, so we'll collect // its execution stats anyway (unless the user disabled txn stats @@ -439,7 +453,7 @@ func (ih *instrumentationHelper) ShouldUseJobForCreateStats() bool { // ShouldBuildExplainPlan returns true if we should build an explain plan and // call RecordExplainPlan. func (ih *instrumentationHelper) ShouldBuildExplainPlan() bool { - return ih.collectBundle || ih.collectExecStats || ih.savePlanForStats || + return ih.collectBundle || ih.collectExecStats || ih.outputMode == explainAnalyzePlanOutput || ih.outputMode == explainAnalyzeDistSQLOutput } @@ -472,7 +486,7 @@ func (ih *instrumentationHelper) RecordPlanInfo( // collected (nil otherwise). It should be called after RecordExplainPlan() and // RecordPlanInfo(). func (ih *instrumentationHelper) PlanForStats(ctx context.Context) *roachpb.ExplainTreePlanNode { - if ih.explainPlan == nil { + if ih.explainPlan == nil || !ih.savePlanForStats { return nil } diff --git a/pkg/sql/opt/exec/execbuilder/builder.go b/pkg/sql/opt/exec/execbuilder/builder.go index a135292cf793..b20712166a33 100644 --- a/pkg/sql/opt/exec/execbuilder/builder.go +++ b/pkg/sql/opt/exec/execbuilder/builder.go @@ -142,10 +142,21 @@ type Builder struct { // as estimated by the optimizer. TotalScanRows float64 + // TotalScanRowsWithoutForecasts is the total number of rows read by all scans + // in the query, as estimated by the optimizer without using forecasts. (If + // forecasts were not used, this should be the same as TotalScanRows.) + TotalScanRowsWithoutForecasts float64 + // NanosSinceStatsCollected is the maximum number of nanoseconds that have // passed since stats were collected on any table scanned by this query. NanosSinceStatsCollected time.Duration + // NanosSinceStatsForecasted is the greatest quantity of nanoseconds that have + // passed since the forecast time (or until the forecast time, if it is in + // the future, in which case it will be negative) for any table with + // forecasted stats scanned by this query. + NanosSinceStatsForecasted time.Duration + // JoinTypeCounts records the number of times each type of logical join was // used in the query. JoinTypeCounts map[descpb.JoinType]int @@ -154,6 +165,9 @@ type Builder struct { // was used in the query. JoinAlgorithmCounts map[exec.JoinAlgorithm]int + // ScanCounts records the number of times scans were used in the query. + ScanCounts [exec.NumScanCountTypes]int + // wrapFunctionOverride overrides default implementation to return resolvable // function reference for function with specified function name.
// The default can be overridden by calling SetBuiltinFuncWrapper method to provide diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index cc0d423ea46e..a1fadd065376 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "fmt" + "math" "strings" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -732,7 +733,8 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) { // Save if we planned a full table/index scan on the builder so that the // planner can be made aware later. We only do this for non-virtual tables. - stats := scan.Relational().Statistics() + relProps := scan.Relational() + stats := relProps.Statistics() if !tab.IsVirtualTable() && isUnfiltered { large := !stats.Available || stats.RowCount > b.evalCtx.SessionData().LargeFullScanRows if scan.Index == cat.PrimaryIndex { @@ -747,16 +749,50 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) { } } - // Save the total estimated number of rows scanned and the time since stats - // were collected. + // Save some instrumentation info. + b.ScanCounts[exec.ScanCount]++ if stats.Available { b.TotalScanRows += stats.RowCount - if tab.StatisticCount() > 0 { - // The first stat is the most recent one. - nanosSinceStatsCollected := timeutil.Since(tab.Statistic(0).CreatedAt()) + b.ScanCounts[exec.ScanWithStatsCount]++ + + // The first stat is the most recent one. Check if it was a forecast. + var first int + if first < tab.StatisticCount() && tab.Statistic(first).IsForecast() { + if b.evalCtx.SessionData().OptimizerUseForecasts { + b.ScanCounts[exec.ScanWithStatsForecastCount]++ + + // Calculate time since the forecast (or negative time until the forecast). + nanosSinceStatsForecasted := timeutil.Since(tab.Statistic(first).CreatedAt()) + if nanosSinceStatsForecasted.Abs() > b.NanosSinceStatsForecasted.Abs() { + b.NanosSinceStatsForecasted = nanosSinceStatsForecasted + } + } + // Find the first non-forecast stat. + for first < tab.StatisticCount() && tab.Statistic(first).IsForecast() { + first++ + } + } + + if first < tab.StatisticCount() { + tabStat := tab.Statistic(first) + + nanosSinceStatsCollected := timeutil.Since(tabStat.CreatedAt()) if nanosSinceStatsCollected > b.NanosSinceStatsCollected { b.NanosSinceStatsCollected = nanosSinceStatsCollected } + + // Calculate another row count estimate using these (non-forecast) + // stats. If forecasts were not used, this should be the same as + // stats.RowCount. + rowCountWithoutForecast := float64(tabStat.RowCount()) + rowCountWithoutForecast *= stats.Selectivity.AsFloat() + minCardinality, maxCardinality := relProps.Cardinality.Min, relProps.Cardinality.Max + if rowCountWithoutForecast > float64(maxCardinality) && maxCardinality != math.MaxUint32 { + rowCountWithoutForecast = float64(maxCardinality) + } else if rowCountWithoutForecast < float64(minCardinality) { + rowCountWithoutForecast = float64(minCardinality) + } + b.TotalScanRowsWithoutForecasts += rowCountWithoutForecast } } diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index 5fa4d4435bca..989a2db9cb2f 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -386,3 +386,18 @@ const ( ZigZagJoin NumJoinAlgorithms ) + +// ScanCountType is the type of count of scan operations in a query. +type ScanCountType int + +const ( + // ScanCount is the count of all scans in a query. 
+ ScanCount ScanCountType = iota + // ScanWithStatsCount is the count of scans with statistics in a query. + ScanWithStatsCount + // ScanWithStatsForecastCount is the count of scans which used forecasted + // statistics in a query. + ScanWithStatsForecastCount + // NumScanCountTypes is the total number of types of counts of scans. + NumScanCountTypes +) diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index f5165cee1cd8..0856d1cf0ab3 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -638,9 +638,13 @@ func (opc *optPlanningCtx) runExecBuilder( } planTop.instrumentation.maxFullScanRows = bld.MaxFullScanRows planTop.instrumentation.totalScanRows = bld.TotalScanRows + planTop.instrumentation.totalScanRowsWithoutForecasts = bld.TotalScanRowsWithoutForecasts planTop.instrumentation.nanosSinceStatsCollected = bld.NanosSinceStatsCollected + planTop.instrumentation.nanosSinceStatsForecasted = bld.NanosSinceStatsForecasted planTop.instrumentation.joinTypeCounts = bld.JoinTypeCounts planTop.instrumentation.joinAlgorithmCounts = bld.JoinAlgorithmCounts + planTop.instrumentation.scanCounts = bld.ScanCounts + if gf != nil { planTop.instrumentation.planGist = gf.PlanGist() } diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index fa0eb642cd2e..084bc6c83749 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -269,7 +269,7 @@ func newSchemaChangerTxnRunDependencies( // nothing to save because nobody will ever try to resume. scdeps.NewNoOpBackfillerTracker(execCfg.Codec), scdeps.NewNoopPeriodicProgressFlusher(), - execCfg.IndexValidator, + execCfg.Validator, scdeps.NewConstantClock(evalContext.GetTxnTimestamp(time.Microsecond).Time), metaDataUpdater, NewSchemaChangerEventLogger(txn, execCfg, 1), diff --git a/pkg/sql/schemachanger/scdeps/BUILD.bazel b/pkg/sql/schemachanger/scdeps/BUILD.bazel index 1c68c264f92f..c214a653b60f 100644 --- a/pkg/sql/schemachanger/scdeps/BUILD.bazel +++ b/pkg/sql/schemachanger/scdeps/BUILD.bazel @@ -6,8 +6,8 @@ go_library( srcs = [ "build_deps.go", "exec_deps.go", - "index_validator.go", "run_deps.go", + "validator.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps", visibility = ["//visibility:public"], diff --git a/pkg/sql/schemachanger/scdeps/exec_deps.go b/pkg/sql/schemachanger/scdeps/exec_deps.go index 64f2f3108866..c1f823178884 100644 --- a/pkg/sql/schemachanger/scdeps/exec_deps.go +++ b/pkg/sql/schemachanger/scdeps/exec_deps.go @@ -63,7 +63,7 @@ func NewExecutorDependencies( merger scexec.Merger, backfillTracker scexec.BackfillerTracker, backfillFlusher scexec.PeriodicProgressFlusher, - indexValidator scexec.IndexValidator, + validator scexec.Validator, clock scmutationexec.Clock, metadataUpdater scexec.DescriptorMetadataUpdater, eventLogger scexec.EventLogger, @@ -79,7 +79,7 @@ func NewExecutorDependencies( codec: codec, descsCollection: descsCollection, jobRegistry: jobRegistry, - indexValidator: indexValidator, + validator: validator, eventLogger: eventLogger, statsRefresher: statsRefresher, schemaChangerJobID: schemaChangerJobID, @@ -105,7 +105,7 @@ type txnDeps struct { descsCollection *descs.Collection jobRegistry JobRegistry createdJobs []jobspb.JobID - indexValidator scexec.IndexValidator + validator scexec.Validator statsRefresher scexec.StatsRefresher tableStatsToRefresh []descpb.ID eventLogger scexec.EventLogger @@ -395,8 +395,8 @@ func (d *execDeps) PeriodicProgressFlusher() scexec.PeriodicProgressFlusher { return 
d.periodicProgressFlusher } -func (d *execDeps) IndexValidator() scexec.IndexValidator { - return d.indexValidator +func (d *execDeps) Validator() scexec.Validator { + return d.validator } // IndexSpanSplitter implements the scexec.Dependencies interface. diff --git a/pkg/sql/schemachanger/scdeps/run_deps.go b/pkg/sql/schemachanger/scdeps/run_deps.go index c88db6115f4b..348184c27437 100644 --- a/pkg/sql/schemachanger/scdeps/run_deps.go +++ b/pkg/sql/schemachanger/scdeps/run_deps.go @@ -43,7 +43,7 @@ func NewJobRunDependencies( job *jobs.Job, codec keys.SQLCodec, settings *cluster.Settings, - indexValidator scexec.IndexValidator, + indexValidator scexec.Validator, metadataUpdaterFactory MetadataUpdaterFactory, statsRefresher scexec.StatsRefresher, testingKnobs *scexec.TestingKnobs, @@ -87,7 +87,7 @@ type jobExecutionDeps struct { job *jobs.Job kvTrace bool - indexValidator scexec.IndexValidator + indexValidator scexec.Validator codec keys.SQLCodec settings *cluster.Settings @@ -117,7 +117,7 @@ func (d *jobExecutionDeps) WithTxnInJob(ctx context.Context, fn scrun.JobTxnFunc codec: d.codec, descsCollection: descriptors, jobRegistry: d.jobRegistry, - indexValidator: d.indexValidator, + validator: d.indexValidator, eventLogger: d.eventLoggerFactory(txn), statsRefresher: d.statsRefresher, schemaChangerJobID: d.job.ID(), diff --git a/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go b/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go index be3ebe276be6..29bb6f4e2d8e 100644 --- a/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go +++ b/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go @@ -935,7 +935,7 @@ func (s *TestState) WithTxnInJob(ctx context.Context, fn scrun.JobTxnFunc) (err return err } -// ValidateForwardIndexes implements the index validator interface. +// ValidateForwardIndexes implements the validator interface. func (s *TestState) ValidateForwardIndexes( _ context.Context, tbl catalog.TableDescriptor, @@ -950,7 +950,7 @@ func (s *TestState) ValidateForwardIndexes( return nil } -// ValidateInvertedIndexes implements the index validator interface. +// ValidateInvertedIndexes implements the validator interface. func (s *TestState) ValidateInvertedIndexes( _ context.Context, tbl catalog.TableDescriptor, @@ -965,8 +965,8 @@ func (s *TestState) ValidateInvertedIndexes( return nil } -// IndexValidator implements the scexec.Dependencies interface. -func (s *TestState) IndexValidator() scexec.IndexValidator { +// Validator implements the scexec.Dependencies interface. +func (s *TestState) Validator() scexec.Validator { return s } diff --git a/pkg/sql/schemachanger/scdeps/index_validator.go b/pkg/sql/schemachanger/scdeps/validator.go similarity index 83% rename from pkg/sql/schemachanger/scdeps/index_validator.go rename to pkg/sql/schemachanger/scdeps/validator.go index aac2e01eff1f..99e8709e1fae 100644 --- a/pkg/sql/schemachanger/scdeps/index_validator.go +++ b/pkg/sql/schemachanger/scdeps/validator.go @@ -50,7 +50,7 @@ type ValidateInvertedIndexesFn func( // for the internal executor. type NewFakeSessionDataFn func(sv *settings.Values) *sessiondata.SessionData -type indexValidator struct { +type validator struct { db *kv.DB codec keys.SQLCodec settings *cluster.Settings @@ -61,7 +61,7 @@ type indexValidator struct { } // ValidateForwardIndexes checks that the indexes have entries for all the rows. 
-func (iv indexValidator) ValidateForwardIndexes( +func (vd validator) ValidateForwardIndexes( ctx context.Context, tbl catalog.TableDescriptor, indexes []catalog.Index, @@ -70,14 +70,14 @@ func (iv indexValidator) ValidateForwardIndexes( const withFirstMutationPublic = true const gatherAllInvalid = false - return iv.validateForwardIndexes( - ctx, tbl, indexes, iv.makeHistoricalInternalExecTxnRunner(), + return vd.validateForwardIndexes( + ctx, tbl, indexes, vd.makeHistoricalInternalExecTxnRunner(), withFirstMutationPublic, gatherAllInvalid, override, ) } // ValidateInvertedIndexes checks that the indexes have entries for all the rows. -func (iv indexValidator) ValidateInvertedIndexes( +func (vd validator) ValidateInvertedIndexes( ctx context.Context, tbl catalog.TableDescriptor, indexes []catalog.Index, @@ -86,8 +86,8 @@ func (iv indexValidator) ValidateInvertedIndexes( const withFirstMutationPublic = true const gatherAllInvalid = false - return iv.validateInvertedIndexes( - ctx, iv.codec, tbl, indexes, iv.makeHistoricalInternalExecTxnRunner(), + return vd.validateInvertedIndexes( + ctx, vd.codec, tbl, indexes, vd.makeHistoricalInternalExecTxnRunner(), withFirstMutationPublic, gatherAllInvalid, override, ) } @@ -95,21 +95,21 @@ func (iv indexValidator) ValidateInvertedIndexes( // makeHistoricalInternalExecTxnRunner creates a new transaction runner which // always runs at the same time and that time is the current time as of when // this constructor was called. -func (iv indexValidator) makeHistoricalInternalExecTxnRunner() sqlutil.HistoricalInternalExecTxnRunner { - now := iv.db.Clock().Now() +func (vd validator) makeHistoricalInternalExecTxnRunner() sqlutil.HistoricalInternalExecTxnRunner { + now := vd.db.Clock().Now() return func(ctx context.Context, fn sqlutil.InternalExecFn) error { - validationTxn := iv.db.NewTxn(ctx, "validation") + validationTxn := vd.db.NewTxn(ctx, "validation") err := validationTxn.SetFixedTimestamp(ctx, now) if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory.NewInternalExecutor(iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, vd.ieFactory.NewInternalExecutor(vd.newFakeSessionData(&vd.settings.SV))) } } -// NewIndexValidator creates a IndexValidator interface +// NewValidator creates a Validator interface // for the new schema changer. -func NewIndexValidator( +func NewValidator( db *kv.DB, codec keys.SQLCodec, settings *cluster.Settings, @@ -117,8 +117,8 @@ func NewIndexValidator( validateForwardIndexes ValidateForwardIndexesFn, validateInvertedIndexes ValidateInvertedIndexesFn, newFakeSessionData NewFakeSessionDataFn, -) scexec.IndexValidator { - return indexValidator{ +) scexec.Validator { + return validator{ db: db, codec: codec, settings: settings, diff --git a/pkg/sql/schemachanger/scexec/dependencies.go b/pkg/sql/schemachanger/scexec/dependencies.go index 4a1450b1d54a..a0b256774c58 100644 --- a/pkg/sql/schemachanger/scexec/dependencies.go +++ b/pkg/sql/schemachanger/scexec/dependencies.go @@ -37,7 +37,7 @@ type Dependencies interface { IndexMerger() Merger BackfillProgressTracker() BackfillerTracker PeriodicProgressFlusher() PeriodicProgressFlusher - IndexValidator() IndexValidator + Validator() Validator IndexSpanSplitter() IndexSpanSplitter EventLogger() EventLogger DescriptorMetadataUpdater(ctx context.Context) DescriptorMetadataUpdater @@ -196,8 +196,8 @@ type Merger interface { ) error } -// IndexValidator provides interfaces that allow indexes to be validated. 
-type IndexValidator interface { +// Validator provides interfaces that allow indexes and check constraints to be validated. +type Validator interface { ValidateForwardIndexes( ctx context.Context, tbl catalog.TableDescriptor, diff --git a/pkg/sql/schemachanger/scexec/exec_validation.go b/pkg/sql/schemachanger/scexec/exec_validation.go index 60247ec0f9e0..801b7f4169df 100644 --- a/pkg/sql/schemachanger/scexec/exec_validation.go +++ b/pkg/sql/schemachanger/scexec/exec_validation.go @@ -43,9 +43,9 @@ func executeValidateUniqueIndex( User: username.RootUserName(), } if index.GetType() == descpb.IndexDescriptor_FORWARD { - err = deps.IndexValidator().ValidateForwardIndexes(ctx, table, []catalog.Index{index}, execOverride) + err = deps.Validator().ValidateForwardIndexes(ctx, table, []catalog.Index{index}, execOverride) } else { - err = deps.IndexValidator().ValidateInvertedIndexes(ctx, table, []catalog.Index{index}, execOverride) + err = deps.Validator().ValidateInvertedIndexes(ctx, table, []catalog.Index{index}, execOverride) } if err != nil { return scerrors.SchemaChangerUserError(err) diff --git a/pkg/sql/schemachanger/scexec/executor_external_test.go b/pkg/sql/schemachanger/scexec/executor_external_test.go index 8fe363b87b26..4a442f3886d8 100644 --- a/pkg/sql/schemachanger/scexec/executor_external_test.go +++ b/pkg/sql/schemachanger/scexec/executor_external_test.go @@ -75,7 +75,7 @@ func (ti testInfra) newExecDeps( noopMerger{}, scdeps.NewNoOpBackfillerTracker(ti.lm.Codec()), scdeps.NewNoopPeriodicProgressFlusher(), - noopIndexValidator{}, + noopValidator{}, scdeps.NewConstantClock(timeutil.Now()), noopMetadataUpdater{}, noopEventLogger{}, @@ -461,11 +461,11 @@ func (n noopMerger) MergeIndexes( return nil } -type noopIndexValidator struct{} +type noopValidator struct{} -var _ scexec.IndexValidator = noopIndexValidator{} +var _ scexec.Validator = noopValidator{} -func (noopIndexValidator) ValidateForwardIndexes( +func (noopValidator) ValidateForwardIndexes( ctx context.Context, tableDesc catalog.TableDescriptor, indexes []catalog.Index, @@ -474,7 +474,7 @@ func (noopIndexValidator) ValidateForwardIndexes( return nil } -func (noopIndexValidator) ValidateInvertedIndexes( +func (noopValidator) ValidateInvertedIndexes( ctx context.Context, tableDesc catalog.TableDescriptor, indexes []catalog.Index, @@ -571,7 +571,7 @@ func (noopMetadataUpdater) UpsertZoneConfig( } var _ scexec.Backfiller = noopBackfiller{} -var _ scexec.IndexValidator = noopIndexValidator{} +var _ scexec.Validator = noopValidator{} var _ scexec.EventLogger = noopEventLogger{} var _ scexec.StatsRefresher = noopStatsReferesher{} var _ scexec.DescriptorMetadataUpdater = noopMetadataUpdater{} diff --git a/pkg/sql/schemachanger/scexec/mocks_generated_test.go b/pkg/sql/schemachanger/scexec/mocks_generated_test.go index 49a3189d201d..418c3049ee4f 100644 --- a/pkg/sql/schemachanger/scexec/mocks_generated_test.go +++ b/pkg/sql/schemachanger/scexec/mocks_generated_test.go @@ -264,20 +264,6 @@ func (mr *MockDependenciesMockRecorder) IndexSpanSplitter() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexSpanSplitter", reflect.TypeOf((*MockDependencies)(nil).IndexSpanSplitter)) } -// IndexValidator mocks base method. -func (m *MockDependencies) IndexValidator() scexec.IndexValidator { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IndexValidator") - ret0, _ := ret[0].(scexec.IndexValidator) - return ret0 -} - -// IndexValidator indicates an expected call of IndexValidator. 
-func (mr *MockDependenciesMockRecorder) IndexValidator() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexValidator", reflect.TypeOf((*MockDependencies)(nil).IndexValidator)) -} - // PeriodicProgressFlusher mocks base method. func (m *MockDependencies) PeriodicProgressFlusher() scexec.PeriodicProgressFlusher { m.ctrl.T.Helper() @@ -362,6 +348,20 @@ func (mr *MockDependenciesMockRecorder) User() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "User", reflect.TypeOf((*MockDependencies)(nil).User)) } +// Validator mocks base method. +func (m *MockDependencies) Validator() scexec.Validator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Validator") + ret0, _ := ret[0].(scexec.Validator) + return ret0 +} + +// Validator indicates an expected call of Validator. +func (mr *MockDependenciesMockRecorder) Validator() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Validator", reflect.TypeOf((*MockDependencies)(nil).Validator)) +} + // MockBackfiller is a mock of Backfiller interface. type MockBackfiller struct { ctrl *gomock.Controller diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index 23beb8f01369..13b81afa056a 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -81,7 +81,7 @@ func (n *newSchemaChangeResumer) run(ctx context.Context, execCtxI interface{}) n.job, execCfg.Codec, execCfg.Settings, - execCfg.IndexValidator, + execCfg.Validator, func(ctx context.Context, descriptors *descs.Collection, txn *kv.Txn) scexec.DescriptorMetadataUpdater { return descmetadata.NewMetadataUpdater(ctx, execCfg.InternalExecutorFactory, diff --git a/pkg/sql/schemachanger/scop/validation.go b/pkg/sql/schemachanger/scop/validation.go index 2cb8c4ce0fa5..ccd299de0836 100644 --- a/pkg/sql/schemachanger/scop/validation.go +++ b/pkg/sql/schemachanger/scop/validation.go @@ -31,8 +31,8 @@ type ValidateIndex struct { // ValidateCheckConstraint validates a check constraint on a table's columns. type ValidateCheckConstraint struct { validationOp - TableID descpb.ID - Name string + TableID descpb.ID + ConstraintID descpb.ConstraintID } // Make sure baseOp is used for linter. diff --git a/pkg/sql/sem/tree/constant.go b/pkg/sql/sem/tree/constant.go index a7f7bec30b75..2a55f1da0eda 100644 --- a/pkg/sql/sem/tree/constant.go +++ b/pkg/sql/sem/tree/constant.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" ) @@ -435,10 +436,10 @@ func intersectTypeSlices(xs, ys []*types.T) (out []*types.T) { // The function takes a slice of Exprs and indexes, but expects all the indexed // Exprs to wrap a Constant. The reason it does not take a slice of Constants // instead is to avoid forcing callers to allocate separate slices of Constant.
-func commonConstantType(vals []Expr, idxs []int) (*types.T, bool) { +func commonConstantType(vals []Expr, idxs util.FastIntSet) (*types.T, bool) { var candidates []*types.T - for _, i := range idxs { + for i, ok := idxs.Next(0); ok; i, ok = idxs.Next(i + 1) { availableTypes := vals[i].(Constant).DesirableTypes() if candidates == nil { candidates = availableTypes diff --git a/pkg/sql/sem/tree/overload.go b/pkg/sql/sem/tree/overload.go index 8deaf6925b89..05059f24c93b 100644 --- a/pkg/sql/sem/tree/overload.go +++ b/pkg/sql/sem/tree/overload.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/lib/pq/oid" @@ -567,9 +568,9 @@ type typeCheckOverloadState struct { overloadIdxs []uint8 // index into overloads exprs []Expr typedExprs []TypedExpr - resolvableIdxs []int // index into exprs/typedExprs - constIdxs []int // index into exprs/typedExprs - placeholderIdxs []int // index into exprs/typedExprs + resolvableIdxs util.FastIntSet // index into exprs/typedExprs + constIdxs util.FastIntSet // index into exprs/typedExprs + placeholderIdxs util.FastIntSet // index into exprs/typedExprs } // typeCheckOverloadedExprs determines the correct overload to use for the given set of @@ -619,11 +620,11 @@ func typeCheckOverloadedExprs( // Hold the resolved type expressions of the provided exprs, in order. s.typedExprs = make([]TypedExpr, len(exprs)) - s.constIdxs, s.placeholderIdxs, s.resolvableIdxs = typeCheckSplitExprs(ctx, semaCtx, exprs) + s.constIdxs, s.placeholderIdxs, s.resolvableIdxs = typeCheckSplitExprs(semaCtx, exprs) // If no overloads are provided, just type check parameters and return. if len(overloads) == 0 { - for _, i := range s.resolvableIdxs { + for i, ok := s.resolvableIdxs.Next(0); ok; i, ok = s.resolvableIdxs.Next(i + 1) { typ, err := exprs[i].TypeCheck(ctx, semaCtx, types.Any) if err != nil { return nil, nil, pgerror.Wrapf(err, pgcode.InvalidParameterValue, @@ -649,7 +650,7 @@ func typeCheckOverloadedExprs( }) // Filter out overloads which constants cannot become. - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { constExpr := exprs[i].(Constant) s.overloadIdxs = filterOverloads(s.overloads, s.overloadIdxs, func(o overloadImpl) bool { @@ -662,7 +663,7 @@ func typeCheckOverloadedExprs( // f(int, float) is not a possible candidate for the expression f($1, $1). // Filter out overloads on resolved types. 
- for _, i := range s.resolvableIdxs { + for i, ok := s.resolvableIdxs.Next(0); ok; i, ok = s.resolvableIdxs.Next(i + 1) { paramDesired := types.Any // If all remaining candidates require the same type for this parameter, @@ -720,9 +721,10 @@ func typeCheckOverloadedExprs( } var homogeneousTyp *types.T - if len(s.resolvableIdxs) > 0 { - homogeneousTyp = s.typedExprs[s.resolvableIdxs[0]].ResolvedType() - for _, i := range s.resolvableIdxs[1:] { + if !s.resolvableIdxs.Empty() { + idx, _ := s.resolvableIdxs.Next(0) + homogeneousTyp = s.typedExprs[idx].ResolvedType() + for i, ok := s.resolvableIdxs.Next(idx); ok; i, ok = s.resolvableIdxs.Next(i + 1) { if !homogeneousTyp.Equivalent(s.typedExprs[i].ResolvedType()) { homogeneousTyp = nil break @@ -730,7 +732,7 @@ func typeCheckOverloadedExprs( } } - if len(s.constIdxs) > 0 { + if !s.constIdxs.Empty() { allConstantsAreHomogenous := false if ok, typedExprs, fns, err := filterAttempt(ctx, semaCtx, &s, func() { // The second heuristic is to prefer candidates where all constants can @@ -739,14 +741,14 @@ func typeCheckOverloadedExprs( // homogeneously up to this point. if homogeneousTyp != nil { allConstantsAreHomogenous = true - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { if !canConstantBecome(exprs[i].(Constant), homogeneousTyp) { allConstantsAreHomogenous = false break } } if allConstantsAreHomogenous { - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { s.overloadIdxs = filterOverloads(s.overloads, s.overloadIdxs, func(o overloadImpl) bool { return o.params().GetAt(i).Equivalent(homogeneousTyp) @@ -761,7 +763,7 @@ func typeCheckOverloadedExprs( if ok, typedExprs, fns, err := filterAttempt(ctx, semaCtx, &s, func() { // The third heuristic is to prefer candidates where all constants can // become their "natural" types. - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { natural := naturalConstantType(exprs[i].(Constant)) if natural != nil { s.overloadIdxs = filterOverloads(s.overloads, s.overloadIdxs, @@ -804,7 +806,7 @@ func typeCheckOverloadedExprs( if len(s.overloadIdxs) == 1 && allConstantsAreHomogenous { overloadParamsAreHomogenous := true p := s.overloads[s.overloadIdxs[0]].params() - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { if !p.GetAt(i).Equivalent(homogeneousTyp) { overloadParamsAreHomogenous = false break @@ -814,7 +816,7 @@ func typeCheckOverloadedExprs( // Type check our constants using the homogeneous type rather than // the type in overload parameter. This lets us type check user defined // types with a concrete type instance, rather than an ambiguous type. - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { typ, err := s.exprs[i].TypeCheck(ctx, semaCtx, homogeneousTyp) if err != nil { return nil, nil, err @@ -825,7 +827,7 @@ func typeCheckOverloadedExprs( return typedExprs, fn, err } } - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { constExpr := exprs[i].(Constant) s.overloadIdxs = filterOverloads(s.overloads, s.overloadIdxs, func(o overloadImpl) bool { @@ -845,7 +847,7 @@ func typeCheckOverloadedExprs( // keep track of previous overload indexes to return ambiguous error (>1 overloads) // instead of unsupported error (0 overloads) when applicable. 
prevOverloadIdxs := s.overloadIdxs - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { s.overloadIdxs = filterOverloads(s.overloads, s.overloadIdxs, func(o overloadImpl) bool { return o.params().GetAt(i).Equivalent(bestConstType) @@ -885,12 +887,12 @@ func typeCheckOverloadedExprs( // given the same type as all constants and resolvable expressions. This is // only possible if all constants and resolvable expressions were resolved // homogeneously up to this point. - if homogeneousTyp != nil && len(s.placeholderIdxs) > 0 { + if homogeneousTyp != nil && !s.placeholderIdxs.Empty() { // Before we continue, try to propagate the homogeneous type to the // placeholders. This might not have happened yet, if the overloads' // parameter types are ambiguous (like in the case of tuple-tuple binary // operators). - for _, i := range s.placeholderIdxs { + for i, ok := s.placeholderIdxs.Next(0); ok; i, ok = s.placeholderIdxs.Next(i + 1) { if _, err := exprs[i].TypeCheck(ctx, semaCtx, homogeneousTyp); err != nil { return nil, nil, err } @@ -951,7 +953,7 @@ func typeCheckOverloadedExprs( sCopy.exprs = make([]Expr, len(s.exprs)) copy(sCopy.exprs, s.exprs) if ok, typedExprs, fns, err := filterAttempt(ctx, semaCtx, &sCopy, func() { - for _, idx := range append(s.constIdxs, s.placeholderIdxs...) { + work := func(idx int) { p := params.GetAt(idx) typCast := knownEnum if p.Family() == types.ArrayFamily { @@ -959,6 +961,12 @@ func typeCheckOverloadedExprs( } sCopy.exprs[idx] = &CastExpr{Expr: sCopy.exprs[idx], Type: typCast, SyntaxMode: CastShort} } + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { + work(i) + } + for i, ok := s.placeholderIdxs.Next(0); ok; i, ok = s.placeholderIdxs.Next(i + 1) { + work(i) + } }); ok { return typedExprs, fns, err } @@ -1105,7 +1113,7 @@ func filterOverloads( func defaultTypeCheck( ctx context.Context, semaCtx *SemaContext, s *typeCheckOverloadState, errorOnPlaceholders bool, ) error { - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { typ, err := s.exprs[i].TypeCheck(ctx, semaCtx, types.Any) if err != nil { return pgerror.Wrapf(err, pgcode.InvalidParameterValue, @@ -1113,7 +1121,7 @@ func defaultTypeCheck( } s.typedExprs[i] = typ } - for _, i := range s.placeholderIdxs { + for i, ok := s.placeholderIdxs.Next(0); ok; i, ok = s.placeholderIdxs.Next(i + 1) { if errorOnPlaceholders { _, err := s.exprs[i].TypeCheck(ctx, semaCtx, types.Any) return err @@ -1147,7 +1155,7 @@ func checkReturn( idx := s.overloadIdxs[0] o := s.overloads[idx] p := o.params() - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { des := p.GetAt(i) typ, err := s.exprs[i].TypeCheck(ctx, semaCtx, des) if err != nil { @@ -1180,7 +1188,7 @@ func checkReturnPlaceholdersAtIdx( ) (bool, []TypedExpr, []overloadImpl, error) { o := s.overloads[idx] p := o.params() - for _, i := range s.placeholderIdxs { + for i, ok := s.placeholderIdxs.Next(0); ok; i, ok = s.placeholderIdxs.Next(i + 1) { des := p.GetAt(i) typ, err := s.exprs[i].TypeCheck(ctx, semaCtx, des) if err != nil { diff --git a/pkg/sql/sem/tree/type_check.go b/pkg/sql/sem/tree/type_check.go index 3a662ba032c3..d0dc424c71d3 100644 --- a/pkg/sql/sem/tree/type_check.go +++ b/pkg/sql/sem/tree/type_check.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" 
"github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" @@ -2310,9 +2311,9 @@ type typeCheckExprsState struct { exprs []Expr typedExprs []TypedExpr - constIdxs []int // index into exprs/typedExprs - placeholderIdxs []int // index into exprs/typedExprs - resolvableIdxs []int // index into exprs/typedExprs + constIdxs util.FastIntSet // index into exprs/typedExprs + placeholderIdxs util.FastIntSet // index into exprs/typedExprs + resolvableIdxs util.FastIntSet // index into exprs/typedExprs } // TypeCheckSameTypedExprs type checks a list of expressions, asserting that all @@ -2347,7 +2348,7 @@ func TypeCheckSameTypedExprs( // TODO(nvanbenschoten): Look into reducing allocations here. typedExprs := make([]TypedExpr, len(exprs)) - constIdxs, placeholderIdxs, resolvableIdxs := typeCheckSplitExprs(ctx, semaCtx, exprs) + constIdxs, placeholderIdxs, resolvableIdxs := typeCheckSplitExprs(semaCtx, exprs) s := typeCheckExprsState{ ctx: ctx, @@ -2360,22 +2361,22 @@ func TypeCheckSameTypedExprs( } switch { - case len(resolvableIdxs) == 0 && len(constIdxs) == 0: + case resolvableIdxs.Empty() && constIdxs.Empty(): if err := typeCheckSameTypedPlaceholders(s, desired); err != nil { return nil, nil, err } return typedExprs, desired, nil - case len(resolvableIdxs) == 0: + case resolvableIdxs.Empty(): return typeCheckConstsAndPlaceholdersWithDesired(s, desired) default: firstValidIdx := -1 firstValidType := types.Unknown - for i, j := range resolvableIdxs { - typedExpr, err := exprs[j].TypeCheck(ctx, semaCtx, desired) + for i, ok := s.resolvableIdxs.Next(0); ok; i, ok = s.resolvableIdxs.Next(i + 1) { + typedExpr, err := exprs[i].TypeCheck(ctx, semaCtx, desired) if err != nil { return nil, nil, err } - typedExprs[j] = typedExpr + typedExprs[i] = typedExpr if returnType := typedExpr.ResolvedType(); returnType.Family() != types.UnknownFamily { firstValidType = returnType firstValidIdx = i @@ -2386,10 +2387,11 @@ func TypeCheckSameTypedExprs( if firstValidType.Family() == types.UnknownFamily { // We got to the end without finding a non-null expression. switch { - case len(constIdxs) > 0: + case !constIdxs.Empty(): return typeCheckConstsAndPlaceholdersWithDesired(s, desired) - case len(placeholderIdxs) > 0: - p := s.exprs[placeholderIdxs[0]].(*Placeholder) + case !placeholderIdxs.Empty(): + next, _ := placeholderIdxs.Next(0) + p := s.exprs[next].(*Placeholder) return nil, nil, placeholderTypeAmbiguityError(p.Idx) default: if desired != types.Any { @@ -2399,7 +2401,7 @@ func TypeCheckSameTypedExprs( } } - for _, i := range resolvableIdxs[firstValidIdx+1:] { + for i, ok := s.resolvableIdxs.Next(firstValidIdx + 1); ok; i, ok = s.resolvableIdxs.Next(i + 1) { typedExpr, err := exprs[i].TypeCheck(ctx, semaCtx, firstValidType) if err != nil { return nil, nil, err @@ -2411,12 +2413,12 @@ func TypeCheckSameTypedExprs( } typedExprs[i] = typedExpr } - if len(constIdxs) > 0 { + if !constIdxs.Empty() { if _, err := typeCheckSameTypedConsts(s, firstValidType, true); err != nil { return nil, nil, err } } - if len(placeholderIdxs) > 0 { + if !placeholderIdxs.Empty() { if err := typeCheckSameTypedPlaceholders(s, firstValidType); err != nil { return nil, nil, err } @@ -2427,7 +2429,7 @@ func TypeCheckSameTypedExprs( // Used to set placeholders to the desired typ. 
func typeCheckSameTypedPlaceholders(s typeCheckExprsState, typ *types.T) error { - for _, i := range s.placeholderIdxs { + for i, ok := s.placeholderIdxs.Next(0); ok; i, ok = s.placeholderIdxs.Next(i + 1) { typedExpr, err := typeCheckAndRequire(s.ctx, s.semaCtx, s.exprs[i], typ, "placeholder") if err != nil { return err @@ -2444,7 +2446,7 @@ func typeCheckSameTypedConsts( s typeCheckExprsState, typ *types.T, required bool, ) (*types.T, error) { setTypeForConsts := func(typ *types.T) (*types.T, error) { - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { typedExpr, err := typeCheckAndRequire(s.ctx, s.semaCtx, s.exprs[i], typ, "constant") if err != nil { // In this case, even though the constExpr has been shown to be @@ -2460,7 +2462,7 @@ func typeCheckSameTypedConsts( // If typ is not a wildcard, all consts try to become typ. if typ.Family() != types.AnyFamily { all := true - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { if !canConstantBecome(s.exprs[i].(Constant), typ) { if required { typedExpr, err := s.exprs[i].TypeCheck(s.ctx, s.semaCtx, types.Any) @@ -2489,7 +2491,7 @@ func typeCheckSameTypedConsts( // If not, we want to force an error because the constants cannot all // become the same type. reqTyp := typ - for _, i := range s.constIdxs { + for i, ok := s.constIdxs.Next(0); ok; i, ok = s.constIdxs.Next(i + 1) { typedExpr, err := s.exprs[i].TypeCheck(s.ctx, s.semaCtx, reqTyp) if err != nil { return nil, err @@ -2513,7 +2515,7 @@ func typeCheckConstsAndPlaceholdersWithDesired( if err != nil { return nil, nil, err } - if len(s.placeholderIdxs) > 0 { + if !s.placeholderIdxs.Empty() { if err := typeCheckSameTypedPlaceholders(s, typ); err != nil { return nil, nil, err } @@ -2526,16 +2528,16 @@ func typeCheckConstsAndPlaceholdersWithDesired( // - Placeholders // - All other Exprs func typeCheckSplitExprs( - ctx context.Context, semaCtx *SemaContext, exprs []Expr, -) (constIdxs []int, placeholderIdxs []int, resolvableIdxs []int) { + semaCtx *SemaContext, exprs []Expr, +) (constIdxs util.FastIntSet, placeholderIdxs util.FastIntSet, resolvableIdxs util.FastIntSet) { for i, expr := range exprs { switch { case isConstant(expr): - constIdxs = append(constIdxs, i) + constIdxs.Add(i) case semaCtx.isUnresolvedPlaceholder(expr): - placeholderIdxs = append(placeholderIdxs, i) + placeholderIdxs.Add(i) default: - resolvableIdxs = append(resolvableIdxs, i) + resolvableIdxs.Add(i) } } return constIdxs, placeholderIdxs, resolvableIdxs diff --git a/pkg/sql/sqlstats/persistedsqlstats/appStats.go b/pkg/sql/sqlstats/persistedsqlstats/appStats.go index e0e95cb928f0..108782a26761 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/appStats.go +++ b/pkg/sql/sqlstats/persistedsqlstats/appStats.go @@ -44,11 +44,11 @@ func (s *ApplicationStats) RecordStatement( return fingerprintID, err } -// ShouldSaveLogicalPlanDesc implements sqlstats.ApplicationStats interface. -func (s *ApplicationStats) ShouldSaveLogicalPlanDesc( +// ShouldSample implements sqlstats.ApplicationStats interface. 
+func (s *ApplicationStats) ShouldSample( fingerprint string, implicitTxn bool, database string, -) bool { - return s.ApplicationStats.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database) +) (bool, bool) { + return s.ApplicationStats.ShouldSample(fingerprint, implicitTxn, database) } // RecordTransaction implements sqlstats.ApplicationStats interface and saves diff --git a/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go b/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go index d418b1de14cf..ccf3be56bd00 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go @@ -61,7 +61,7 @@ const ( // system table. // - set-time: this changes the clock time perceived by SQL Stats subsystem. // This is useful when unit tests need to manipulate times. -// - should-sample-logical-plan: this checks if the given tuple of +// - should-sample: this checks if the given tuple of // (db, implicitTxn, fingerprint) will be sampled // next time it is being executed. func TestSQLStatsDataDriven(t *testing.T) { @@ -129,7 +129,7 @@ func TestSQLStatsDataDriven(t *testing.T) { } stubTime.setTime(tm) return stubTime.Now().String() - case "should-sample-logical-plan": + case "should-sample": mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs) var dbName string @@ -145,13 +145,12 @@ func TestSQLStatsDataDriven(t *testing.T) { // them. fingerprint = strings.Replace(fingerprint, "%", " ", -1) - return fmt.Sprintf("%t", - appStats.ShouldSaveLogicalPlanDesc( - fingerprint, - implicitTxn, - dbName, - ), + previouslySampled, savePlanForStats := appStats.ShouldSample( + fingerprint, + implicitTxn, + dbName, ) + return fmt.Sprintf("%t, %t", previouslySampled, savePlanForStats) } return "" diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/logical_plan_sampling_for_explicit_txn b/pkg/sql/sqlstats/persistedsqlstats/testdata/logical_plan_sampling_for_explicit_txn index d752d06d4860..9bda9efe7732 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/testdata/logical_plan_sampling_for_explicit_txn +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/logical_plan_sampling_for_explicit_txn @@ -14,9 +14,9 @@ set-time time=2021-09-20T15:00:00Z # Logical plan should be sampled here, since we have not collected logical plan # at all. -should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_ ---- -true +false, true # Execute the query to trigger a collection of logical plan. # (db_name=defaultdb implicitTxn=true fingerprint=SELECT _) @@ -26,15 +26,15 @@ SELECT 1 # Ensure that if a query is to be subsequently executed, it will not cause # logical plan sampling. -should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_ ---- -false +true, false # However, if we are to execute the same statement but under explicit # transaction, the plan will still need to be sampled. -should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_ ---- -true +false, true # Execute the statement under explicit transaction. # (db_name=defaultdb implicitTxn=false fingerprint=SELECT _) @@ -46,9 +46,9 @@ COMMIT # Ensure that the subsequent execution of the query will not cause logical plan # collection. 
-should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_ ---- -false +true, false # Set the time to the future and ensure we will resample the logical plan. set-time time=2021-09-20T15:05:01Z @@ -56,22 +56,22 @@ set-time time=2021-09-20T15:05:01Z 2021-09-20 15:05:01 +0000 UTC -should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_ ---- -true +true, true # implicit txn exec-sql SELECT 1 ---- -should-sample-logical-plan db=defaultdb implicitTxn=true fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=true fingerprint=SELECT%_ ---- -false +true, true -should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_ ---- -true +true, true # explicit txn exec-sql @@ -80,6 +80,6 @@ SELECT 1 COMMIT ---- -should-sample-logical-plan db=defaultdb implicitTxn=false fingerprint=SELECT%_ +should-sample db=defaultdb implicitTxn=false fingerprint=SELECT%_ ---- -false +true, true diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index 8d734d33fa27..89655683f276 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -115,18 +115,21 @@ func (s *StatsCollector) EndTransaction( s.flushTarget = nil } -// ShouldSaveLogicalPlanDesc implements sqlstats.StatsCollector interface. -func (s *StatsCollector) ShouldSaveLogicalPlanDesc( +// ShouldSample implements sqlstats.StatsCollector interface. +func (s *StatsCollector) ShouldSample( fingerprint string, implicitTxn bool, database string, -) bool { - foundInFlushTarget := true +) (previouslySampled bool, savePlanForStats bool) { + sampledInFlushTarget := false + savePlanForStatsInFlushTarget := true if s.flushTarget != nil { - foundInFlushTarget = s.flushTarget.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database) + sampledInFlushTarget, savePlanForStatsInFlushTarget = s.flushTarget.ShouldSample(fingerprint, implicitTxn, database) } - return foundInFlushTarget && - s.ApplicationStats.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database) + sampledInAppStats, savePlanForStatsInAppStats := s.ApplicationStats.ShouldSample(fingerprint, implicitTxn, database) + previouslySampled = sampledInFlushTarget || sampledInAppStats + savePlanForStats = savePlanForStatsInFlushTarget && savePlanForStatsInAppStats + return previouslySampled, savePlanForStats } // UpgradeImplicitTxn implements sqlstats.StatsCollector interface. 
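The StatsCollector change above merges two answers: a statement counts as previously sampled if either the flush target or the current application stats saw it, while the plan is re-saved only if no container has a recent enough sample. A self-contained sketch of that combination logic follows; the sampler interface and stub type are hypothetical stand-ins for the real sqlstats containers.

package main

import "fmt"

// sampler captures the two-boolean ShouldSample contract introduced above:
// previouslySampled reports whether statistics were ever collected for the
// fingerprint, savePlanForStats whether the logical plan should be saved.
type sampler interface {
	ShouldSample(fingerprint string, implicitTxn bool, database string) (previouslySampled, savePlanForStats bool)
}

// stub is a hypothetical stand-in for a sqlstats container.
type stub struct{ sampled, save bool }

func (s stub) ShouldSample(string, bool, string) (bool, bool) { return s.sampled, s.save }

// combine mirrors the StatsCollector.ShouldSample merge: OR for "previously
// sampled", AND for "save the plan now".
func combine(flushTarget, appStats sampler, fp string, implicit bool, db string) (bool, bool) {
	sampledInFlushTarget, saveInFlushTarget := false, true
	if flushTarget != nil {
		sampledInFlushTarget, saveInFlushTarget = flushTarget.ShouldSample(fp, implicit, db)
	}
	sampledInApp, saveInApp := appStats.ShouldSample(fp, implicit, db)
	return sampledInFlushTarget || sampledInApp, saveInFlushTarget && saveInApp
}

func main() {
	// The flush target sampled the plan recently; the in-memory app stats
	// have not seen the fingerprint yet.
	sampled, save := combine(stub{sampled: true, save: false}, stub{sampled: false, save: true}, "SELECT _", true, "defaultdb")
	fmt.Println(sampled, save) // true false
}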
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
index a07c15774e28..f6e26f28f1e8 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
@@ -692,7 +692,7 @@ func (s *Container) MergeApplicationStatementStats(
 	defer stmtStats.mu.Unlock()
 	stmtStats.mergeStatsLocked(statistics)
 
-	planLastSampled := s.getLogicalPlanLastSampled(key.sampledPlanKey)
+	planLastSampled, _ := s.getLogicalPlanLastSampled(key.sampledPlanKey)
 	if planLastSampled.Before(stmtStats.mu.data.SensitiveInfo.MostRecentPlanTimestamp) {
 		s.setLogicalPlanLastSampled(key.sampledPlanKey, stmtStats.mu.data.SensitiveInfo.MostRecentPlanTimestamp)
 	}
@@ -922,16 +922,13 @@ func (s *transactionCounts) recordTransactionCounts(
 	}
 }
 
-func (s *Container) getLogicalPlanLastSampled(key sampledPlanKey) time.Time {
+func (s *Container) getLogicalPlanLastSampled(
+	key sampledPlanKey,
+) (lastSampled time.Time, found bool) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-
-	lastSampled, found := s.mu.sampledPlanMetadataCache[key]
-	if !found {
-		return time.Time{}
-	}
-
-	return lastSampled
+	lastSampled, found = s.mu.sampledPlanMetadataCache[key]
+	return lastSampled, found
 }
 
 func (s *Container) setLogicalPlanLastSampled(key sampledPlanKey, time time.Time) {
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
index 4c5056637097..0ec4c82831c3 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
@@ -227,16 +227,17 @@ func (s *Container) RecordStatementExecStats(
 	return nil
 }
 
-// ShouldSaveLogicalPlanDesc implements sqlstats.Writer interface.
-func (s *Container) ShouldSaveLogicalPlanDesc(
+// ShouldSample implements sqlstats.Writer interface.
+func (s *Container) ShouldSample(
 	fingerprint string, implicitTxn bool, database string,
-) bool {
-	lastSampled := s.getLogicalPlanLastSampled(sampledPlanKey{
+) (previouslySampled, savePlanForStats bool) {
+	lastSampled, previouslySampled := s.getLogicalPlanLastSampled(sampledPlanKey{
 		stmtNoConstants: fingerprint,
 		implicitTxn:     implicitTxn,
 		database:        database,
 	})
-	return s.shouldSaveLogicalPlanDescription(lastSampled)
+	savePlanForStats = s.shouldSaveLogicalPlanDescription(lastSampled)
+	return previouslySampled, savePlanForStats
 }
 
 // RecordTransaction implements sqlstats.Writer interface and saves
diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go
index 59fa3d79fc01..5f80ebf575be 100644
--- a/pkg/sql/sqlstats/ssprovider.go
+++ b/pkg/sql/sqlstats/ssprovider.go
@@ -37,9 +37,11 @@ type Writer interface {
 	// This is sampled and not recorded for every single statement.
 	RecordStatementExecStats(key roachpb.StatementStatisticsKey, stats execstats.QueryLevelStats) error
 
-	// ShouldSaveLogicalPlanDesc returns whether we should save the logical plan
-	// description for a given combination of statement metadata.
-	ShouldSaveLogicalPlanDesc(fingerprint string, implicitTxn bool, database string) bool
+	// ShouldSample returns two booleans: the first indicates whether we have
+	// ever sampled (i.e. collected statistics for) the given combination of
+	// statement metadata, and the second indicates whether we should save the
+	// logical plan description for it.
+	ShouldSample(fingerprint string, implicitTxn bool, database string) (previouslySampled, savePlanForStats bool)
 
 	// RecordTransaction records statistics for a transaction.
RecordTransaction(ctx context.Context, key roachpb.TransactionFingerprintID, value RecordedTxnStats) error diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index 9e58752fa0f6..5be7972051cb 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -12,6 +12,7 @@ package sql import ( "context" + "encoding/json" "fmt" "math" "regexp" @@ -897,3 +898,276 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) { } } } + +// TestTelemetryScanCounts tests that scans with and without forecasted +// statistics are counted correctly. It also tests that other statistics +// forecasting telemetry is counted correctly. +func TestTelemetryScanCounts(t *testing.T) { + defer leaktest.AfterTest(t)() + sc := log.ScopeWithoutShowLogs(t) + defer sc.Close(t) + + cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) + defer cleanup() + + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + db := sqlutils.MakeSQLRunner(sqlDB) + defer s.Stopper().Stop(context.Background()) + + db.Exec(t, "SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;") + db.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false;") + db.Exec(t, "CREATE TABLE d (d PRIMARY KEY) AS SELECT generate_series(10, 16);") + db.Exec(t, "CREATE TABLE e (e PRIMARY KEY) AS SELECT generate_series(0, 19);") + db.Exec(t, "CREATE TABLE f (f PRIMARY KEY) AS SELECT generate_series(5, 8) * 2;") + db.Exec(t, `ALTER TABLE e INJECT STATISTICS '[ + { + "avg_size": 1, + "columns": [ + "e" + ], + "created_at": "2017-08-05 00:00:00.000000", + "distinct_count": 20, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 1, + "num_range": 0, + "upper_bound": "0" + }, + { + "distinct_range": 18, + "num_eq": 1, + "num_range": 18, + "upper_bound": "20" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 20 + } +]';`) + db.Exec(t, `ALTER TABLE f INJECT STATISTICS '[ + { + "avg_size": 1, + "columns": [ + "f" + ], + "created_at": "2017-05-07 00:00:00.000000", + "distinct_count": 1, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "1" + }, + { + "distinct_range": 1, + "num_eq": 0, + "num_range": 1, + "upper_bound": "11" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + }, + { + "avg_size": 1, + "columns": [ + "f" + ], + "created_at": "2017-05-08 00:00:00.000000", + "distinct_count": 2, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "3" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "13" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 2 + }, + { + "avg_size": 1, + "columns": [ + "f" + ], + "created_at": "2017-05-09 00:00:00.000000", + "distinct_count": 3, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "5" + }, + { + "distinct_range": 3, + "num_eq": 0, + "num_range": 3, + "upper_bound": "15" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 3 + } +]';`) + + testData := []struct { + query string + logStmt string + scanCount float64 + scanWithStatsCount float64 + scanWithStatsForecastCount float64 + totalScanRowsEstimate float64 + totalScanRowsWithoutForecastsEstimate float64 + }{ + { + query: "SELECT 1", + logStmt: 
"SELECT ‹1›", + }, + { + query: "SELECT * FROM d WHERE true", + logStmt: `SELECT * FROM \"\".\"\".d WHERE ‹true›`, + + scanCount: 1, + }, + { + query: "SELECT * FROM e WHERE true", + logStmt: `SELECT * FROM \"\".\"\".e WHERE ‹true›`, + + scanCount: 1, + scanWithStatsCount: 1, + totalScanRowsEstimate: 20, + totalScanRowsWithoutForecastsEstimate: 20, + }, + { + query: "SELECT * FROM f WHERE true", + logStmt: `SELECT * FROM \"\".\"\".f WHERE ‹true›`, + + scanCount: 1, + scanWithStatsCount: 1, + scanWithStatsForecastCount: 1, + totalScanRowsEstimate: 4, + totalScanRowsWithoutForecastsEstimate: 3, + }, + { + query: "SELECT * FROM d INNER HASH JOIN e ON d = e INNER HASH JOIN f ON e = f", + logStmt: `SELECT * FROM \"\".\"\".d INNER HASH JOIN \"\".\"\".e ON d = e INNER HASH JOIN \"\".\"\".f ON e = f`, + + scanCount: 3, + scanWithStatsCount: 2, + scanWithStatsForecastCount: 1, + totalScanRowsEstimate: 24, + totalScanRowsWithoutForecastsEstimate: 23, + }, + } + + for _, tc := range testData { + db.Exec(t, tc.query) + } + + log.Flush() + + entries, err := log.FetchEntriesFromFiles( + 0, + math.MaxInt64, + 10000, + regexp.MustCompile(`"EventType":"sampled_query"`), + log.WithMarkedSensitiveData, + ) + + if err != nil { + t.Fatal(err) + } + + if len(entries) == 0 { + t.Fatal(errors.Newf("no entries found")) + } + + t.Log("testcases") +cases: + for _, tc := range testData { + for i := len(entries) - 1; i >= 0; i-- { + if strings.Contains(entries[i].Message, tc.logStmt) { + var entry map[string]interface{} + if err := json.Unmarshal([]byte(entries[i].Message), &entry); err != nil { + t.Error(err) + continue cases + } + get := func(key string) float64 { + if val, ok := entry[key]; ok { + return val.(float64) + } + return 0 + } + + if get("ScanCount") != tc.scanCount { + t.Errorf( + "query `%s` expected ScanCount %v, was: %v", + tc.query, tc.scanCount, get("ScanCount"), + ) + } + if get("ScanWithStatsCount") != tc.scanWithStatsCount { + t.Errorf( + "query `%s` expected ScanWithStatsCount %v, was: %v", + tc.query, tc.scanWithStatsCount, get("ScanWithStatsCount"), + ) + } + if get("ScanWithStatsForecastCount") != tc.scanWithStatsForecastCount { + t.Errorf( + "query `%s` expected ScanWithStatsForecastCount %v, was: %v", + tc.query, tc.scanWithStatsForecastCount, get("ScanWithStatsForecastCount"), + ) + } + if get("TotalScanRowsEstimate") != tc.totalScanRowsEstimate { + t.Errorf( + "query `%s` expected TotalScanRowsEstimate %v, was: %v", + tc.query, tc.totalScanRowsEstimate, get("TotalScanRowsEstimate"), + ) + } + if get("TotalScanRowsWithoutForecastsEstimate") != tc.totalScanRowsWithoutForecastsEstimate { + t.Errorf( + "query `%s` expected TotalScanRowsWithoutForecastsEstimate %v, was: %v", + tc.query, tc.totalScanRowsWithoutForecastsEstimate, get("TotalScanRowsWithoutForecastsEstimate"), + ) + } + if tc.scanWithStatsForecastCount > 0 { + if get("NanosSinceStatsForecasted") <= 0 { + t.Errorf( + "query `%s` expected NanosSinceStatsForecasted > 0, was: %v", + tc.query, get("NanosSinceStatsForecasted"), + ) + } + if get("NanosSinceStatsForecasted") >= get("NanosSinceStatsCollected") { + t.Errorf( + "query `%s` expected NanosSinceStatsForecasted < NanosSinceStatsCollected: %v, %v", + tc.query, get("NanosSinceStatsForecasted"), get("NanosSinceStatsCollected"), + ) + } + } + continue cases + } + } + t.Errorf("couldn't find log entry containing `%s`", tc.logStmt) + } +} diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index 902f507bf1b5..683fb6641c77 100644 
--- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -4266,6 +4266,51 @@ func (m *SampledQuery) AppendJSONFields(printComma bool, b redact.RedactableByte b = append(b, ']') } + if m.ScanCount != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ScanCount\":"...) + b = strconv.AppendInt(b, int64(m.ScanCount), 10) + } + + if m.ScanWithStatsCount != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ScanWithStatsCount\":"...) + b = strconv.AppendInt(b, int64(m.ScanWithStatsCount), 10) + } + + if m.ScanWithStatsForecastCount != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ScanWithStatsForecastCount\":"...) + b = strconv.AppendInt(b, int64(m.ScanWithStatsForecastCount), 10) + } + + if m.TotalScanRowsWithoutForecastsEstimate != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"TotalScanRowsWithoutForecastsEstimate\":"...) + b = strconv.AppendFloat(b, float64(m.TotalScanRowsWithoutForecastsEstimate), 'f', -1, 64) + } + + if m.NanosSinceStatsForecasted != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"NanosSinceStatsForecasted\":"...) + b = strconv.AppendInt(b, int64(m.NanosSinceStatsForecasted), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 390f07c5329a..4e662cdca710 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -164,6 +164,26 @@ message SampledQuery { // Generated index recommendations for this query. repeated string index_recommendations = 45 [(gogoproto.jsontag) = ',omitempty', (gogoproto.moretags) = "redact:\"nonsensitive\""]; + // The number of scans in the query plan. + int64 scan_count = 46 [(gogoproto.jsontag) = ",omitempty"]; + + // The number of scans using statistics (including forecasted statistics) in + // the query plan. + int64 scan_with_stats_count = 47 [(gogoproto.jsontag) = ",omitempty"]; + + // The number of scans using forecasted statistics in the query plan. + int64 scan_with_stats_forecast_count = 48 [(gogoproto.jsontag) = ",omitempty"]; + + // Total number of rows read by all scans in the query, as estimated by the + // optimizer without using forecasts. + double total_scan_rows_without_forecasts_estimate = 49 [(gogoproto.jsontag) = ",omitempty"]; + + // The greatest quantity of nanoseconds that have passed since the forecast + // time (or until the forecast time, if it is in the future, in which case it + // will be negative) for any table with forecasted stats scanned by this + // query. + int64 nanos_since_stats_forecasted = 50 [(gogoproto.jsontag) = ",omitempty"]; + reserved 12; }
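The generated encoder and the new proto fields above follow the same omit-zero convention as the earlier SampledQuery fields: each field is appended by hand only when non-zero, with printComma threading the separator between fields. A simplified, runnable sketch of that shape follows; it uses a plain []byte in place of redact.RedactableBytes, and appendScanCounts is an illustrative helper rather than the generated API.

package main

import (
	"fmt"
	"strconv"
)

// appendScanCounts mimics the generated AppendJSONFields pattern: emit a
// field only when non-zero, and track whether a comma is needed next.
func appendScanCounts(printComma bool, b []byte, scanCount, nanosSinceStatsForecasted int64) (bool, []byte) {
	if scanCount != 0 {
		if printComma {
			b = append(b, ',')
		}
		printComma = true
		b = append(b, `"ScanCount":`...)
		b = strconv.AppendInt(b, scanCount, 10)
	}
	if nanosSinceStatsForecasted != 0 {
		if printComma {
			b = append(b, ',')
		}
		printComma = true
		b = append(b, `"NanosSinceStatsForecasted":`...)
		// Negative values are legal: the forecast time may lie in the future.
		b = strconv.AppendInt(b, nanosSinceStatsForecasted, 10)
	}
	return printComma, b
}

func main() {
	b := []byte{'{'}
	printComma := false
	printComma, b = appendScanCounts(printComma, b, 3, -42)
	b = append(b, '}')
	fmt.Println(string(b), printComma) // {"ScanCount":3,"NanosSinceStatsForecasted":-42} true
}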