changefeedccl: Break gossip dependency #101096

Merged 1 commit on Apr 18, 2023
changefeedccl: Break gossip dependency
Break the dependency on the Gossip library, which was
used to determine the number of nodes in the cluster
in order to limit scanner concurrency.
Instead, rely on range descriptors to determine how
many nodes host the ranges that need to be scanned.

Fixes #47971

Release note: None
Yevgeniy Miretskiy committed Apr 10, 2023
commit ad7d868fa0578a42fc89fd398854727fad7d2386
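
For readers skimming the diff: the core of the change is that the scanner now derives a node-count hint from the replicas listed in the range descriptors it already iterates, rather than asking gossip for a cluster-wide node count. The following minimal Go sketch illustrates only that counting step; the `rangeDesc` type and `distinctNodeCount` function are simplified stand-ins invented for illustration, not CockroachDB's real `kvcoord`/`roachpb` types.

```go
package main

import "fmt"

// rangeDesc is a simplified stand-in for a range descriptor: it only keeps
// the IDs of the nodes that hold a replica of the range.
type rangeDesc struct {
	replicaNodeIDs []int
}

// distinctNodeCount returns how many distinct nodes host at least one of the
// given ranges. This plays the role of the "numNodesHint" that replaces the
// gossip-based cluster node count when sizing scan concurrency.
func distinctNodeCount(ranges []rangeDesc) int {
	seen := make(map[int]struct{})
	for _, rd := range ranges {
		for _, nodeID := range rd.replicaNodeIDs {
			seen[nodeID] = struct{}{}
		}
	}
	return len(seen)
}

func main() {
	// Two ranges whose replicas overlap on nodes 2 and 3: four distinct nodes total.
	ranges := []rangeDesc{
		{replicaNodeIDs: []int{1, 2, 3}},
		{replicaNodeIDs: []int{2, 3, 4}},
	}
	fmt.Println(distinctNodeCount(ranges)) // prints 4
}
```

In the actual change below, the same idea is expressed by walking ranges with `kvcoord.MakeRangeIterator` and recording each replica's `NodeID` in a `util.FastIntMap`, whose length becomes the hint.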
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
@@ -523,7 +523,8 @@ type distResolver struct {
func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
spans, _, err := kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
return spans, err
}

func rebalanceSpanPartitions(
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql/covering",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
61 changes: 26 additions & 35 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -70,7 +71,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg

sender := p.db.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
spans, err := getSpansToProcess(ctx, distSender, cfg.Spans)
spans, numNodesHint, err := getRangesToProcess(ctx, distSender, cfg.Spans)
if err != nil {
return err
}
@@ -81,7 +82,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
defer backfillClear()
}

maxConcurrentScans := maxConcurrentScanRequests(p.gossip, &p.settings.SV)
maxConcurrentScans := maxConcurrentScanRequests(numNodesHint, &p.settings.SV)
exportLim := limit.MakeConcurrentRequestLimiter("changefeedScanRequestLimiter", maxConcurrentScans)

lastScanLimitUserSetting := changefeedbase.ScanRequestLimit.Get(&p.settings.SV)
@@ -95,7 +96,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
// If the user defined scan request limit has changed, recalculate it
if currentUserScanLimit := changefeedbase.ScanRequestLimit.Get(&p.settings.SV); currentUserScanLimit != lastScanLimitUserSetting {
lastScanLimitUserSetting = currentUserScanLimit
exportLim.SetLimit(maxConcurrentScanRequests(p.gossip, &p.settings.SV))
exportLim.SetLimit(maxConcurrentScanRequests(numNodesHint, &p.settings.SV))
}

limAlloc, err := exportLim.Begin(ctx)
@@ -249,12 +250,14 @@ func (p *scanRequestScanner) exportSpan(
return nil
}

func getSpansToProcess(
// getRangesToProcess returns the list of ranges covering the input spans, along with
// the number of nodes that host those ranges.
func getRangesToProcess(
ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
) ([]roachpb.Span, error) {
ranges, err := AllRangeSpans(ctx, ds, targetSpans)
) ([]roachpb.Span, int, error) {
ranges, numNodes, err := AllRangeSpans(ctx, ds, targetSpans)
if err != nil {
return nil, err
return nil, 0, err
}

type spanMarker struct{}
@@ -289,7 +292,7 @@ func getSpansToProcess(
}
requests = append(requests, roachpb.Span{Key: chunk.Start, EndKey: chunk.End})
}
return requests, nil
return requests, numNodes, nil
}

// slurpScanResponse iterates the ScanResponse and inserts the contained kvs into
@@ -319,66 +322,54 @@ func slurpScanResponse(
return nil
}

// AllRangeSpans returns the list of all ranges that for the specified list of spans.
// AllRangeSpans returns the list of all ranges that cover the input spans, along with
// the nodeCountHint indicating the number of nodes that host those ranges.
func AllRangeSpans(
ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
) ([]roachpb.Span, error) {

) (_ []roachpb.Span, nodeCountHint int, _ error) {
ranges := make([]roachpb.Span, 0, len(spans))

it := kvcoord.MakeRangeIterator(ds)
var replicas util.FastIntMap

for i := range spans {
rSpan, err := keys.SpanAddr(spans[i])
if err != nil {
return nil, err
return nil, 0, err
}
for it.Seek(ctx, rSpan.Key, kvcoord.Ascending); ; it.Next(ctx) {
if !it.Valid() {
return nil, it.Error()
return nil, 0, it.Error()
}
ranges = append(ranges, roachpb.Span{
Key: it.Desc().StartKey.AsRawKey(), EndKey: it.Desc().EndKey.AsRawKey(),
})
for _, r := range it.Desc().InternalReplicas {
replicas.Set(int(r.NodeID), 0)
}
if !it.NeedAnother(rSpan) {
break
}
}
}

return ranges, nil
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) int {
g, err := gw.OptionalErr(47971)
if err != nil {
// can't count nodes in tenants
return 1
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeDescPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
return ranges, replicas.Len(), nil
}

// maxConcurrentScanRequests returns the number of concurrent scan requests.
func maxConcurrentScanRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
func maxConcurrentScanRequests(numNodesHint int, sv *settings.Values) int {
// If the user specified ScanRequestLimit -- use that value.
if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
return int(max)
}
if numNodesHint < 1 {
return 1
}

// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
// the cluster only makes sense for the core changefeeds. This configuration should
// be specified explicitly when creating the scanner.
nodes := clusterNodeCount(gw)
// This is all hand-wavy: 3 per node used to be the default for a very long time.
// However, this could get out of hand if the clusters are large.
// So cap the max at an arbitrary value of 100.
max := 3 * nodes
max := 3 * numNodesHint
if max > 100 {
max = 100
}