Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move movementConstraint to MoveCost #1531

Merged
merged 24 commits into from
Mar 12, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 9 additions & 34 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterBean;
Expand All @@ -51,24 +49,12 @@
import org.astraea.common.cost.HasClusterCost;
import org.astraea.common.cost.HasMoveCost;
import org.astraea.common.cost.MoveCost;
import org.astraea.common.cost.RecordSizeCost;
import org.astraea.common.cost.ReplicaLeaderCost;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.cost.ReplicaNumberCost;
import org.astraea.common.json.TypeRef;
import org.astraea.common.metrics.collector.MetricCollector;
import org.astraea.common.metrics.collector.MetricSensor;

class BalancerHandler implements Handler {

static final HasMoveCost DEFAULT_MOVE_COST_FUNCTIONS =
HasMoveCost.of(
List.of(
new ReplicaNumberCost(),
new ReplicaLeaderCost(),
new RecordSizeCost(),
new ReplicaLeaderSizeCost()));

private final Admin admin;
private final BalancerConsole balancerConsole;
private final Map<String, PostRequestWrapper> taskMetadata = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -351,40 +337,23 @@ static PostRequestWrapper parsePostRequestWrapper(
Configuration.of(balancerPostRequest.balancerConfig),
AlgorithmConfig.builder()
.clusterCost(balancerPostRequest.clusterCost())
.moveCost(DEFAULT_MOVE_COST_FUNCTIONS)
.moveCost(balancerPostRequest.moveCost())
.timeout(balancerPostRequest.timeout)
.movementConstraint(movementConstraint(balancerPostRequest))
.topicFilter(topics::contains)
.build(),
currentClusterInfo);
}

// TODO: There needs to be a way for"GU" and Web to share this function.
static Predicate<MoveCost> movementConstraint(BalancerPostRequest request) {
return cost -> {
if (request.maxMigratedSize.bytes()
< cost.movedRecordSize().values().stream().mapToLong(DataSize::bytes).sum()) return false;
if (request.maxMigratedLeader
< cost.changedReplicaLeaderCount().values().stream().mapToLong(s -> s).sum())
return false;
return true;
};
}

static class BalancerPostRequest implements Request {

String balancer = GreedyBalancer.class.getName();

Map<String, String> balancerConfig = Map.of();

Map<String, String> costConfig = Map.of();
Duration timeout = Duration.ofSeconds(3);
Set<String> topics = Set.of();

DataSize maxMigratedSize = DataSize.Byte.of(Long.MAX_VALUE);

long maxMigratedLeader = Long.MAX_VALUE;

List<CostWeight> clusterCosts = List.of();
Set<String> moveCosts = Set.of();

HasClusterCost clusterCost() {
if (clusterCosts.isEmpty())
Expand All @@ -396,6 +365,12 @@ HasClusterCost clusterCost() {
HasClusterCost.class,
Configuration.EMPTY));
}

HasMoveCost moveCost() {
var config = Configuration.of(costConfig);
var cf = Utils.costFunctions(moveCosts, HasMoveCost.class, config);
return HasMoveCost.of(cf);
}
}

