Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move movementConstraint to MoveCost #1531

Merged
merged 24 commits into from
Mar 12, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -61,14 +60,6 @@

class BalancerHandler implements Handler {

static final HasMoveCost DEFAULT_MOVE_COST_FUNCTIONS =
HasMoveCost.of(
List.of(
new ReplicaNumberCost(),
new ReplicaLeaderCost(),
new RecordSizeCost(),
new ReplicaLeaderSizeCost()));

private final Admin admin;
private final BalancerConsole balancerConsole;
private final Map<String, PostRequestWrapper> taskMetadata = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -337,6 +328,24 @@ static PostRequestWrapper parsePostRequestWrapper(
balancerPostRequest.topics.isEmpty()
? currentClusterInfo.topicNames()
: balancerPostRequest.topics;
var moveCostLimit =
Configuration.of(
Map.of(
ReplicaNumberCost.COST_LIMIT_KEY,
Long.toString(balancerPostRequest.maxMigratedReplicas),
ReplicaLeaderCost.COST_LIMIT_KEY,
Long.toString(balancerPostRequest.maxMigratedLeader),
RecordSizeCost.COST_LIMIT_KEY,
Long.toString(balancerPostRequest.maxMigratedSize.bytes()),
ReplicaLeaderSizeCost.COST_LIMIT_KEY,
Long.toString(balancerPostRequest.maxMigratedLeaderSize.bytes())));
var moveCost =
HasMoveCost.of(
List.of(
new ReplicaNumberCost(moveCostLimit),
new ReplicaLeaderCost(moveCostLimit),
new RecordSizeCost(moveCostLimit),
new ReplicaLeaderSizeCost(moveCostLimit)));

if (topics.isEmpty())
throw new IllegalArgumentException(
Expand All @@ -351,26 +360,14 @@ static PostRequestWrapper parsePostRequestWrapper(
Configuration.of(balancerPostRequest.balancerConfig),
AlgorithmConfig.builder()
.clusterCost(balancerPostRequest.clusterCost())
.moveCost(DEFAULT_MOVE_COST_FUNCTIONS)
.moveCost(moveCost)
.movementLimit(moveCostLimit)
.timeout(balancerPostRequest.timeout)
.movementConstraint(movementConstraint(balancerPostRequest))
.topicFilter(topics::contains)
.build(),
currentClusterInfo);
}

// TODO: There needs to be a way for"GU" and Web to share this function.
static Predicate<MoveCost> movementConstraint(BalancerPostRequest request) {
return cost -> {
if (request.maxMigratedSize.bytes()
< cost.movedRecordSize().values().stream().mapToLong(DataSize::bytes).sum()) return false;
if (request.maxMigratedLeader
< cost.changedReplicaLeaderCount().values().stream().mapToLong(s -> s).sum())
return false;
return true;
};
}

static class BalancerPostRequest implements Request {

String balancer = GreedyBalancer.class.getName();
Expand All @@ -381,8 +378,9 @@ static class BalancerPostRequest implements Request {
Set<String> topics = Set.of();

DataSize maxMigratedSize = DataSize.Byte.of(Long.MAX_VALUE);

DataSize maxMigratedLeaderSize = DataSize.Byte.of(Long.MAX_VALUE);
long maxMigratedLeader = Long.MAX_VALUE;
long maxMigratedReplicas = Long.MAX_VALUE;

List<CostWeight> costWeights = List.of();

Expand Down
15 changes: 10 additions & 5 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.astraea.common.cost.ClusterCost;
import org.astraea.common.cost.HasClusterCost;
import org.astraea.common.cost.HasMoveCost;
import org.astraea.common.cost.MoveCost;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;
Expand Down Expand Up @@ -259,6 +260,14 @@ void testBestPlan() {
HasClusterCost clusterCostFunction =
(clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D;
HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
HasMoveCost failMoveCostFunction =
(before, after, clusterBean) ->
new MoveCost() {
@Override
public boolean overflow() {
return true;
}
};

var Best =
Utils.construct(SingleStepBalancer.class, Configuration.EMPTY)
Expand All @@ -274,7 +283,6 @@ void testBestPlan() {
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> after.value() <= before.value())
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> true)
.build());

Assertions.assertNotEquals(Optional.empty(), Best);
Expand All @@ -296,7 +304,6 @@ void testBestPlan() {
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> true)
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> true)
.build()));

// test cluster cost predicate
Expand All @@ -315,7 +322,6 @@ void testBestPlan() {
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> false)
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> true)
.build())
.solution());

