Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move movementConstraint to MoveCost #1531

Merged
merged 24 commits into from
Mar 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 8 additions & 20 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -61,14 +60,6 @@

class BalancerHandler implements Handler {

static final HasMoveCost DEFAULT_MOVE_COST_FUNCTIONS =
HasMoveCost.of(
List.of(
new ReplicaNumberCost(),
new ReplicaLeaderCost(),
new RecordSizeCost(),
new ReplicaLeaderSizeCost()));

private final Admin admin;
private final BalancerConsole balancerConsole;
private final Map<String, PostRequestWrapper> taskMetadata = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -351,24 +342,21 @@ static PostRequestWrapper parsePostRequestWrapper(
Configuration.of(balancerPostRequest.balancerConfig),
AlgorithmConfig.builder()
.clusterCost(balancerPostRequest.clusterCost())
.moveCost(DEFAULT_MOVE_COST_FUNCTIONS)
.moveCost(moveCosts(balancerPostRequest))
.timeout(balancerPostRequest.timeout)
.movementConstraint(movementConstraint(balancerPostRequest))
.topicFilter(topics::contains)
.build(),
currentClusterInfo);
}

// TODO: There needs to be a way for"GU" and Web to share this function.
static Predicate<MoveCost> movementConstraint(BalancerPostRequest request) {
return cost -> {
if (request.maxMigratedSize.bytes()
< cost.movedRecordSize().values().stream().mapToLong(DataSize::bytes).sum()) return false;
if (request.maxMigratedLeader
< cost.changedReplicaLeaderCount().values().stream().mapToLong(s -> s).sum())
return false;
return true;
};
static HasMoveCost moveCosts(BalancerPostRequest request) {
return HasMoveCost.of(
List.of(
new ReplicaNumberCost(),
new ReplicaLeaderCost(request.maxMigratedLeader),
new RecordSizeCost(request.maxMigratedSize),
new ReplicaLeaderSizeCost()));
}

static class BalancerPostRequest implements Request {
Expand Down
12 changes: 11 additions & 1 deletion common/src/main/java/org/astraea/common/balancer/Balancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class Solution {
final ClusterInfo proposal;
final ClusterCost proposalClusterCost;
final MoveCost moveCost;
final boolean overflow;

public ClusterInfo proposal() {
return proposal;
Expand All @@ -80,10 +81,19 @@ public MoveCost moveCost() {
return moveCost;
}

public Solution(ClusterCost proposalClusterCost, MoveCost moveCost, ClusterInfo proposal) {
public boolean isOverflow() {
return overflow;
}

public Solution(
ClusterCost proposalClusterCost,
MoveCost moveCost,
boolean overflow,
ClusterInfo proposal) {
this.proposal = proposal;
this.proposalClusterCost = proposalClusterCost;
this.moveCost = moveCost;
this.overflow = overflow;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,14 @@ public Plan offer(AlgorithmConfig config) {
clusterCostFunction.clusterCost(newClusterInfo, clusterBean),
moveCostFunction.moveCost(
currentClusterInfo, newClusterInfo, clusterBean),
moveCostFunction.overflow(
currentClusterInfo, newClusterInfo, clusterBean),
newAllocation);
})
.filter(
plan ->
config.clusterConstraint().test(currentCost, plan.proposalClusterCost()))
.filter(plan -> config.movementConstraint().test(plan.moveCost()))
.filter(plan -> !plan.isOverflow())
.findFirst();
var currentCost = initialCost;
var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public Plan offer(AlgorithmConfig config) {
return new Solution(
clusterCostFunction.clusterCost(newClusterInfo, clusterBean),
moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean),
moveCostFunction.overflow(currentClusterInfo, newClusterInfo, clusterBean),
newAllocation);
})
.filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost()))
Expand Down
13 changes: 13 additions & 0 deletions common/src/main/java/org/astraea/common/cost/HasMoveCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ static HasMoveCost of(Collection<HasMoveCost> hasMoveCosts) {
.map(Optional::get)
.collect(Collectors.toUnmodifiableList()));
return new HasMoveCost() {
@Override
public boolean overflow(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
return hasMoveCosts.stream()
.anyMatch(hasMoveCost -> hasMoveCost.overflow(before, after, clusterBean));
}

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var costs =
Expand Down Expand Up @@ -118,4 +124,11 @@ public String toString() {
* @return the score of migrate cost
*/
MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean);

/**
* @return check if the cost exceeds the limit value of the user
*/
default boolean overflow(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
return false;
}
}
20 changes: 20 additions & 0 deletions common/src/main/java/org/astraea/common/cost/RecordSizeCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@

public class RecordSizeCost
implements HasClusterCost, HasBrokerCost, HasMoveCost, HasPartitionCost {
private final DataSize maxMigratedSize;

public RecordSizeCost(DataSize maxMigratedSize) {
this.maxMigratedSize = maxMigratedSize;
}

public RecordSizeCost() {
this.maxMigratedSize = DataSize.Byte.of(Long.MAX_VALUE);
}

@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var result =
Expand Down Expand Up @@ -54,6 +64,16 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
- before.replicaStream(id).mapToLong(Replica::size).sum()))));
}

@Override
public boolean overflow(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var moveCost = moveCost(before, after, clusterBean);
return maxMigratedSize.bytes()
< moveCost.movedRecordSize().values().stream()
.map(x -> Math.abs(x.bytes()))
.mapToLong(x -> x)
.sum();
}

@Override
public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost, HasMoveCost {
private final Dispersion dispersion = Dispersion.cov();
public static final String COST_NAME = "leader";
private final long maxMigratedLeader;

public ReplicaLeaderCost(long maxMigratedLeader) {
this.maxMigratedLeader = maxMigratedLeader;
}

public ReplicaLeaderCost() {
this.maxMigratedLeader = Long.MAX_VALUE;
}

@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
Expand Down Expand Up @@ -130,6 +139,16 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
})));
}

@Override
public boolean overflow(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var moveCost = moveCost(before, after, clusterBean);
return maxMigratedLeader
< moveCost.changedReplicaLeaderCount().values().stream()
.map(Math::abs)
.mapToLong(s -> s)
.sum();
}

@Override
public String toString() {
return this.getClass().getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public Plan offer(AlgorithmConfig config) {
return new Plan(
config.clusterInfo(),
() -> 0,
new Solution(() -> 0, MoveCost.EMPTY, config.clusterInfo()));
new Solution(() -> 0, MoveCost.EMPTY, false, config.clusterInfo()));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void testResult() {
new Balancer.Solution(
new ReplicaLeaderCost().clusterCost(beforeClusterInfo, ClusterBean.EMPTY),
MoveCost.EMPTY,
false,
ClusterInfo.of("fake", allNodes, Map.of(), afterReplicas)));
Assertions.assertEquals(results.size(), 1);
Assertions.assertEquals(results.get(0).get("topic"), topic);
Expand Down