static class CostWeight implements Request {
Expand Down
61 changes: 49 additions & 12 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
import org.astraea.common.cost.ClusterCost;
import org.astraea.common.cost.HasClusterCost;
import org.astraea.common.cost.HasMoveCost;
import org.astraea.common.cost.MoveCost;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.cost.RecordSizeCost;
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;
import org.astraea.common.metrics.collector.MetricSensor;
Expand Down Expand Up @@ -124,6 +126,8 @@ void testReport() {
var request = new BalancerPostRequest();
request.balancer = GreedyBalancer.class.getName();
request.balancerConfig = Map.of("a", "b");
request.moveCosts = Set.of("org.astraea.common.cost.RecordSizeCost");
request.costConfig = Map.of(RecordSizeCost.class.getName(), "10GB");
request.timeout = Duration.ofMillis(1234);
var progress = submitPlanGeneration(handler, request);
var report = progress.plan;
Expand Down Expand Up @@ -167,6 +171,8 @@ void testTopics() {
var request = new BalancerPostRequest();
request.balancerConfig = Map.of("iteration", "30");
request.topics = Set.copyOf(allowedTopics);
request.moveCosts = Set.of("org.astraea.common.cost.RecordSizeCost");
request.costConfig = Map.of(RecordSizeCost.class.getName(), "10GB");
var report = submitPlanGeneration(handler, request).plan;
Assertions.assertTrue(
report.changes.stream().map(x -> x.topic).allMatch(allowedTopics::contains),
Expand Down Expand Up @@ -259,6 +265,14 @@ void testBestPlan() {
HasClusterCost clusterCostFunction =
(clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D;
HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
HasMoveCost failMoveCostFunction =
(before, after, clusterBean) ->
new MoveCost() {
@Override
public boolean overflow() {
return true;
}
};

var Best =
Utils.construct(SingleStepBalancer.class, Configuration.EMPTY)
Expand All @@ -274,7 +288,6 @@ void testBestPlan() {
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> after.value() <= before.value())
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> true)
.build());

Assertions.assertNotEquals(Optional.empty(), Best);
Expand All @@ -296,7 +309,6 @@ void testBestPlan() {
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> true)
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> true)
.build()));

// test cluster cost predicate
Expand All @@ -315,7 +327,6 @@ void testBestPlan() {
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> false)
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> true)
.build())
.solution());

Expand All @@ -334,34 +345,43 @@ void testBestPlan() {
.timeout(Duration.ofSeconds(3))
.clusterCost(clusterCostFunction)
.clusterConstraint((before, after) -> true)
.moveCost(moveCostFunction)
.movementConstraint(moveCosts -> false)
.moveCost(failMoveCostFunction)
.build())
.solution());
}
}