Expand All @@ -334,8 +340,7 @@ void testBestPlan() {
.timeout(Duration.ofSeconds(3))
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> true)
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> false)
.moveCost(failMoveCostFunction)
.build())
.solution());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import org.astraea.common.Configuration;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.cost.ClusterCost;
Expand Down Expand Up @@ -65,6 +66,11 @@ static Builder builder(AlgorithmConfig config) {
*/
Predicate<MoveCost> movementConstraint();

/**
* @return the movement constraint that must be complied with by the algorithm solution
*/
Configuration movementLimit();

/**
* @return a {@link Predicate} that can indicate which topic is eligible for rebalance.
*/
Expand Down Expand Up @@ -92,7 +98,8 @@ class Builder {
private HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
private BiPredicate<ClusterCost, ClusterCost> clusterConstraint =
(before, after) -> after.value() < before.value();
private Predicate<MoveCost> movementConstraint = ignore -> true;
private Predicate<MoveCost> movementConstraint = moveCost -> !moveCost.overflow();
private Configuration movementLimit;
private Predicate<String> topicFilter = ignore -> true;

private ClusterInfo clusterInfo;
Expand All @@ -106,6 +113,7 @@ private Builder(AlgorithmConfig config) {
this.moveCostFunction = config.moveCostFunction();
this.clusterConstraint = config.clusterConstraint();
this.movementConstraint = config.movementConstraint();
this.movementLimit = config.movementLimit();
this.topicFilter = config.topicFilter();
this.clusterInfo = config.clusterInfo();
this.clusterBean = config.clusterBean();
Expand Down Expand Up @@ -164,14 +172,14 @@ public Builder clusterConstraint(BiPredicate<ClusterCost, ClusterCost> clusterCo
}

/**
* Specify the movement cost constraint for any rebalance plan.
* Specify the movement cost limit for any rebalance plan.
*
* @param moveConstraint a {@link Predicate} to determine if the rebalance result is
* acceptable(in terms of the ongoing cost caused by execute this rebalance plan).
* @param movementLimit a {@link Configuration} to determine acceptable limits for rebalance
* plans
* @return this
*/
public Builder movementConstraint(Predicate<MoveCost> moveConstraint) {
this.movementConstraint = Objects.requireNonNull(moveConstraint);
public Builder movementLimit(Configuration movementLimit) {
this.movementLimit = movementLimit;
return this;
}

Expand Down Expand Up @@ -245,6 +253,11 @@ public Predicate<MoveCost> movementConstraint() {
return movementConstraint;
}

@Override
public Configuration movementLimit() {
return movementLimit;
}

@Override
public Predicate<String> topicFilter() {
return topicFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public Plan offer(AlgorithmConfig config) {
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var clusterCostFunction = config.clusterCostFunction();
final var moveCostFunction = config.moveCostFunction();
final var limit = config.movementLimit();
final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean);

final var loop = new AtomicInteger(iteration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Plan offer(AlgorithmConfig config) {
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var clusterCostFunction = config.clusterCostFunction();
final var moveCostFunction = config.moveCostFunction();
final var moveCostLimit = config.movementLimit();
final var currentCost =
config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean);
final var generatorClusterInfo = ClusterInfo.masked(currentClusterInfo, config.topicFilter());
Expand Down
7 changes: 7 additions & 0 deletions common/src/main/java/org/astraea/common/cost/HasMoveCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static HasMoveCost of(Collection<HasMoveCost> hasMoveCosts) {
.map(Optional::get)
.collect(Collectors.toUnmodifiableList()));
return new HasMoveCost() {

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var costs =
Expand Down Expand Up @@ -70,6 +71,7 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey, Map.Entry::getValue, (l, r) -> l + r));
var overflow = costs.stream().anyMatch(MoveCost::overflow);
return new MoveCost() {
@Override
public Map<Integer, DataSize> movedReplicaLeaderSize() {
Expand All @@ -90,6 +92,11 @@ public Map<Integer, Integer> changedReplicaCount() {
public Map<Integer, Integer> changedReplicaLeaderCount() {
return changedReplicaLeaderCount;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

Expand Down
35 changes: 31 additions & 4 deletions common/src/main/java/org/astraea/common/cost/MoveCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,59 @@ public interface MoveCost {

MoveCost EMPTY = new MoveCost() {};

static MoveCost movedReplicaLeaderSize(Map<Integer, DataSize> value) {
static MoveCost movedReplicaLeaderSize(Map<Integer, DataSize> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, DataSize> movedReplicaLeaderSize() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

static MoveCost movedRecordSize(Map<Integer, DataSize> value) {
static MoveCost movedRecordSize(Map<Integer, DataSize> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, DataSize> movedRecordSize() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

static MoveCost changedReplicaCount(Map<Integer, Integer> value) {
static MoveCost changedReplicaCount(Map<Integer, Integer> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, Integer> changedReplicaCount() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

static MoveCost changedReplicaLeaderCount(Map<Integer, Integer> value) {
static MoveCost changedReplicaLeaderCount(Map<Integer, Integer> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, Integer> changedReplicaLeaderCount() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

Expand Down Expand Up @@ -90,4 +110,11 @@ default Map<Integer, Integer> changedReplicaCount() {
default Map<Integer, Integer> changedReplicaLeaderCount() {
return Map.of();
}

/**
* @return check if the cost exceeds the limit value of the user
*/
default boolean overflow() {
return false;
}
}
27 changes: 25 additions & 2 deletions common/src/main/java/org/astraea/common/cost/RecordSizeCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.astraea.common.cost;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
Expand All @@ -27,6 +29,17 @@

public class RecordSizeCost
implements HasClusterCost, HasBrokerCost, HasMoveCost, HasPartitionCost {
public static final String COST_LIMIT_KEY = "max.migrated.leader";
private final Configuration moveCostLimit;

public RecordSizeCost() {
this.moveCostLimit = Configuration.of(Map.of());
}

public RecordSizeCost(Configuration moveCostLimit) {
this.moveCostLimit = moveCostLimit;
}

@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var result =
Expand All @@ -40,7 +53,7 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
return MoveCost.movedRecordSize(
var moveCost =
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.distinct()
Expand All @@ -51,7 +64,17 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
id ->
DataSize.Byte.of(
after.replicaStream(id).mapToLong(Replica::size).sum()
- before.replicaStream(id).mapToLong(Replica::size).sum()))));
- before.replicaStream(id).mapToLong(Replica::size).sum())));
var maxMigratedSize =
moveCostLimit.string(COST_LIMIT_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE);
var overflow =
maxMigratedSize
< moveCost.values().stream()
.map(DataSize::bytes)
.map(Math::abs)
.mapToLong(s -> s)
.sum();
return MoveCost.movedRecordSize(moveCost, overflow);
}

@Override
Expand Down
Loading