diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index aa6285955b2b..391a813ff3b4 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -523,7 +523,8 @@ type distResolver struct {
 func (r *distResolver) getRangesForSpans(
 	ctx context.Context, spans []roachpb.Span,
 ) ([]roachpb.Span, error) {
-	return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
+	spans, _, err := kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
+	return spans, err
 }
 
 func rebalanceSpanPartitions(
diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
index 19b0bf0f5e7a..8afe1f034dda 100644
--- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
         "//pkg/settings/cluster",
         "//pkg/sql/covering",
         "//pkg/storage/enginepb",
+        "//pkg/util",
         "//pkg/util/admission/admissionpb",
         "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go
index a4f25be05672..9deeec96b98c 100644
--- a/pkg/ccl/changefeedccl/kvfeed/scanner.go
+++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -26,6 +26,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -70,7 +71,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
 	sender := p.db.NonTransactionalSender()
 	distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
 
-	spans, err := getSpansToProcess(ctx, distSender, cfg.Spans)
+	spans, numNodesHint, err := getRangesToProcess(ctx, distSender, cfg.Spans)
 	if err != nil {
 		return err
 	}
@@ -81,7 +82,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
 		defer backfillClear()
 	}
 
-	maxConcurrentScans := maxConcurrentScanRequests(p.gossip, &p.settings.SV)
+	maxConcurrentScans := maxConcurrentScanRequests(numNodesHint, &p.settings.SV)
 	exportLim := limit.MakeConcurrentRequestLimiter("changefeedScanRequestLimiter", maxConcurrentScans)
 
 	lastScanLimitUserSetting := changefeedbase.ScanRequestLimit.Get(&p.settings.SV)
@@ -95,7 +96,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
 		// If the user defined scan request limit has changed, recalculate it
 		if currentUserScanLimit := changefeedbase.ScanRequestLimit.Get(&p.settings.SV); currentUserScanLimit != lastScanLimitUserSetting {
 			lastScanLimitUserSetting = currentUserScanLimit
-			exportLim.SetLimit(maxConcurrentScanRequests(p.gossip, &p.settings.SV))
+			exportLim.SetLimit(maxConcurrentScanRequests(numNodesHint, &p.settings.SV))
 		}
 
 		limAlloc, err := exportLim.Begin(ctx)
@@ -249,12 +250,14 @@ func (p *scanRequestScanner) exportSpan(
 	return nil
 }
 
-func getSpansToProcess(
+// getRangesToProcess returns the list of ranges covering the input spans,
+// along with a hint for the number of nodes that host those ranges.
+func getRangesToProcess(
 	ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
-) ([]roachpb.Span, error) {
-	ranges, err := AllRangeSpans(ctx, ds, targetSpans)
+) ([]roachpb.Span, int, error) {
+	ranges, numNodes, err := AllRangeSpans(ctx, ds, targetSpans)
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 
 	type spanMarker struct{}
@@ -289,7 +292,7 @@ func getSpansToProcess(
 		}
 		requests = append(requests, roachpb.Span{Key: chunk.Start, EndKey: chunk.End})
 	}
-	return requests, nil
+	return requests, numNodes, nil
 }
 
 // slurpScanResponse iterates the ScanResponse and inserts the contained kvs into
@@ -319,66 +322,54 @@ func slurpScanResponse(
 	return nil
 }
 
-// AllRangeSpans returns the list of all ranges that for the specified list of spans.
+// AllRangeSpans returns the list of all ranges that cover the input spans,
+// along with a nodeCountHint indicating the number of nodes that host those ranges.
 func AllRangeSpans(
 	ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
-) ([]roachpb.Span, error) {
-
+) (_ []roachpb.Span, nodeCountHint int, _ error) {
 	ranges := make([]roachpb.Span, 0, len(spans))
 
 	it := kvcoord.MakeRangeIterator(ds)
+	var replicas util.FastIntMap
 
 	for i := range spans {
 		rSpan, err := keys.SpanAddr(spans[i])
 		if err != nil {
-			return nil, err
+			return nil, 0, err
 		}
 		for it.Seek(ctx, rSpan.Key, kvcoord.Ascending); ; it.Next(ctx) {
 			if !it.Valid() {
-				return nil, it.Error()
+				return nil, 0, it.Error()
 			}
 			ranges = append(ranges, roachpb.Span{
 				Key: it.Desc().StartKey.AsRawKey(), EndKey: it.Desc().EndKey.AsRawKey(),
 			})
+			for _, r := range it.Desc().InternalReplicas {
+				replicas.Set(int(r.NodeID), 0)
+			}
 			if !it.NeedAnother(rSpan) {
 				break
 			}
 		}
 	}
 
-	return ranges, nil
-}
-
-// clusterNodeCount returns the approximate number of nodes in the cluster.
-func clusterNodeCount(gw gossip.OptionalGossip) int {
-	g, err := gw.OptionalErr(47971)
-	if err != nil {
-		// can't count nodes in tenants
-		return 1
-	}
-	var nodes int
-	_ = g.IterateInfos(gossip.KeyNodeDescPrefix, func(_ string, _ gossip.Info) error {
-		nodes++
-		return nil
-	})
-	return nodes
+	return ranges, replicas.Len(), nil
 }
 
 // maxConcurrentScanRequests returns the number of concurrent scan requests.
-func maxConcurrentScanRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
+func maxConcurrentScanRequests(numNodesHint int, sv *settings.Values) int {
 	// If the user specified ScanRequestLimit -- use that value.
 	if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
 		return int(max)
 	}
+	if numNodesHint < 1 {
+		return 1
+	}
 
-	// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
-	// the cluster only make sense for the core change feeds. This configuration shoould
-	// be specified explicitly when creating scanner.
-	nodes := clusterNodeCount(gw)
 	// This is all hand-wavy: 3 per node used to be the default for a very long time.
 	// However, this could get out of hand if the clusters are large.
 	// So cap the max to an arbitrary value of a 100.
-	max := 3 * nodes
+	max := 3 * numNodesHint
 	if max > 100 {
 		max = 100
 	}
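For reviewers, a standalone sketch of the node-counting idea behind the new nodeCountHint return value: while walking range descriptors, AllRangeSpans records each replica's NodeID and finally returns the number of distinct nodes seen. The sketch uses a plain Go map and made-up types (rangeReplicas, nodeCountHint) in place of util.FastIntMap and roachpb.ReplicaDescriptor; it is an illustration of the bookkeeping, not the patched code.

// Sketch only: distinct-node counting analogous to the replicas.Set / replicas.Len
// bookkeeping added to AllRangeSpans.
package main

import "fmt"

// rangeReplicas stands in for the replica set of one range; only node IDs matter.
type rangeReplicas []int

// nodeCountHint returns the number of distinct nodes that appear in any replica set.
func nodeCountHint(ranges []rangeReplicas) int {
	nodes := map[int]struct{}{}
	for _, rs := range ranges {
		for _, nodeID := range rs {
			nodes[nodeID] = struct{}{}
		}
	}
	return len(nodes)
}

func main() {
	// Three ranges replicated across nodes 1..4 yield a hint of 4.
	fmt.Println(nodeCountHint([]rangeReplicas{{1, 2, 3}, {2, 3, 4}, {1, 3, 4}}))
}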
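And a standalone sketch of the new concurrency rule in maxConcurrentScanRequests: an explicit changefeedbase.ScanRequestLimit setting wins; otherwise the limit is 3 scans per node in the hint, floored at 1 when no usable hint is available (replacing the old gossip-based clusterNodeCount fallback) and capped at 100. The function and parameter names below (maxConcurrentScans, userLimit) are stand-ins, not the real signature.

// Sketch only: the concurrency calculation, with the cluster setting modeled as a
// plain int64 argument.
package main

import "fmt"

func maxConcurrentScans(userLimit int64, numNodesHint int) int {
	// An explicit user-specified limit always wins.
	if userLimit > 0 {
		return int(userLimit)
	}
	// No usable node-count hint: fall back to a single concurrent scan.
	if numNodesHint < 1 {
		return 1
	}
	// 3 scans per node, capped at 100 for large clusters.
	max := 3 * numNodesHint
	if max > 100 {
		max = 100
	}
	return max
}

func main() {
	fmt.Println(maxConcurrentScans(0, 5))  // 15: 3 scans per node
	fmt.Println(maxConcurrentScans(0, 50)) // 100: capped
	fmt.Println(maxConcurrentScans(0, 0))  // 1: no usable hint
	fmt.Println(maxConcurrentScans(8, 50)) // 8: explicit user limit wins
}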