From 69913a14769cd774607820280c09896ad9c82cfe Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sat, 3 Jul 2021 09:19:37 -0400 Subject: [PATCH 1/2] changfeedccl: Introduce a knob to control concurrency of scan requests. Stop relying on ExportRequestLimit to determine the number of concurrent export requests, and introduce a decidated ScanRequestLimit setting. If the setting is specified, uses that setting; otherwise, the default value is computed as 3 * (number of nodes in the cluster), which is the old behavior, but we cap this number so that concurrency does not get out of hand if running in a very large cluster. Fixes #67190 Release Nodes: Provide a better configurability of scan request concurrency. Scan requests are issued by changefeeds during the backfill. --- .../changefeedccl/changefeedbase/settings.go | 9 ++++ pkg/ccl/changefeedccl/kvfeed/BUILD.bazel | 2 +- pkg/ccl/changefeedccl/kvfeed/scanner.go | 43 ++++++++++++------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 385b0243a67e..d2eb509d1a7c 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -80,3 +80,12 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting( "controls the maximum size of the checkpoint as a total size of key bytes", 1<<20, ) + +// ScanRequestLimit is the number of Scan requests that can run at once. +// Scan requests are issued when changefeed performs the backfill. +// If set to 0, a reasonable default will be chosen. +var ScanRequestLimit = settings.RegisterIntSetting( + "changefeed.backfill.concurrent_scan_requests", + "number of concurrent scan requests per node issued during a backfill", + 0, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index dd79b6a21e0a..5789db4986f8 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -19,8 +19,8 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", - "//pkg/kv/kvserver", "//pkg/roachpb", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/covering", "//pkg/storage/enginepb", diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index 9abca43a9d5c..cd42b11ef699 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -13,14 +13,15 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -64,17 +65,7 @@ func (p *scanRequestScanner) Scan( return err } - // Export requests for the various watched spans are executed in parallel, - // with a semaphore-enforced limit based on a cluster setting. - // The spans here generally correspond with range boundaries. - approxNodeCount, err := clusterNodeCount(p.gossip) - if err != nil { - // can't count nodes in tenants - approxNodeCount = 1 - } - - maxConcurrentExports := approxNodeCount * - int(kvserver.ExportRequestsLimit.Get(&p.settings.SV)) + maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV) exportLim := limit.MakeConcurrentRequestLimiter("changefeedExportRequestLimiter", maxConcurrentExports) g := ctxgroup.WithContext(ctx) // atomicFinished is used only to enhance debugging messages. @@ -267,15 +258,37 @@ func allRangeSpans( } // clusterNodeCount returns the approximate number of nodes in the cluster. -func clusterNodeCount(gw gossip.OptionalGossip) (int, error) { +func clusterNodeCount(gw gossip.OptionalGossip) int { g, err := gw.OptionalErr(47971) if err != nil { - return 0, err + // can't count nodes in tenants + return 1 } var nodes int _ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error { nodes++ return nil }) - return nodes, nil + return nodes +} + +// maxConcurrentExportRequests returns the number of concurrent scan requests. +func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int { + // If the user specified ScanRequestLimit -- use that value. + if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 { + return int(max) + } + + // TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of + // the cluster only make sense for the core change feeds. This configuration shoould + // be specified explicitly when creating scanner. + nodes := clusterNodeCount(gw) + // This is all hand-wavy: 3 per node used to be the default for a very long time. + // However, this could get out of hand if the clusters are large. + // So cap the max to an arbitrary value of a 100. + max := 3 * nodes + if max > 100 { + max = 100 + } + return max } From e5a133e16896fe13b1b13f9284b794423f6aa508 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sat, 3 Jul 2021 09:29:51 -0400 Subject: [PATCH 2/2] changefeedccl: Improve observability of change frontier updates. Add a metric to keep track of the number of frontier updates in the changefeed. Add logging when job progress updates take excessive amount of time. Fixes #67192 Release Notes: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 10 ++++++++++ pkg/ccl/changefeedccl/metrics.go | 12 +++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index f8c0257fb69b..c1bd64acd486 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1280,6 +1280,16 @@ func (cf *changeFrontier) checkpointJobProgress( defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }() } + updateStart := timeutil.Now() + defer func() { + elapsed := timeutil.Since(updateStart) + if elapsed > 5*time.Millisecond { + log.Warningf(cf.Ctx, "slow job progress update took %s", elapsed) + } + }() + + cf.metrics.FrontierUpdates.Inc(1) + return cf.js.job.Update(cf.Ctx, nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index f00934830345..d5a8fb535316 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -191,6 +191,13 @@ var ( Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + + metaChangefeedFrontierUpdates = metric.Metadata{ + Name: "changefeed.frontier_updates", + Help: "Number of change frontier updates across all feeds", + Measurement: "Updates", + Unit: metric.Unit_COUNT, + } ) // Metrics are for production monitoring of changefeeds. @@ -214,6 +221,8 @@ type Metrics struct { Running *metric.Gauge + FrontierUpdates *metric.Counter + mu struct { syncutil.Mutex id int @@ -247,7 +256,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow, changefeedFlushHistMaxLatency.Nanoseconds(), 2), - Running: metric.NewGauge(metaChangefeedRunning), + Running: metric.NewGauge(metaChangefeedRunning), + FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates), } m.mu.resolved = make(map[int]hlc.Timestamp) m.mu.id = 1 // start the first id at 1 so we can detect initialization