@CsvSource(value = {"2,100Byte", "2,500Byte", "2,1GB", "5,100Byte", "5,500Byte", "5,1GB"})
@CsvSource(value = {"5,500Byte", "10,500Byte", "5,1GB"})
@ParameterizedTest
void testMoveCost(String leaderLimit, String sizeLimit) {
createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
var handler = new BalancerHandler(admin);
var request = new BalancerHandler.BalancerPostRequest();
request.maxMigratedSize = DataSize.of(sizeLimit);
request.maxMigratedLeader = Long.parseLong(leaderLimit);
request.costConfig =
Map.of(
"org.astraea.common.cost.ReplicaLeaderCost",
leaderLimit,
"org.astraea.common.cost.RecordSizeCost",
sizeLimit);
var report = submitPlanGeneration(handler, request).plan;
report.migrationCosts.forEach(
migrationCost -> {
switch (migrationCost.name) {
case BalancerHandler.MOVED_SIZE:
Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Double::intValue).sum()
migrationCost.brokerCosts.values().stream()
.map(Math::abs)
.mapToLong(Double::intValue)
.sum()
<= DataSize.of(sizeLimit).bytes());
break;
case BalancerHandler.CHANGED_LEADERS:
Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Double::byteValue).sum()
migrationCost.brokerCosts.values().stream()
.map(Math::abs)
.mapToLong(Double::byteValue)
.sum()
<= Integer.parseInt(leaderLimit));
break;
}
Expand Down Expand Up @@ -1373,19 +1393,36 @@ private static Channel httpRequest(Map<String, ?> payload) {
@Test
void testJsonToBalancerPostRequest() {
var json =
"{\"balancer\":\"org.astraea.common.balancer.algorithms.GreedyBalancer\", \"topics\":[\"aa\"], \"clusterCosts\":[{\"cost\":\"aaa\"}]}";
"{\"balancer\":\"org.astraea.common.balancer.algorithms.GreedyBalancer\""
+ ", \"topics\":[\"aa\"]"
+ ", \"clusterCosts\":[{\"cost\":\"aaa\"}],"
+ "\"moveCosts\":["
+ " \"org.astraea.common.cost.RecordSizeCost\","
+ " \"org.astraea.common.cost.ReplicaLeaderCost\""
+ " ],"
+ " \"costConfig\":"
+ " {"
+ " \"maxMigratedSize\": \"500MB\","
+ " \"maxMigratedLeader\": \"50\""
+ " }"
+ "}";
var request =
JsonConverter.defaultConverter().fromJson(json, TypeRef.of(BalancerPostRequest.class));
Assertions.assertEquals(
"org.astraea.common.balancer.algorithms.GreedyBalancer", request.balancer);
Assertions.assertNotNull(request.balancerConfig);
Assertions.assertNotNull(request.timeout);
Assertions.assertEquals(Set.of("aa"), request.topics);
Assertions.assertNotNull(request.maxMigratedSize);

Assertions.assertEquals(1, request.clusterCosts.size());
Assertions.assertEquals("aaa", request.clusterCosts.get(0).cost);
Assertions.assertEquals(1D, request.clusterCosts.get(0).weight);

Assertions.assertEquals(2, request.moveCosts.size());
Assertions.assertEquals(2, request.costConfig.size());
Assertions.assertEquals("500MB", request.costConfig.get("maxMigratedSize"));
Assertions.assertEquals("50", request.costConfig.get("maxMigratedLeader"));

var noCostRequest =
JsonConverter.defaultConverter()
.fromJson(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class Builder {
private HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
private BiPredicate<ClusterCost, ClusterCost> clusterConstraint =
(before, after) -> after.value() < before.value();
private Predicate<MoveCost> movementConstraint = ignore -> true;
private Predicate<MoveCost> movementConstraint = moveCost -> !moveCost.overflow();
private Predicate<String> topicFilter = ignore -> true;

private ClusterInfo clusterInfo;
Expand Down Expand Up @@ -163,18 +163,6 @@ public Builder clusterConstraint(BiPredicate<ClusterCost, ClusterCost> clusterCo
return this;
}

/**
* Specify the movement cost constraint for any rebalance plan.
*
* @param moveConstraint a {@link Predicate} to determine if the rebalance result is
* acceptable(in terms of the ongoing cost caused by execute this rebalance plan).
* @return this
*/
public Builder movementConstraint(Predicate<MoveCost> moveConstraint) {
this.movementConstraint = Objects.requireNonNull(moveConstraint);
return this;
}

/**
* Specify the topics that are eligible for rebalance.
*
Expand Down
7 changes: 7 additions & 0 deletions common/src/main/java/org/astraea/common/cost/HasMoveCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static HasMoveCost of(Collection<HasMoveCost> hasMoveCosts) {
.map(Optional::get)
.collect(Collectors.toUnmodifiableList()));
return new HasMoveCost() {

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var costs =
Expand Down Expand Up @@ -70,6 +71,7 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey, Map.Entry::getValue, (l, r) -> l + r));
var overflow = costs.stream().anyMatch(MoveCost::overflow);
return new MoveCost() {
@Override
public Map<Integer, DataSize> movedReplicaLeaderSize() {
Expand All @@ -90,6 +92,11 @@ public Map<Integer, Integer> changedReplicaCount() {
public Map<Integer, Integer> changedReplicaLeaderCount() {
return changedReplicaLeaderCount;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

Expand Down
35 changes: 31 additions & 4 deletions common/src/main/java/org/astraea/common/cost/MoveCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,59 @@ public interface MoveCost {

MoveCost EMPTY = new MoveCost() {};

static MoveCost movedReplicaLeaderSize(Map<Integer, DataSize> value) {
static MoveCost movedReplicaLeaderSize(Map<Integer, DataSize> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, DataSize> movedReplicaLeaderSize() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

static MoveCost movedRecordSize(Map<Integer, DataSize> value) {
static MoveCost movedRecordSize(Map<Integer, DataSize> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, DataSize> movedRecordSize() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

static MoveCost changedReplicaCount(Map<Integer, Integer> value) {
static MoveCost changedReplicaCount(Map<Integer, Integer> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, Integer> changedReplicaCount() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

static MoveCost changedReplicaLeaderCount(Map<Integer, Integer> value) {
static MoveCost changedReplicaLeaderCount(Map<Integer, Integer> value, boolean overflow) {
return new MoveCost() {
@Override
public Map<Integer, Integer> changedReplicaLeaderCount() {
return value;
}

@Override
public boolean overflow() {
return overflow;
}
};
}

Expand Down Expand Up @@ -90,4 +110,11 @@ default Map<Integer, Integer> changedReplicaCount() {
default Map<Integer, Integer> changedReplicaLeaderCount() {
return Map.of();
}

/**
* @return check if the cost exceeds the limit value of the user
*/
default boolean overflow() {
return false;
}
}
Loading