Skip to content

Commit

Permalink
[Issue #333] Support multiple load balance strategy in sdk (#342)
Browse files Browse the repository at this point in the history
* Support multiple load balance strategy in sdk #333

* Fix ut

* add log
  • Loading branch information
ruanwenjun authored May 20, 2021
1 parent 5df57ba commit 6496109
Show file tree
Hide file tree
Showing 13 changed files with 577 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

/**
* LoadBalance Interface
*
* <p> see {@link RandomLoadBalanceSelector}
* <p> see {@link WeightRoundRobinLoadBalanceSelector}
*
* @param <T> Target type
*/
public interface LoadBalanceSelector<T> {

/**
* Select one
*
* @return target
*/
T select();

/**
* load balance type see {@link LoadBalanceType}
*
* @return load balance type of the selector
*/
LoadBalanceType getType();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.eventmesh.common.loadbalance;

public enum LoadBalanceType {
RANDOM(0, "random load balance strategy"),
WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy");

private int code;
private String desc;

LoadBalanceType(int code, String desc) {
this.code = code;
this.desc = desc;
}

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* This selector use random strategy.
* Each selection will randomly give one from the given list
*
* @param <T>
*/
public class RandomLoadBalanceSelector<T> implements LoadBalanceSelector<T> {

private final Logger logger = LoggerFactory.getLogger(RandomLoadBalanceSelector.class);

private final List<T> clusterGroup;

public RandomLoadBalanceSelector(List<T> clusterGroup) {
this.clusterGroup = clusterGroup;
}

@Override
public T select() {
if (CollectionUtils.isEmpty(clusterGroup)) {
logger.warn("No servers available");
return null;
}
return clusterGroup.get(RandomUtils.nextInt(0, clusterGroup.size()));
}

@Override
public LoadBalanceType getType() {
return LoadBalanceType.RANDOM;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import java.util.concurrent.atomic.AtomicInteger;

public class Weight<T> {

private T target;

private final int weight;

private final AtomicInteger currentWeight;

public Weight(T target, int weight) {
this.target = target;
this.weight = weight;
this.currentWeight = new AtomicInteger(0);
}

public void decreaseTotal(int total) {
currentWeight.addAndGet(-1 * total);
}

public void increaseCurrentWeight() {
currentWeight.addAndGet(weight);
}


public T getTarget() {
return target;
}

public void setTarget(T target) {
this.target = target;
}

public int getWeight() {
return weight;
}


public AtomicInteger getCurrentWeight() {
return currentWeight;
}

@Override
public String toString() {
return "Wight{" +
"target=" + target +
", weight=" + weight +
", currentWeight=" + currentWeight +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* This selector use the weighted round robin strategy to select from list.
* If the weight is greater, the probability of being selected is larger.
*
* @param <T>
*/
public class WeightRoundRobinLoadBalanceSelector<T> implements LoadBalanceSelector<T> {

private final Logger logger = LoggerFactory.getLogger(WeightRoundRobinLoadBalanceSelector.class);

private final List<Weight<T>> clusterGroup;

private final int totalWeight;

public WeightRoundRobinLoadBalanceSelector(List<Weight<T>> clusterGroup) {
int totalWeight = 0;
for (Weight<T> weight : clusterGroup) {
totalWeight += weight.getWeight();
}
this.clusterGroup = clusterGroup;
this.totalWeight = totalWeight;
}


@Override
@SuppressWarnings("ConstantConditions")
public T select() {
if (CollectionUtils.isEmpty(clusterGroup)) {
logger.warn("No servers available");
return null;
}
Weight<T> targetWeight = null;
for (Weight<T> weight : clusterGroup) {
weight.increaseCurrentWeight();
if (targetWeight == null || targetWeight.getCurrentWeight().get() < weight.getCurrentWeight().get()) {
targetWeight = weight;
}
}
targetWeight.decreaseTotal(totalWeight);
return targetWeight.getTarget();
}

@Override
public LoadBalanceType getType() {
return LoadBalanceType.WEIGHT_ROUND_ROBIN;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RandomLoadBalanceSelectorTest {

private RandomLoadBalanceSelector<String> randomLoadBalanceSelector;

private Logger logger = LoggerFactory.getLogger(RandomLoadBalanceSelectorTest.class);

@Before
public void befor() {
List<String> address = new ArrayList<>();
address.add("A");
address.add("B");
address.add("C");
randomLoadBalanceSelector = new RandomLoadBalanceSelector<>(address);
}


@Test
public void testSelect() {
Map<String, Integer> addressToNum = new HashMap<>();
for (int i = 0; i < 100; i++) {
String select = randomLoadBalanceSelector.select();
addressToNum.put(select, addressToNum.getOrDefault(select, 0) + 1);
}
addressToNum.forEach((key, value) -> logger.info("{} : {}", key, value));
// just assert success if no exception
Assert.assertTrue(true);
}

@Test
public void testGetType() {
Assert.assertEquals(LoadBalanceType.RANDOM, randomLoadBalanceSelector.getType());
}
}
Loading

0 comments on commit 6496109

Please sign in to comment.