Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: storage: Support scattering replicas in addition to leases #26438

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
714 changes: 376 additions & 338 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,7 @@ message AdminScatterRequest {

RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
bool randomize_leases = 2;
bool randomize_replicas = 3;
}

// ScatterResponse is the response to a Scatter() operation.
Expand Down
62 changes: 44 additions & 18 deletions pkg/sql/scatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -116,40 +117,65 @@ func (p *planner) Scatter(ctx context.Context, n *tree.Scatter) (planNode, error
}
}

var rspan roachpb.RSpan
rspan.Key, err = keys.Addr(span.Key)
if err != nil {
return nil, err
}
rspan.EndKey, err = keys.Addr(span.EndKey)
if err != nil {
return nil, err
}

return &scatterNode{
run: scatterRun{
span: span,
span: rspan,
},
}, nil
}

// scatterRun contains the run-time state of scatterNode during local execution.
//
// The scatter is driven range-by-range: ri walks the ranges overlapping span,
// and firstRange records whether the iterator is still positioned on the
// initially seeked-to range so Next knows whether it must advance first.
type scatterRun struct {
	span roachpb.RSpan

	ri         *kv.RangeIterator
	firstRange bool
}

// startExec seeks a fresh range iterator to the first range overlapping the
// scatter span. The per-range work is performed lazily in Next.
func (n *scatterNode) startExec(params runParams) error {
	cfg := params.p.ExecCfg()
	iter := kv.NewRangeIterator(cfg.DistSender)
	iter.Seek(params.ctx, n.run.span.Key, kv.Ascending)
	n.run.firstRange = true
	n.run.ri = iter
	return nil
}

// Next advances to the next range overlapping the target span, issues an
// AdminScatter request covering exactly that range, and reports whether a
// result row is available. It returns false (with any iterator error) once
// the span is exhausted or the iterator becomes invalid.
//
// NOTE(review): NeedAnother is consulted before the current range has been
// processed; if the span fits entirely inside a single range this appears to
// return false without scattering anything — confirm intended (PR is WIP).
func (n *scatterNode) Next(params runParams) (bool, error) {
	if !n.run.ri.Valid() {
		return false, n.run.ri.Error().GoError()
	}
	if !n.run.ri.NeedAnother(n.run.span) {
		return false, nil
	}
	// startExec already seeked to the first range; only advance the iterator
	// on subsequent calls.
	if n.run.firstRange {
		n.run.firstRange = false
	} else {
		n.run.ri.Next(params.ctx)
	}
	desc := n.run.ri.Desc()

	db := params.p.ExecCfg().DB
	req := &roachpb.AdminScatterRequest{
		RequestHeader:   roachpb.RequestHeader{Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey()},
		RandomizeLeases: true,
		// TODO(XXX): Require an additional option for this?
		RandomizeReplicas: true,
	}
	if _, pErr := client.SendWrapped(params.ctx, db.GetSender(), req); pErr != nil {
		return false, pErr.GoError()
	}
	return true, nil
}

var scatterNodeColumns = sqlbase.ResultColumns{
Expand All @@ -164,10 +190,10 @@ var scatterNodeColumns = sqlbase.ResultColumns{
}

// Values returns the output row for the range most recently scattered by
// Next: the range's start key as raw bytes and as a pretty-printed string.
func (n *scatterNode) Values() tree.Datums {
	key := n.run.ri.Desc().StartKey
	return tree.Datums{
		tree.NewDBytes(tree.DBytes(key)),
		tree.NewDString(keys.PrettyPrint(nil /* valDirs */, key.AsRawKey())),
	}
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ func (*allocatorError) purgatoryErrorMarker() {}

var _ purgatoryError = &allocatorError{}

type throttledStoresError struct {
numThrottledStores int
}

func (t *throttledStoresError) Error() string {
return fmt.Sprintf("%d matching stores are currently throttled", t.numThrottledStores)
}

// IsThrottledStoresError returns true iff the error indicates the allocator
// couldn't take action because too many stores were throttled.
func IsThrottledStoresError(err error) bool {
_, ok := err.(*throttledStoresError)
return ok
}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
// replication queue), but this assumption is broken in tests which force
Expand Down Expand Up @@ -361,7 +376,7 @@ func (a *Allocator) AllocateTarget(
// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
return nil, "", &throttledStoresError{numThrottledStores: throttledStoreCount}
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
Expand Down
41 changes: 40 additions & 1 deletion pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2111,17 +2111,56 @@ func (r *Replica) adminScatter(
MaxRetries: 5,
}

if args.RandomizeReplicas {
_, liveStores, _ := r.store.cfg.StorePool.getStoreList(r.RangeID, storeFilterNone)
zone, err := sysCfg.GetZoneConfigForKey(r.Desc().StartKey)
if err != nil {
return roachpb.AdminScatterResponse{}, err
}
numReplicas := int(zone.NumReplicas)
replicasToAdd := rand.Intn(liveStores) + 1 - numReplicas
if replicasToAdd < 0 {
replicasToAdd = 0
} else if replicasToAdd > numReplicas {
replicasToAdd = numReplicas
}

var replicasAdded int
for re := retry.StartWithCtx(ctx, retryOpts); re.Next() && replicasAdded < replicasToAdd; {
requeue, err := rq.processOneChange(ctx, r, sysCfg, func() bool { return false } /* canTransferLease */, false /* dryRun */, true /* disableStatsBasedRebalancing */, true /* forceAddReplica */)
if err != nil {
if IsSnapshotError(err) || IsThrottledStoresError(err) {
// TODO: remove
log.Infof(ctx, "encountered retriable error when adding replica for scatter: %s", err)
continue
}
log.Warningf(ctx, "encountered non-retriable error when adding replica for scatter: %s", err)
break
}
replicasAdded++
if !requeue {
break
}
re.Reset()
}
// TODO: remove
log.Infof(ctx, "SCATTER RandomizeReplicas added %d out of %d desired replicas", replicasAdded, replicasToAdd)
}

// Loop until the replicate queue decides there is nothing left to do for the
// range. Note that we disable lease transfers until the final step as
// transferring the lease prevents any further action on this node.
var allowLeaseTransfer bool
canTransferLease := func() bool { return allowLeaseTransfer }
for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease, false /* dryRun */, true /* disableStatsBasedRebalancing */)
requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease, false /* dryRun */, true /* disableStatsBasedRebalancing */, false /* forceAddReplica */)
if err != nil {
if IsSnapshotError(err) {
// TODO: remove
log.Infof(ctx, "encountered snapshot error from replicate queue during scatter: %s", err)
continue
}
log.Warningf(ctx, "encountered non-retriable error from replicate queue during scatter: %s", err)
break
}
if !requeue {
Expand Down
20 changes: 17 additions & 3 deletions pkg/storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
}
}

