Skip to content

Commit

Permalink
Merge #67268
Browse files Browse the repository at this point in the history
67268:  changefeedccl: changefeed concurrency and frontier observability #67206 r=miretskiy a=miretskiy

Stop relying on ExportRequestLimit to determine the number of concurrent
export requests, and introduce a decidated ScanRequestLimit setting.

If the setting is specified, uses that setting; otherwise, the default
value is computed as 3 * (number of nodes in the cluster), which is the
old behavior, but we cap this number so that concurrency does not get
out of hand if running in a very large cluster.

Fixes #67190

Improve Observability of of change frontier updates.

Add a metric to keep track of the number of frontier updates in the
changefeed. Add logging when job progress updates take excessive amount
of time.

Fixes #67192

Release Nodes: Provide a better configurability of scan request
concurrency. Scan requests are issued by changefeeds during the
backfill.

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Jul 6, 2021
2 parents 0555d36 + e5a133e commit d23d94c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 17 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,16 @@ func (cf *changeFrontier) checkpointJobProgress(
defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }()
}

updateStart := timeutil.Now()
defer func() {
elapsed := timeutil.Since(updateStart)
if elapsed > 5*time.Millisecond {
log.Warningf(cf.Ctx, "slow job progress update took %s", elapsed)
}
}()

cf.metrics.FrontierUpdates.Inc(1)

return cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,12 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
"controls the maximum size of the checkpoint as a total size of key bytes",
1<<20,
)

// ScanRequestLimit is the number of Scan requests that can run at once.
// Scan requests are issued when changefeed performs the backfill.
// If set to 0, a reasonable default will be chosen.
var ScanRequestLimit = settings.RegisterIntSetting(
"changefeed.backfill.concurrent_scan_requests",
"number of concurrent scan requests per node issued during a backfill",
0,
)
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/covering",
"//pkg/storage/enginepb",
Expand Down
43 changes: 28 additions & 15 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -64,17 +65,7 @@ func (p *scanRequestScanner) Scan(
return err
}

// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
// The spans here generally correspond with range boundaries.
approxNodeCount, err := clusterNodeCount(p.gossip)
if err != nil {
// can't count nodes in tenants
approxNodeCount = 1
}

maxConcurrentExports := approxNodeCount *
int(kvserver.ExportRequestsLimit.Get(&p.settings.SV))
maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV)
exportLim := limit.MakeConcurrentRequestLimiter("changefeedExportRequestLimiter", maxConcurrentExports)
g := ctxgroup.WithContext(ctx)
// atomicFinished is used only to enhance debugging messages.
Expand Down Expand Up @@ -267,15 +258,37 @@ func allRangeSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) int {
g, err := gw.OptionalErr(47971)
if err != nil {
return 0, err
// can't count nodes in tenants
return 1
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes, nil
return nodes
}

// maxConcurrentExportRequests returns the number of concurrent scan requests.
func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
// If the user specified ScanRequestLimit -- use that value.
if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
return int(max)
}

// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
// the cluster only make sense for the core change feeds. This configuration shoould
// be specified explicitly when creating scanner.
nodes := clusterNodeCount(gw)
// This is all hand-wavy: 3 per node used to be the default for a very long time.
// However, this could get out of hand if the clusters are large.
// So cap the max to an arbitrary value of a 100.
max := 3 * nodes
if max > 100 {
max = 100
}
return max
}
12 changes: 11 additions & 1 deletion pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ var (
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
Measurement: "Updates",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of changefeeds.
Expand All @@ -214,6 +221,8 @@ type Metrics struct {

Running *metric.Gauge

FrontierUpdates *metric.Counter

mu struct {
syncutil.Mutex
id int
Expand Down Expand Up @@ -247,7 +256,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow,
changefeedFlushHistMaxLatency.Nanoseconds(), 2),

Running: metric.NewGauge(metaChangefeedRunning),
Running: metric.NewGauge(metaChangefeedRunning),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
}
m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
Expand Down

0 comments on commit d23d94c

Please sign in to comment.