Skip to content

Commit

Permalink
[COMMON] remove ReplicaInfo and template from ClusterInfo (#1411)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Jan 10, 2023
1 parent e8adebe commit 81324cb
Show file tree
Hide file tree
Showing 48 changed files with 254 additions and 586 deletions.
13 changes: 5 additions & 8 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ Map<Integer, InetSocketAddress> freshJmxAddresses() {

// visible for test
static PostRequestWrapper parsePostRequestWrapper(
BalancerPostRequest balancerPostRequest, ClusterInfo<Replica> currentClusterInfo) {
BalancerPostRequest balancerPostRequest, ClusterInfo currentClusterInfo) {

var balancerClasspath = balancerPostRequest.balancer;
var balancerConfig = Configuration.of(balancerPostRequest.balancerConfig);
Expand Down Expand Up @@ -534,13 +534,13 @@ static class PostRequestWrapper {
final String balancerClasspath;
final Duration executionTime;
final AlgorithmConfig algorithmConfig;
final ClusterInfo<Replica> clusterInfo;
final ClusterInfo clusterInfo;

PostRequestWrapper(
String balancerClasspath,
Duration executionTime,
AlgorithmConfig algorithmConfig,
ClusterInfo<Replica> clusterInfo) {
ClusterInfo clusterInfo) {
this.balancerClasspath = balancerClasspath;
this.executionTime = executionTime;
this.algorithmConfig = algorithmConfig;
Expand Down Expand Up @@ -640,13 +640,10 @@ static class PlanReport implements Response {

static class PlanInfo {
private final PlanReport report;
private final ClusterInfo<Replica> associatedClusterInfo;
private final ClusterInfo associatedClusterInfo;
private final Balancer.Plan associatedPlan;

PlanInfo(
PlanReport report,
ClusterInfo<Replica> associatedClusterInfo,
Balancer.Plan associatedPlan) {
PlanInfo(PlanReport report, ClusterInfo associatedClusterInfo, Balancer.Plan associatedPlan) {
this.report = report;
this.associatedClusterInfo = associatedClusterInfo;
this.associatedPlan = associatedPlan;
Expand Down
26 changes: 11 additions & 15 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ void testRebalanceOnePlanAtATime() {
new NoOpExecutor() {
@Override
public CompletionStage<Void> run(
Admin admin, ClusterInfo<Replica> targetAllocation, Duration timeout) {
Admin admin, ClusterInfo targetAllocation, Duration timeout) {
return super.run(admin, targetAllocation, Duration.ofSeconds(5))
// Use another thread to block this completion to avoid deadlock in
// BalancerHandler#put
Expand Down Expand Up @@ -731,7 +731,7 @@ void testLookupRebalanceProgress() {

@Override
public CompletionStage<Void> run(
Admin admin, ClusterInfo<Replica> targetAllocation, Duration timeout) {
Admin admin, ClusterInfo targetAllocation, Duration timeout) {
return super.run(admin, targetAllocation, Duration.ofSeconds(5))
// Use another thread to block this completion to avoid deadlock in
// BalancerHandler#put
Expand Down Expand Up @@ -795,7 +795,7 @@ void testLookupBadExecutionProgress() {
new NoOpExecutor() {
@Override
public CompletionStage<Void> run(
Admin admin, ClusterInfo<Replica> targetAllocation, Duration timeout) {
Admin admin, ClusterInfo targetAllocation, Duration timeout) {
return super.run(admin, targetAllocation, Duration.ofSeconds(5))
.thenCompose(
ignored -> CompletableFuture.failedFuture(new RuntimeException("Boom")));
Expand Down Expand Up @@ -1258,8 +1258,7 @@ private static class NoOpExecutor implements RebalancePlanExecutor {
private final LongAdder executionCounter = new LongAdder();

@Override
public CompletionStage<Void> run(
Admin admin, ClusterInfo<Replica> targetAllocation, Duration timeout) {
public CompletionStage<Void> run(Admin admin, ClusterInfo targetAllocation, Duration timeout) {
executionCounter.increment();
return CompletableFuture.completedFuture(null);
}
Expand All @@ -1271,15 +1270,14 @@ int count() {

public static class DecreasingCost implements HasClusterCost {

private ClusterInfo<Replica> original;
private ClusterInfo original;

public DecreasingCost(Configuration configuration) {}

private double value0 = 1.0;

@Override
public synchronized ClusterCost clusterCost(
ClusterInfo<Replica> clusterInfo, ClusterBean clusterBean) {
public synchronized ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
if (original == null) original = clusterInfo;
if (ClusterInfo.findNonFulfilledAllocation(original, clusterInfo).isEmpty()) return () -> 1;
double theCost = value0;
Expand All @@ -1290,15 +1288,14 @@ public synchronized ClusterCost clusterCost(

public static class IncreasingCost implements HasClusterCost {

private ClusterInfo<Replica> original;
private ClusterInfo original;

public IncreasingCost(Configuration configuration) {}

private double value0 = 1.0;

@Override
public synchronized ClusterCost clusterCost(
ClusterInfo<Replica> clusterInfo, ClusterBean clusterBean) {
public synchronized ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
if (original == null) original = clusterInfo;
if (ClusterInfo.findNonFulfilledAllocation(original, clusterInfo).isEmpty()) return () -> 1;
double theCost = value0;
Expand All @@ -1321,16 +1318,15 @@ public Optional<Fetcher> fetcher() {
}

@Override
public synchronized ClusterCost clusterCost(
ClusterInfo<Replica> clusterInfo, ClusterBean clusterBean) {
public synchronized ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
callback.get().accept(clusterBean);
return super.clusterCost(clusterInfo, clusterBean);
}
}

public static class TimeoutCost implements HasClusterCost {
@Override
public ClusterCost clusterCost(ClusterInfo<Replica> clusterInfo, ClusterBean clusterBean) {
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
throw new NoSufficientMetricsException(this, Duration.ofSeconds(10));
}
}
Expand All @@ -1348,7 +1344,7 @@ public SpyBalancer(AlgorithmConfig algorithmConfig) {
}

@Override
public Plan offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
public Plan offer(ClusterInfo currentClusterInfo, Duration timeout) {
offerCallbacks.forEach(Runnable::run);
offerCallbacks.clear();
return super.offer(currentClusterInfo, timeout);
Expand Down
7 changes: 2 additions & 5 deletions common/src/main/java/org/astraea/common/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {

CompletionStage<List<Transaction>> transactions(Set<String> transactionIds);

CompletionStage<ClusterInfo<Replica>> clusterInfo(Set<String> topics);
CompletionStage<ClusterInfo> clusterInfo(Set<String> topics);

default CompletionStage<Set<String>> idleTopic(List<TopicChecker> checkers) {
if (checkers.isEmpty()) {
Expand Down Expand Up @@ -449,10 +449,7 @@ default CompletionStage<Boolean> waitReplicasSynced(
* @return a background running loop
*/
default CompletionStage<Boolean> waitCluster(
Set<String> topics,
Predicate<ClusterInfo<Replica>> predicate,
Duration timeout,
int debounce) {
Set<String> topics, Predicate<ClusterInfo> predicate, Duration timeout, int debounce) {
return Utils.loop(
() ->
clusterInfo(topics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ public CompletionStage<List<Transaction>> transactions(Set<String> transactionId
}

@Override
public CompletionStage<ClusterInfo<Replica>> clusterInfo(Set<String> topics) {
public CompletionStage<ClusterInfo> clusterInfo(Set<String> topics) {
return FutureUtils.combine(
clusterIdAndBrokers(),
replicas(topics),
Expand Down
Loading

0 comments on commit 81324cb

Please sign in to comment.