83020: stmtdiagnostics: support continuous bundle collection r=irfansharif a=irfansharif

..until expiry. Informs #82896 (more specifically this is a short-term
alternative to the part pertaining to continuous tail capture). The
issue has more background, but we repeat some below for posterity.

It's desirable to draw from a set of tail execution traces collected
over time when investigating tail latencies. #82750 introduced a
probabilistic mechanism to capture a single tail event for individual
stmts with bounded overhead (determined by the sampling probability,
trading off how long until a single capture is obtained). This PR
introduces a `sql.stmt_diagnostics.collect_continuously_until_expired`
cluster setting to collect captures continuously over some period of
time for aggregate analysis. Longer term we'd want:
- Controls over the maximum number of captures we'd want stored over
  some period of time;
- Eviction of older bundles, assuming they're less relevant, making room
  for newer captures.

To safeguard against misuse (in its current form this should only be
used for experiments or escalations in controlled environments), we only
act on this setting provided the diagnostics request has an expiration
timestamp and a specified probability; these are crude measures to
prevent unbounded growth.
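
To make the intended semantics concrete, here's a rough sketch of the
per-execution decision (hypothetical names and types, not the actual
registry code); the safeguard above shows up as the early return when
the request lacks an expiration or a probability:

    package sketch

    import (
        "math/rand"
        "time"
    )

    // request mirrors just the fields relevant here: an expiration and a
    // sampling probability (hypothetical field names).
    type request struct {
        expiresAt   time.Time
        probability float64
    }

    // shouldCollect reports whether to capture a bundle for this execution,
    // and whether the request remains active afterwards.
    func shouldCollect(req request, continuous bool, now time.Time) (capture, stillActive bool) {
        if req.expiresAt.IsZero() || req.probability == 0 {
            // Safeguard: continuous collection only applies to requests with
            // both an expiration and a sampling probability.
            return false, false
        }
        if now.After(req.expiresAt) {
            return false, false // expired; stop collecting
        }
        capture = rand.Float64() < req.probability
        // Without continuous collection the request completes after its first
        // capture; with it, the request stays active until expiry.
        return capture, continuous || !capture
    }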

---

To get some idea of how this can be used, consider the kinds of
experiments we're running as part of #75066. Specifically we have a
reproduction where we can observe spikes in latencies for foreground
traffic in the presence of concurrent backups (incremental/full). In an
experiment with incremental backups running every 10m and full backups
running hourly at minute 35 (`RECURRING '*/10 * * * *' FULL BACKUP '35 * * * *'`),
we observe latency spikes during the overlap periods. With this cluster
setting we were able to set up trace captures over a 10h window to get a
set of representative outlier traces to investigate further.

    > SELECT crdb_internal.request_statement_bundle(
      'INSERT INTO new_order(no_o_id, ...)', -- stmt fingerprint
      0.05,                                  -- 5% sampling probability
      '30ms'::INTERVAL,                      -- 30ms target (p99.9)
      '10h'::INTERVAL                        -- capture window
    );

    > WITH histogram AS
         (SELECT extract('minute', collected_at) AS minute,
                 count(*) FROM system.statement_diagnostics
          GROUP BY minute)
    SELECT minute, repeat('*', (30 * count/(max(count) OVER ()))::INT8) AS freq
    FROM histogram
    ORDER BY count DESC
    LIMIT 10;

      minute |              freq
    ---------+---------------------------------
          36 | ******************************
          38 | *********************
          35 | *********************
          00 | *********************
          37 | ********************
          30 | ********************
          40 | *****************
          20 | **************
          10 | *************
          50 | ***********
    (10 rows)

We see that we captured just the set of bundles/traces we were interested in.

Release note: None

86591: kvserver: sync checksum computation with long poll r=erikgrinaker a=pavelkalinnikov

Previously, the checksum computation would run until completion unconditionally
(unless the collection request arrived first and canceled it). This is not the
best use of the limited pool capacity, because the result of the computation may
never be requested.

After this commit, the checksum computation task is synchronized with the
checksum collection request. Both wait at most 5 seconds for the other party
to join. Once joined, the computation starts; otherwise it is skipped.

If either party abandons the request, the `replicaChecksum` record is preserved
in the state and scheduled for GC later. This helps the other party fail fast,
instead of waiting, if it arrives late.

This change also removes the concurrency limit for these tasks, which is no
longer needed because tasks are now canceled reliably and do not pile up.
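
As a rough sketch of the rendezvous (helper names here are made up; the
real logic lives in `replica_consistency.go` below), each side selects on
a shared unbuffered channel and gives up after the timeout:

    package sketch

    import (
        "context"
        "errors"
        "time"
    )

    const syncTimeout = 5 * time.Second

    // computationSide hands its cancel function to the collector. If no
    // collector joins within syncTimeout, the task gives up and the checksum
    // is never computed.
    func computationSide(ctx context.Context, started chan<- context.CancelFunc, cancel context.CancelFunc) error {
        select {
        case started <- cancel:
            return nil // collector joined; start computing
        case <-time.After(syncTimeout):
            return errors.New("checksum collection request did not join")
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    // collectionSide waits for the computation to start. A closed channel
    // means the task already gave up, so the collector fails fast instead
    // of waiting out its own timeout.
    func collectionSide(ctx context.Context, started <-chan context.CancelFunc) error {
        select {
        case _, ok := <-started:
            if !ok {
                return errors.New("checksum task failed to start")
            }
            return nil // computation is running (its cancel func is dropped here for brevity)
        case <-time.After(syncTimeout):
            return errors.New("checksum computation did not start")
        case <-ctx.Done():
            return ctx.Err()
        }
    }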

Fixes #77432

Release note (performance improvement): consistency checks are now properly
cancelled on timeout, preventing them from piling up.

88768: ci: add MacOS ARM CI config r=jlinder a=rail

Previously, the macOS ARM64 platform was added, but CI wouldn't run it.

This PR adds a CI configuration to build macOS ARM64 binaries.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Rail Aliiev <[email protected]>
4 people committed Sep 27, 2022
4 parents f85c69f + 0c9b96f + 46fe099 + 416037f commit cdd8b6e
Showing 11 changed files with 335 additions and 124 deletions.
12 changes: 12 additions & 0 deletions build/teamcity/cockroach/ci/builds/build_macos_arm64.sh
@@ -0,0 +1,12 @@
#!/usr/bin/env bash

set -euo pipefail

dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"

source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_start_block "Run Bazel build"
run_bazel build/teamcity/cockroach/ci/builds/build_impl.sh crossmacosarm
tc_end_block "Run Bazel build"
29 changes: 4 additions & 25 deletions pkg/kv/kvserver/consistency_queue.go
@@ -55,31 +55,10 @@ const consistencyCheckRateBurstFactor = 8
// churn on timers.
const consistencyCheckRateMinWait = 100 * time.Millisecond

// consistencyCheckAsyncConcurrency is the maximum number of asynchronous
// consistency checks to run concurrently per store below Raft. The
// server.consistency_check.max_rate limit is shared among these, so running too
// many at the same time will cause them to time out. The rate is multiplied by
// 10 (permittedRangeScanSlowdown) to obtain the per-check timeout. 7 gives
// reasonable headroom, and also handles clusters with high replication factor
// and/or many nodes -- recall that each node runs a separate consistency queue
// which can schedule checks on other nodes, e.g. a 7-node cluster with a
// replication factor of 7 could run 7 concurrent checks on every node.
//
// Note that checksum calculations below Raft are not tied to the caller's
// context, and may continue to run even after the caller has given up on them,
// which may cause them to build up. Although we do best effort to cancel the
// running task on the receiving end when the incoming request is aborted.
//
// CHECK_STATS checks do not count towards this limit, as they are cheap and the
// DistSender will parallelize them across all ranges (notably when calling
// crdb_internal.check_consistency()).
const consistencyCheckAsyncConcurrency = 7

// consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous
// consistency check calculations. These are not tied to the caller's context,
// and thus may continue to run even if the caller has given up on them, so we
// give them an upper timeout to prevent them from running forever.
const consistencyCheckAsyncTimeout = time.Hour
// consistencyCheckSyncTimeout is the max amount of time the consistency check
// computation and the checksum collection request will wait for each other
// before giving up.
const consistencyCheckSyncTimeout = 5 * time.Second

var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)

133 changes: 70 additions & 63 deletions pkg/kv/kvserver/replica_consistency.go
@@ -51,10 +51,12 @@ import (
// Up to 22.1, the consistency check initiator used to synchronously collect the
// first replica's checksum before all others, so checksum collection requests
// could arrive late if the first one was slow. Since 22.2, all requests are
// parallel and likely arrive quickly.
// parallel and likely arrive quickly. Thus, in 23.1 the checksum task waits a
// short amount of time until the collection request arrives, and otherwise
// doesn't start.
//
// TODO(pavelkalinnikov): Consider removing GC behaviour in 23.1+, when all the
// incoming requests are from 22.2+ nodes (hence arrive timely).
// We still need the delayed GC in order to help a late arriving participant to
// learn that the other one gave up, and fail fast instead of waiting.
const replicaChecksumGCInterval = time.Hour

// fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
@@ -422,7 +424,7 @@ func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) (*replicaCheck
c := r.mu.checksums[id]
if c == nil {
c = &replicaChecksum{
started: make(chan context.CancelFunc, 1), // allow an async send
started: make(chan context.CancelFunc), // require send/recv sync
result: make(chan CollectChecksumResponse, 1), // allow an async send
}
r.mu.checksums[id] = c
@@ -496,13 +498,13 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu
}

// checksumInitialWait returns the amount of time to wait until the checksum
// computation has started. It is set to min of 5s and 10% of the remaining time
// in the passed-in context (if it has a deadline).
// computation has started. It is set to min of consistencyCheckSyncTimeout and
// 10% of the remaining time in the passed-in context (if it has a deadline).
//
// If it takes longer, chances are that the replica is being restored from
// snapshots, or otherwise too busy to handle this request soon.
func (*Replica) checksumInitialWait(ctx context.Context) time.Duration {
wait := 5 * time.Second
wait := consistencyCheckSyncTimeout
if d, ok := ctx.Deadline(); ok {
if dur := time.Duration(timeutil.Until(d).Nanoseconds() / 10); dur < wait {
wait = dur
@@ -747,72 +749,79 @@ func (r *Replica) computeChecksumPostApply(
}

// Compute SHA asynchronously and store it in a map by UUID. Concurrent checks
// share the rate limit in r.store.consistencyLimiter, so we also limit the
// number of concurrent checks via r.store.consistencySem.
// share the rate limit in r.store.consistencyLimiter, so if too many run at
// the same time, chances are they will time out.
//
// Don't use the proposal's context for this, as it likely to be canceled very
// soon.
// Each node's consistency queue runs a check for one range at a time, which
// it broadcasts to all replicas, so the average number of incoming in-flight
// collection requests per node is equal to the replication factor (typ. 3-7).
// Abandoned tasks are canceled eagerly within a few seconds, so there is very
// limited room for running above this figure. Thus we don't limit the number
// of concurrent tasks here.
//
// NB: CHECK_STATS checks are cheap and the DistSender will parallelize them
// across all ranges (notably when calling crdb_internal.check_consistency()).
const taskName = "kvserver.Replica: computing checksum"
sem := r.store.consistencySem
if cc.Mode == roachpb.ChecksumMode_CHECK_STATS {
// Stats-only checks are cheap, and the DistSender parallelizes these across
// ranges (in particular when calling crdb_internal.check_consistency()), so
// they don't count towards the semaphore limit.
sem = nil
}
stopper := r.store.Stopper()
// Don't use the proposal's context, as it is likely to be canceled very soon.
taskCtx, taskCancel := stopper.WithCancelOnQuiesce(r.AnnotateCtx(context.Background()))
if err := stopper.RunAsyncTaskEx(taskCtx, stop.TaskOpts{
TaskName: taskName,
Sem: sem,
WaitForSem: false,
TaskName: taskName,
}, func(ctx context.Context) {
defer taskCancel()
// There is only one writer to c.started (this task), so this doesn't block.
// But if by mistake there is another writer, one of us closes the channel
// eventually, and other send/close ops will crash. This is by design.
c.started <- taskCancel
close(c.started)

if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout,
defer snap.Close()
defer r.gcReplicaChecksum(cc.ChecksumID, c)
// Wait until the CollectChecksum request handler joins in and learns about
// the starting computation, and then start it.
if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckSyncTimeout,
func(ctx context.Context) error {
defer snap.Close()
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
// There is only one writer to c.started (this task), but if by mistake
// there is another writer, one of us closes the channel eventually, and
// other writes to c.started will crash. By design.
defer close(c.started)
select {
case <-ctx.Done():
return ctx.Err()
case c.started <- taskCancel:
return nil
}

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
result = nil
}
r.computeChecksumDone(c, result, snapshot)
r.gcReplicaChecksum(cc.ChecksumID, c)
return err
},
); err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
log.Errorf(ctx, "checksum collection did not join: %v", err)
} else {
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}
result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
result = nil
}
r.computeChecksumDone(c, result, snapshot)
}

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
shouldFatal = true
break
}
}
if !shouldFatal {
return
}

if shouldFatal {
// This node should fatal as a result of a previous consistency
// check (i.e. this round is carried out only to obtain a diff).
// If we fatal too early, the diff won't make it back to the lease-
// holder and thus won't be printed to the logs. Since we're already
// in a goroutine that's about to end, simply sleep for a few seconds
// and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)
// This node should fatal as a result of a previous consistency check (i.e.
// this round is carried out only to obtain a diff). If we fatal too early,
// the diff won't make it back to the leaseholder and thus won't be printed
// to the logs. Since we're already in a goroutine that's about to end,
// simply sleep for a few seconds and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:
const attentionFmt = `ATTENTION:
this node is terminating because a replica inconsistency was detected between %s
and its other replicas. Please check your cluster-wide log files for more
@@ -825,19 +834,17 @@ A checkpoints directory to aid (expert) debugging should be present in:
A file preventing this node from restarting was placed at:
%s
`
preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
p(*r.store.Ident)
} else {
time.Sleep(10 * time.Second)
log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
}
preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
p(*r.store.Ident)
} else {
time.Sleep(10 * time.Second)
log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
}
}); err != nil {
taskCancel()
snap.Close()
55 changes: 40 additions & 15 deletions pkg/kv/kvserver/replica_consistency_test.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestReplicaChecksumVersion(t *testing.T) {
@@ -54,8 +55,10 @@ func TestReplicaChecksumVersion(t *testing.T) {
} else {
cc.Version = 1
}
taskErr := tc.repl.computeChecksumPostApply(ctx, cc)
var g errgroup.Group
g.Go(func() error { return tc.repl.computeChecksumPostApply(ctx, cc) })
rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID)
taskErr := g.Wait()
if !matchingVersion {
require.ErrorContains(t, taskErr, "incompatible versions")
require.ErrorContains(t, err, "checksum task failed to start")
@@ -79,13 +82,12 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

requireChecksumTaskNotStarted := func(id uuid.UUID) {
require.ErrorContains(t,
tc.repl.computeChecksumPostApply(context.Background(), kvserverpb.ComputeChecksum{
ChecksumID: id,
Mode: roachpb.ChecksumMode_CHECK_FULL,
Version: batcheval.ReplicaChecksumVersion,
}), "checksum collection request gave up")
startChecksumTask := func(ctx context.Context, id uuid.UUID) error {
return tc.repl.computeChecksumPostApply(ctx, kvserverpb.ComputeChecksum{
ChecksumID: id,
Mode: roachpb.ChecksumMode_CHECK_FULL,
Version: batcheval.ReplicaChecksumVersion,
})
}

// Checksum computation failed to start.
@@ -99,28 +101,38 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
// Checksum computation started, but failed.
id = uuid.FastMakeV4()
c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
c.started <- func() {}
close(c.started)
close(c.result)
var g errgroup.Group
g.Go(func() error {
c.started <- func() {}
close(c.started)
close(c.result)
return nil
})
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorContains(t, err, "no checksum found")
require.Nil(t, rc.Checksum)
require.NoError(t, g.Wait())

// The initial wait for the task start expires. This will take 10ms.
id = uuid.FastMakeV4()
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorContains(t, err, "checksum computation did not start")
require.Nil(t, rc.Checksum)
requireChecksumTaskNotStarted(id)
require.ErrorContains(t, startChecksumTask(context.Background(), id),
"checksum collection request gave up")

// The computation has started, but the request context timed out.
id = uuid.FastMakeV4()
c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
c.started <- func() {}
close(c.started)
g.Go(func() error {
c.started <- func() {}
close(c.started)
return nil
})
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, rc.Checksum)
require.NoError(t, g.Wait())

// Context is canceled during the initial waiting.
id = uuid.FastMakeV4()
@@ -129,7 +141,20 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorIs(t, err, context.Canceled)
require.Nil(t, rc.Checksum)
requireChecksumTaskNotStarted(id)
require.ErrorContains(t, startChecksumTask(context.Background(), id),
"checksum collection request gave up")

// The task failed to start because the checksum collection request did not
// join. Later, when it joins, it finds out that the task gave up.
id = uuid.FastMakeV4()
c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
require.NoError(t, startChecksumTask(context.Background(), id))
// TODO(pavelkalinnikov): Avoid this long wait in the test.
time.Sleep(2 * consistencyCheckSyncTimeout) // give the task time to give up
_, ok := <-c.started
require.False(t, ok) // ensure the task gave up
rc, err = tc.repl.getChecksum(context.Background(), id)
require.ErrorContains(t, err, "checksum task failed to start")
}

// TestReplicaChecksumSHA512 checks that a given dataset produces the expected
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/store.go
@@ -744,7 +744,6 @@ type Store struct {
scanner *replicaScanner // Replica scanner
consistencyQueue *consistencyQueue // Replica consistency check queue
consistencyLimiter *quotapool.RateLimiter // Rate limits consistency checks
consistencySem *quotapool.IntPool // Limit concurrent consistency checks
metrics *StoreMetrics
intentResolver *intentresolver.IntentResolver
recoveryMgr txnrecovery.Manager
@@ -2090,9 +2089,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
rate := consistencyCheckRate.Get(&s.ClusterSettings().SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})
s.consistencySem = quotapool.NewIntPool("concurrent async consistency checks",
consistencyCheckAsyncConcurrency)
s.stopper.AddCloser(s.consistencySem.Closer("stopper"))

// Set the started flag (for unittests).
atomic.StoreInt32(&s.started, 1)
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
@@ -993,6 +993,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
*cfg.collectionFactory = *collectionFactory
cfg.internalExecutorFactory = ieFactory
execCfg.InternalExecutor = cfg.circularInternalExecutor

stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
cfg.db,
2 changes: 2 additions & 0 deletions pkg/sql/explain_bundle.go
@@ -159,11 +159,13 @@ func (bundle *diagnosticsBundle) insert(
ast tree.Statement,
stmtDiagRecorder *stmtdiagnostics.Registry,
diagRequestID stmtdiagnostics.RequestID,
req stmtdiagnostics.Request,
) {
var err error
bundle.diagID, err = stmtDiagRecorder.InsertStatementDiagnostics(
ctx,
diagRequestID,
req,
fingerprint,
tree.AsString(ast),
bundle.zip,