88539: sql: add telemetry for statistics forecast usage r=rytaft a=michae2

Add a few fields to the sampled_query telemetry events that will help us
measure how useful table statistics forecasting is in practice.

Fixes: #86356

Release note (ops change): Add five new fields to the sampled_query
telemetry events (a rough sketch of the fields follows the list below):
- `ScanCount`: Number of scans in the query plan.
- `ScanWithStatsCount`: Number of scans using statistics (including
  forecasted statistics) in the query plan.
- `ScanWithStatsForecastCount`: Number of scans using forecasted
  statistics in the query plan.
- `TotalScanRowsWithoutForecastsEstimate`: Total number of rows read by
  all scans in the query, as estimated by the optimizer without using
  forecasts.
- `NanosSinceStatsForecasted`: The greatest quantity of nanoseconds that
  have passed since the forecast time (or until the forecast time, if it
  is in the future, in which case it will be negative) for any table
  with forecasted stats scanned by this query.
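
For reference, the new fields might be declared roughly as follows — a hedged sketch mirroring the release note, with assumed types (the real definitions live in the eventpb protos):

```
package sketch

// SampledQueryForecastFields is a sketch only: the field names follow the
// release note above; the types are assumptions.
type SampledQueryForecastFields struct {
	ScanCount                             int64   // scans in the query plan
	ScanWithStatsCount                    int64   // scans using stats, incl. forecasted
	ScanWithStatsForecastCount            int64   // scans using forecasted stats
	TotalScanRowsWithoutForecastsEstimate float64 // optimizer row estimate without forecasts
	NanosSinceStatsForecasted             int64   // max nanos since (or until) forecast time
}
```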

89418: sqlstats: always enable tracing first time fingerprint is seen r=j82w a=j82w

Fixes: #89185

The first time a fingerprint is seen, tracing should be enabled. This is currently broken if
sql.metrics.statement_details.plan_collection.enabled is set to false, which can leave crdb_internal.transaction_contention_events empty: tracing was never enabled, so the contention event was never recorded.

To fix this properly, a new value needs to be returned from ShouldSample reporting whether this is the first time the fingerprint has been seen. This removes the dependency on the plan_collection feature switch; a rough sketch of the idea follows.
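
A minimal sketch of that design, assuming a hypothetical sampler type — the names and signature here are illustrations, not the actual sqlstats API:

```
package sketch

// sampler sketches how ShouldSample could report first-time fingerprints
// separately from plan collection. Hypothetical; not CockroachDB's API.
type sampler struct {
	seen                  map[string]bool // fingerprints seen so far
	planCollectionEnabled bool            // the plan_collection cluster setting
}

// ShouldSample reports whether the fingerprint was previously sampled, and
// separately whether the plan should be collected. Callers can then enable
// tracing whenever previouslySampled is false, regardless of the
// plan_collection setting.
func (s *sampler) ShouldSample(fingerprint string) (previouslySampled, savePlanForStats bool) {
	previouslySampled = s.seen[fingerprint]
	s.seen[fingerprint] = true
	savePlanForStats = s.planCollectionEnabled && !previouslySampled
	return previouslySampled, savePlanForStats
}
```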

Release justification: Bug fixes and low-risk updates to new functionality.

Release note (bug fix): Always enable tracing the first time a fingerprint is seen.

89502: kvserver: do not report diff in consistency checks r=erikgrinaker,tbg a=pavelkalinnikov

This commit removes reporting of the diff between replicas in case of consistency check failures. With the increased range sizes, the previous approach has become infeasible.

One possible way to inspect an inconsistency after this change is:
1. Run the `cockroach debug range-data` tool to extract the range data from each replica's checkpoint.
2. Use standard OS tools like `diff` to analyze them.

In the meantime, we are researching the UX of this alternative approach, and evaluating whether better tooling support is possible.

Part of #21128
Epic: none

Release note (sql change): The `crdb_internal.check_consistency` function no longer includes the diff between inconsistent replicas, should they occur. If an inconsistency occurs, the storage engine checkpoints should be inspected instead. This change was made because the range size limit has since been increased from 64 MiB to O(GiB), so inlining diffs in consistency checks no longer scales.

89529: changefeedccl: update parallel consumer metrics r=jayshrivastava a=jayshrivastava

Previously, both changefeed.nprocs_flush_nanos and changefeed.nprocs_consume_event_nanos were monotonically increasing counters, which made it hard to determine the average time it takes to consume or flush an event. Changing them to histograms fixes this and allows percentile values such as p90 and p99.

This change also updates changefeed.nprocs_in_flight_count to sample values when incrementing the inFlight variable. Previously it showed up as 0 in the UI; now it shows the actual value.
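
In minimal form, the pattern in the diffs below — recording a duration into a histogram instead of incrementing a counter — looks like this (a sketch assuming the metric package API that appears in the diff; the metadata and window values are placeholders):

```
package main

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

var meta = metric.Metadata{Name: "changefeed.nprocs_consume_event_nanos"}

func main() {
	// A histogram retains a distribution, so percentiles like p90 and p99
	// can be derived; a counter only exposes a monotonically growing total.
	h := metric.NewHistogram(meta, 10*time.Second /* histogramWindow */, metric.IOLatencyBuckets)

	start := timeutil.Now().UnixNano()
	// ... consume or flush one event ...
	h.RecordValue(timeutil.Now().UnixNano() - start)
}
```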

Fixes #89654

Release note: None
Epic: none

89660: tree: use FastIntSet during typechecking r=jordanlewis a=jordanlewis

Previously, the typecheck phase used several slices of ordinals into lists. This is a perfect use case for FastIntSet, because the ordinals tend to be quite small. This commit switches to using FastIntSet instead.
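
A rough illustration of the switch (the ordinals are hypothetical; FastIntSet lived in pkg/util at the time):

```
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util"
)

func main() {
	// Before: candidate ordinals kept in a slice, one allocation per list.
	ordinals := []int{0, 2, 5}

	// After: a FastIntSet stores small ordinals inline in a single machine
	// word, so building and querying candidate sets allocates nothing.
	var s util.FastIntSet
	for _, o := range ordinals {
		s.Add(o)
	}
	fmt.Println(s.Contains(2), s.Len()) // true 3
}
```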

```
name          old time/op    new time/op    delta
TypeCheck-10    3.68µs ± 3%    3.52µs ± 2%   -4.52%  (p=0.000 n=9+10)

name          old alloc/op   new alloc/op   delta
TypeCheck-10      744B ± 0%      576B ± 0%  -22.58%  (p=0.000 n=10+10)

name          old allocs/op  new allocs/op  delta
TypeCheck-10      32.0 ± 0%      18.0 ± 0%  -43.75%  (p=0.000 n=10+10)
```
Issue: None
Epic: None
Release note: None

89662: scop, scdep: Rename `IndexValidator` to `Validator` r=Xiang-Gu a=Xiang-Gu

I've recently done work to enable adding/dropping check constraints in the declarative
schema changer. It was a big PR with many commits, so I think it's nicer to separate it
into multiple PRs.

This is the first PR in that effort; it is merely a rename and should be easy to review:
commit 1: ValidateCheckConstraint now uses ConstraintID instead of constraint name;

commit 2: Rename `IndexValidator` to `Validator`.
We previously had a file under scdep called `index_validator.go` implementing the logic
for validating an index. Now that we are also going to validate check constraints, we
rename it so that it covers check constraint validation as well (sketched below).
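
In sketch form, the rename widens the dependency interface — the signatures below are placeholders for illustration, not the real scdeps declarations:

```
package scdeps

import "context"

// Before: the dependency validated only indexes (placeholder signature).
type IndexValidator interface {
	ValidateIndex(ctx context.Context, indexID uint32) error
}

// After: renamed to Validator; the same dependency also validates check
// constraints, addressed by ConstraintID (commit 1) rather than by name.
type Validator interface {
	ValidateIndex(ctx context.Context, indexID uint32) error
	ValidateCheckConstraint(ctx context.Context, constraintID uint32) error
}
```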

Informs #89665
Release note: None

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: j82w <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Xiang Gu <[email protected]>
7 people committed Oct 10, 2022
7 parents cd99e13 + ab38868 + 9edf2c8 + ec704f6 + a8319ee + a9d3c81 + 239786a commit 94e2a2b
Showing 44 changed files with 780 additions and 246 deletions.
5 changes: 5 additions & 0 deletions docs/generated/eventlog.md
@@ -2817,6 +2817,11 @@ contains common SQL event/execution details.
| `KVRowsRead` | The number of rows read at the KV layer for this query. | no |
| `NetworkMessages` | The number of network messages sent by nodes for this query. | no |
| `IndexRecommendations` | Generated index recommendations for this query. | no |
| `ScanCount` | The number of scans in the query plan. | no |
| `ScanWithStatsCount` | The number of scans using statistics (including forecasted statistics) in the query plan. | no |
| `ScanWithStatsForecastCount` | The number of scans using forecasted statistics in the query plan. | no |
| `TotalScanRowsWithoutForecastsEstimate` | Total number of rows read by all scans in the query, as estimated by the optimizer without using forecasts. | no |
| `NanosSinceStatsForecasted` | The greatest quantity of nanoseconds that have passed since the forecast time (or until the forecast time, if it is in the future, in which case it will be negative) for any table with forecasted stats scanned by this query. | no |


#### Common fields
7 changes: 3 additions & 4 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -402,7 +402,7 @@ func (c *parallelEventConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Eve
startTime := timeutil.Now().UnixNano()
defer func() {
time := timeutil.Now().UnixNano()
c.metrics.ParallelConsumerConsumeNanos.Inc(time - startTime)
c.metrics.ParallelConsumerConsumeNanos.RecordValue(time - startTime)
}()

bucket := c.getBucketForEvent(ev)
@@ -486,14 +486,13 @@ func (c *parallelEventConsumer) workerLoop(
func (c *parallelEventConsumer) incInFlight() {
c.mu.Lock()
c.mu.inFlight++
c.metrics.ParallelConsumerInFlightEvents.Update(int64(c.mu.inFlight))
c.mu.Unlock()
c.metrics.ParallelConsumerInFlightEvents.Inc(1)
}

func (c *parallelEventConsumer) decInFlight() {
c.mu.Lock()
c.mu.inFlight--
c.metrics.ParallelConsumerInFlightEvents.Dec(1)
notifyFlush := c.mu.waiting && c.mu.inFlight == 0
c.mu.Unlock()

@@ -521,7 +520,7 @@ func (c *parallelEventConsumer) Flush(ctx context.Context) error {
startTime := timeutil.Now().UnixNano()
defer func() {
time := timeutil.Now().UnixNano()
c.metrics.ParallelConsumerFlushNanos.Inc(time - startTime)
c.metrics.ParallelConsumerFlushNanos.RecordValue(time - startTime)
}()

needFlush := func() bool {
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/metrics.go
@@ -576,8 +576,8 @@ type Metrics struct {
FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics
ReplanCount *metric.Counter
ParallelConsumerFlushNanos *metric.Counter
ParallelConsumerConsumeNanos *metric.Counter
ParallelConsumerFlushNanos *metric.Histogram
ParallelConsumerConsumeNanos *metric.Histogram
ParallelConsumerInFlightEvents *metric.Gauge

mu struct {
@@ -609,8 +609,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
ParallelConsumerFlushNanos: metric.NewCounter(metaChangefeedEventConsumerFlushNanos),
ParallelConsumerConsumeNanos: metric.NewCounter(metaChangefeedEventConsumerConsumeNanos),
ParallelConsumerFlushNanos: metric.NewHistogram(metaChangefeedEventConsumerFlushNanos, histogramWindow, metric.IOLatencyBuckets),
ParallelConsumerConsumeNanos: metric.NewHistogram(metaChangefeedEventConsumerConsumeNanos, histogramWindow, metric.IOLatencyBuckets),
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

12 changes: 2 additions & 10 deletions pkg/kv/kvserver/api.proto
@@ -36,22 +36,14 @@ message CollectChecksumRequest {
bytes checksum_id = 3 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ChecksumID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
reserved 4;
// If true then the response must include the snapshot of the data from which
// the checksum is computed.
bool with_snapshot = 5;
reserved 4, 5;
}

message CollectChecksumResponse {
// The checksum is the sha512 hash of the requested computation. It is empty
// if the computation failed.
bytes checksum = 1;
// snapshot is set if the with_snapshot in CollectChecksumRequest is true. For
// example, it can be set by the caller when it has detected an inconsistency.
//
// TODO(tschottdorf): with larger ranges, this is no longer tenable.
// See https://github.com/cockroachdb/cockroach/issues/21128.
roachpb.RaftSnapshotData snapshot = 2;
reserved 2;
// delta carries the stats of the range minus the recomputed stats.
storage.enginepb.MVCCStatsDelta delta = 3 [(gogoproto.nullable) = false];
// persisted carries the persisted stats of the replica.
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_compute_checksum.go
@@ -63,12 +63,11 @@ func ComputeChecksum(

var pd result.Result
pd.Replicated.ComputeChecksum = &kvserverpb.ComputeChecksum{
Version: args.Version,
ChecksumID: reply.ChecksumID,
SaveSnapshot: args.Snapshot,
Mode: args.Mode,
Checkpoint: args.Checkpoint,
Terminate: args.Terminate,
Version: args.Version,
ChecksumID: reply.ChecksumID,
Mode: args.Mode,
Checkpoint: args.Checkpoint,
Terminate: args.Terminate,
}
return pd, nil
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/consistency_queue_test.go
@@ -222,10 +222,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// TODO(pavelkalinnikov): not if we remove TestingSetRedactable below?
skip.UnderRaceWithIssue(t, 81819, "slow test, and TestingSetRedactable triggers race detector")

// This test prints a consistency checker diff, so it's
// good to make sure we're overly redacting said diff.
// TODO(pavelkalinnikov): remove this since we don't print diffs anymore?
defer log.TestingSetRedactable(true)()

// Test expects simple MVCC value encoding.
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
@@ -75,9 +75,7 @@ message ComputeChecksum {
// that hardcoded in the binary will a computation be carried out.
uint32 version = 5;

// SaveSnapshot indicates that the snapshot used to compute the checksum
// should be saved so that a diff of divergent replicas can later be computed.
bool save_snapshot = 2;
reserved 2;
roachpb.ChecksumMode mode = 3;
// If set, a checkpoint (i.e. cheap backup) of the engine will be taken. This
// is expected to be set only if we already know that there is an
57 changes: 22 additions & 35 deletions pkg/kv/kvserver/replica_consistency.go
@@ -76,8 +76,8 @@ type replicaChecksum struct {
// other replicas. These are inspected and a CheckConsistencyResponse is assembled.
//
// When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the
// consistency check will be re-run to collect a diff, which is then printed
// before calling `log.Fatal`. This behavior should be lifted to the consistency
// consistency check will be re-run to save storage engine checkpoints and
// terminate suspicious nodes. This behavior should be lifted to the consistency
// checker queue in the future.
func (r *Replica) CheckConsistency(
ctx context.Context, req roachpb.CheckConsistencyRequest,
@@ -146,12 +146,6 @@ func (r *Replica) checkConsistencyImpl(
&results[idx].Response.Delta,
)
}
minoritySnap := results[shaToIdxs[minoritySHA][0]].Response.Snapshot
curSnap := results[shaToIdxs[sha][0]].Response.Snapshot
if sha != minoritySHA && minoritySnap != nil && curSnap != nil {
diff := DiffRange(curSnap, minoritySnap)
buf.Printf("====== diff(%x, [minority]) ======\n%v", redact.Safe(sha), diff)
}
}

if isQueue {
@@ -246,18 +240,17 @@
return resp, roachpb.NewError(err)
}

if args.Snapshot {
// A diff was already printed. Return because all the code below will do
// is request another consistency check, with a diff and with
// instructions to terminate the minority nodes.
if args.Checkpoint {
// A checkpoint/termination request has already been sent. Return because
// all the code below will do is request another consistency check, with
// instructions to make a checkpoint and to terminate the minority nodes.
log.Errorf(ctx, "consistency check failed")
return resp, nil
}

// No diff was printed, so we want to re-run the check with snapshots
// requested, to build the diff. Note that this recursive call will be
// terminated in the `args.Snapshot` branch above.
args.Snapshot = true
// No checkpoint was requested, so we want to re-run the check with
// checkpoints and termination of suspicious nodes. Note that this recursive
// call will be terminated in the `args.Checkpoint` branch above.
args.Checkpoint = true
for _, idxs := range shaToIdxs[minoritySHA] {
args.Terminate = append(args.Terminate, results[idxs].Replica)
@@ -279,10 +272,11 @@
//
// See:
// https://github.com/cockroachdb/cockroach/issues/36861
// TODO(pavelkalinnikov): remove this now that diffs are not printed?
defer log.TemporarilyDisableFileGCForMainLogger()()

if _, pErr := r.checkConsistencyImpl(ctx, args); pErr != nil {
log.Errorf(ctx, "replica inconsistency detected; could not obtain actual diff: %s", pErr)
log.Errorf(ctx, "replica inconsistency detected; second round failed: %s", pErr)
}

return resp, nil
@@ -296,7 +290,7 @@ type ConsistencyCheckResult struct {
}

func (r *Replica) collectChecksumFromReplica(
ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool,
ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID,
) (CollectChecksumResponse, error) {
conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass)
if err != nil {
Expand All @@ -308,7 +302,6 @@ func (r *Replica) collectChecksumFromReplica(
StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID},
RangeID: r.RangeID,
ChecksumID: id,
WithSnapshot: withSnap,
}
resp, err := client.CollectChecksum(ctx, req)
if err != nil {
@@ -350,7 +343,7 @@ func (r *Replica) runConsistencyCheck(
if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency",
func(ctx context.Context) {
defer wg.Done()
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot)
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID)
resultCh <- ConsistencyCheckResult{
Replica: replica,
Response: resp,
@@ -467,10 +460,8 @@ func (*Replica) checksumInitialWait(ctx context.Context) time.Duration {
}

// computeChecksumDone sends the checksum computation result to the receiver.
func (*Replica) computeChecksumDone(
rc *replicaChecksum, result *replicaHash, snapshot *roachpb.RaftSnapshotData,
) {
c := CollectChecksumResponse{Snapshot: snapshot}
func (*Replica) computeChecksumDone(rc *replicaChecksum, result *replicaHash) {
var c CollectChecksumResponse
if result != nil {
c.Checksum = result.SHA512[:]
delta := result.PersistedMS
@@ -649,7 +640,7 @@ func (*Replica) sha512(
return nil, err
}
if snapshot != nil {
// Add LeaseAppliedState to the diff.
// Add LeaseAppliedState to the snapshot.
kv := roachpb.RaftSnapshotData_KeyValue{
Timestamp: hlc.Timestamp{},
}
@@ -752,16 +743,12 @@ func (r *Replica) computeChecksumPostApply(
); err != nil {
log.Errorf(ctx, "checksum collection did not join: %v", err)
} else {
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}
result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
result, err := r.sha512(ctx, desc, snap, nil /* snapshot */, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
result = nil
}
r.computeChecksumDone(c, result, snapshot)
r.computeChecksumDone(c, result)
}

var shouldFatal bool
@@ -776,10 +763,10 @@
}

// This node should fatal as a result of a previous consistency check (i.e.
// this round is carried out only to obtain a diff). If we fatal too early,
// the diff won't make it back to the leaseholder and thus won't be printed
// to the logs. Since we're already in a goroutine that's about to end,
// simply sleep for a few seconds and then terminate.
// this round only saves checkpoints and kills some nodes). If we fatal too
// early, the reply won't make it back to the leaseholder, so it will not be
// certain of completing the check. Since we're already in a goroutine
// that's about to end, just sleep for a few seconds and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)
3 changes: 0 additions & 3 deletions pkg/kv/kvserver/stores_server.go
@@ -64,9 +64,6 @@ func (is Server) CollectChecksum(
if err != nil {
return err
}
if !req.WithSnapshot {
ccr.Snapshot = nil
}
resp = &ccr
return nil
})
5 changes: 1 addition & 4 deletions pkg/roachpb/api.proto
@@ -1435,10 +1435,7 @@ message ComputeChecksumRequest {
// The version used to pick the checksum method. It allows us to use a
// consistent checksumming method across replicas.
uint32 version = 2;
reserved 3;
// Compute a checksum along with a snapshot of the entire range, that will be
// used in logging a diff during checksum verification.
bool snapshot = 4;
reserved 3, 4;
// The type of checksum to compute. See ChecksumMode.
ChecksumMode mode = 5;
// If set, a checkpoint (i.e. cheap backup) of the storage engine will be
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
@@ -968,7 +968,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
jobRegistry.SetInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg)
execCfg.IndexMerger = sql.NewIndexBackfillerMergePlanner(execCfg)
execCfg.IndexValidator = scdeps.NewIndexValidator(
execCfg.Validator = scdeps.NewValidator(
execCfg.DB,
execCfg.Codec,
execCfg.Settings,