// replicateQueueOptions bundles the flags that control a single
// processOneChange invocation.
//
// NOTE(review): this struct is declared but processOneChange still takes the
// same flags as positional bools; either plumb the struct through or drop it.
type replicateQueueOptions struct {
	// dryRun, when set, simulates the change without applying it
	// (see AllocatorDryRun).
	dryRun bool
	// forceAddReplica, when set, forces the AllocatorAdd action instead of
	// consulting the allocator (used by scatter's RandomizeReplicas path).
	forceAddReplica bool
	// disableStatsBasedRebalancing disables stats-based rebalancing for the
	// change — presumably to make scatter behavior deterministic; confirm.
	disableStatsBasedRebalancing bool
}

// replicateQueue manages a queue of replicas which may need to add an
// additional replica to their range.
type replicateQueue struct {
Expand Down Expand Up @@ -215,7 +221,7 @@ func (rq *replicateQueue) process(
// snapshot errors, usually signaling that a rebalancing
// reservation could not be made with the selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease, false /* dryRun */, false /* disableStatsBasedRebalancing */); err != nil {
if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease, false /* dryRun */, false /* disableStatsBasedRebalancing */, false /* forceAddReplica */); err != nil {
if IsSnapshotError(err) {
// If ChangeReplicas failed because the preemptive snapshot failed, we
// log the error but then return success indicating we should retry the
Expand Down Expand Up @@ -243,6 +249,7 @@ func (rq *replicateQueue) processOneChange(
canTransferLease func() bool,
dryRun bool,
disableStatsBasedRebalancing bool,
forceAddReplica bool,
) (requeue bool, _ error) {
desc := repl.Desc()

Expand All @@ -263,7 +270,13 @@ func (rq *replicateQueue) processOneChange(
}

rangeInfo := rangeInfoForRepl(repl, desc)
switch action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo, disableStatsBasedRebalancing); action {
var action AllocatorAction
if forceAddReplica {
action = AllocatorAdd
} else {
action, _ = rq.allocator.ComputeAction(ctx, zone, rangeInfo, disableStatsBasedRebalancing)
}
switch action {
case AllocatorNoop:
break
case AllocatorAdd:
Expand Down Expand Up @@ -351,7 +364,7 @@ func (rq *replicateQueue) processOneChange(
if removeReplica.StoreID == repl.store.StoreID() {
// The local replica was selected as the removal target, but that replica
// is the leaseholder, so transfer the lease instead. We don't check that
// the current store has too many leases in this case under the
// the current store has too few leases in this case under the
// assumption that replica balance is a greater concern. Also note that
// AllocatorRemove action takes preference over AllocatorConsiderRebalance
// (rebalancing) which is where lease transfer would otherwise occur. We
Expand All @@ -373,6 +386,7 @@ func (rq *replicateQueue) processOneChange(
}
// Do not requeue as we transferred our lease away.
if transferred {
log.Infof(ctx, "transferred lease away to facilitate replica removal")
return false, nil
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4204,7 +4204,7 @@ func (s *Store) AllocatorDryRun(
defer cancel()
canTransferLease := func() bool { return true }
_, err := s.replicateQueue.processOneChange(
ctx, repl, sysCfg, canTransferLease, true /* dryRun */, false /* disableStatsBasedRebalancing */)
ctx, repl, sysCfg, canTransferLease, true /* dryRun */, false /* disableStatsBasedRebalancing */, false /* forceAddReplica */)
if err != nil {
log.Eventf(ctx, "error simulating allocator on replica %s: %s", repl, err)
}
Expand Down