diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java index 0e59f2cacf4..eab16e6d0f1 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java @@ -138,4 +138,8 @@ public interface Constants { String RULE_VERSION_V27 = "v2.7"; String RULE_VERSION_V30 = "v3.0"; + + String RULE_VERSION_V31 = "v3.1"; + + public static final int DefaultRouteConditionSubSetWeight = 100; } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/MultiDestConditionRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/MultiDestConditionRouter.java new file mode 100644 index 00000000000..fa986ba147d --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/MultiDestConditionRouter.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.common.utils.Holder; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode; +import org.apache.dubbo.rpc.cluster.router.condition.config.model.ConditionSubSet; +import org.apache.dubbo.rpc.cluster.router.condition.config.model.DestinationSet; +import org.apache.dubbo.rpc.cluster.router.condition.config.model.MultiDestCondition; +import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcher; +import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcherFactory; +import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_EXEC_CONDITION_ROUTER; +import static org.apache.dubbo.rpc.cluster.Constants.DefaultRouteConditionSubSetWeight; +import static org.apache.dubbo.rpc.cluster.Constants.RULE_KEY; + +public class MultiDestConditionRouter extends AbstractStateRouter { + public static final String NAME = "multi_condition"; + + private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractStateRouter.class); + protected static final Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)"); + private Map whenCondition; + private List thenCondition; + private boolean force; + protected List matcherFactories; + private boolean enabled; + + public MultiDestConditionRouter(URL url, MultiDestCondition multiDestCondition, boolean force, boolean enabled) { + super(url); + this.setForce(force); + this.enabled = enabled; + matcherFactories = + moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions(); + this.init(multiDestCondition.getFrom(), multiDestCondition.getTo()); + } + + public void init(Map from, List> to) { + try { + if (from == null || to == null) { + throw new IllegalArgumentException("Illegal route rule!"); + } + String whenRule = from.get("match"); + Map when = + StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<>() : parseRule(whenRule); + this.whenCondition = when; + + List thenConditions = new ArrayList<>(); + for (Map toMap : to) { + String thenRule = toMap.get("match"); + Map then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) + ? new HashMap<>() + : parseRule(thenRule); + // NOTE: It should be determined on the business level whether the `When condition` can be empty or not. + + thenConditions.add(new ConditionSubSet( + then, + Integer.valueOf( + toMap.getOrDefault("weight", String.valueOf(DefaultRouteConditionSubSetWeight))))); + } + this.thenCondition = thenConditions; + } catch (ParseException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + private Map parseRule(String rule) throws ParseException { + Map condition = new HashMap<>(); + if (StringUtils.isBlank(rule)) { + return condition; + } + // Key-Value pair, stores both match and mismatch conditions + ConditionMatcher matcherPair = null; + // Multiple values + Set values = null; + final Matcher matcher = ROUTE_PATTERN.matcher(rule); + while (matcher.find()) { // Try to match one by one + String separator = matcher.group(1); + String content = matcher.group(2); + // Start part of the condition expression. + if (StringUtils.isEmpty(separator)) { + matcherPair = this.getMatcher(content); + condition.put(content, matcherPair); + } + // The KV part of the condition expression + else if ("&".equals(separator)) { + if (condition.get(content) == null) { + matcherPair = this.getMatcher(content); + condition.put(content, matcherPair); + } else { + matcherPair = condition.get(content); + } + } + // The Value in the KV part. + else if ("=".equals(separator)) { + if (matcherPair == null) { + throw new ParseException( + "Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + + matcher.start() + " before \"" + content + "\".", + matcher.start()); + } + + values = matcherPair.getMatches(); + values.add(content); + } + // The Value in the KV part. + else if ("!=".equals(separator)) { + if (matcherPair == null) { + throw new ParseException( + "Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + + matcher.start() + " before \"" + content + "\".", + matcher.start()); + } + + values = matcherPair.getMismatches(); + values.add(content); + } + // The Value in the KV part, if Value have more than one items. + else if (",".equals(separator)) { // Should be separated by ',' + if (values == null || values.isEmpty()) { + throw new ParseException( + "Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + + matcher.start() + " before \"" + content + "\".", + matcher.start()); + } + values.add(content); + } else { + throw new ParseException( + "Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + + matcher.start() + " before \"" + content + "\".", + matcher.start()); + } + } + return condition; + } + + private ConditionMatcher getMatcher(String key) { + for (ConditionMatcherFactory factory : matcherFactories) { + if (factory.shouldMatch(key)) { + return factory.createMatcher(key, moduleModel); + } + } + return moduleModel + .getExtensionLoader(ConditionMatcherFactory.class) + .getExtension("param") + .createMatcher(key, moduleModel); + } + + @Override + protected BitList> doRoute( + BitList> invokers, + URL url, + Invocation invocation, + boolean needToPrintMessage, + Holder> routerSnapshotNodeHolder, + Holder messageHolder) + throws RpcException { + + if (!enabled) { + if (needToPrintMessage) { + messageHolder.set("Directly return. Reason: ConditionRouter disabled."); + } + return invokers; + } + + if (CollectionUtils.isEmpty(invokers)) { + if (needToPrintMessage) { + messageHolder.set("Directly return. Reason: Invokers from previous router is empty."); + } + return invokers; + } + + try { + if (!matchWhen(url, invocation)) { + if (needToPrintMessage) { + messageHolder.set("Directly return. Reason: WhenCondition not match."); + } + return invokers; + } + if (thenCondition == null || thenCondition.size() == 0) { + logger.warn( + CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY, + "condition state router thenCondition is empty", + "", + "The current consumer in the service blocklist. consumer: " + NetUtils.getLocalHost() + + ", service: " + url.getServiceKey()); + if (needToPrintMessage) { + messageHolder.set("Empty return. Reason: ThenCondition is empty."); + } + return BitList.emptyList(); + } + + DestinationSet destinations = new DestinationSet(); + for (ConditionSubSet condition : thenCondition) { + BitList> res = invokers.clone(); + + for (Invoker invoker : invokers) { + if (!doMatch(invoker.getUrl(), url, null, condition.getCondition(), false)) { + res.remove(invoker); + } + } + if (!res.isEmpty()) { + destinations.addDestination( + condition.getSubSetWeight() == null + ? DefaultRouteConditionSubSetWeight + : condition.getSubSetWeight(), + res.clone()); + } + } + + if (!destinations.getDestinations().isEmpty()) { + BitList> res = destinations.randDestination(); + return res; + } else if (this.isForce()) { + logger.warn( + CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY, + "execute condition state router result list is " + "empty. and force=true", + "", + "The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + + ", service: " + url.getServiceKey() + ", router: " + + url.getParameterAndDecoded(RULE_KEY)); + if (needToPrintMessage) { + messageHolder.set("Empty return. Reason: Empty result from condition and condition is force."); + } + return BitList.emptyList(); + } + + } catch (Throwable t) { + logger.error( + CLUSTER_FAILED_EXEC_CONDITION_ROUTER, + "execute condition state router exception", + "", + "Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + + t.getMessage(), + t); + } + if (needToPrintMessage) { + messageHolder.set("Directly return. Reason: Error occurred ( or result is empty )."); + } + return invokers; + } + + boolean matchWhen(URL url, Invocation invocation) { + if (CollectionUtils.isEmptyMap(whenCondition)) { + return true; + } + + return doMatch(url, null, invocation, whenCondition, true); + } + + private boolean doMatch( + URL url, + URL param, + Invocation invocation, + Map conditions, + boolean isWhenCondition) { + Map sample = url.toOriginalMap(); + for (Map.Entry entry : conditions.entrySet()) { + ConditionMatcher matchPair = entry.getValue(); + + if (!matchPair.isMatch(sample, param, invocation, isWhenCondition)) { + return false; + } + } + return true; + } + + public void setWhenCondition(Map whenCondition) { + this.whenCondition = whenCondition; + } + + public void setThenCondition(List thenCondition) { + this.thenCondition = thenCondition; + } + + public void setForce(boolean force) { + this.force = force; + } + + public Map getWhenCondition() { + return whenCondition; + } + + public boolean isForce() { + return force; + } + + public List getThenCondition() { + return thenCondition; + } + + public List getMatcherFactories() { + return matcherFactories; + } + + public void setMatcherFactories(List matcherFactories) { + this.matcherFactories = matcherFactories; + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableStateRouter.java index 575bd462822..a74214d2dcd 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableStateRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableStateRouter.java @@ -29,10 +29,13 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule; import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode; import org.apache.dubbo.rpc.cluster.router.condition.ConditionStateRouter; +import org.apache.dubbo.rpc.cluster.router.condition.MultiDestConditionRouter; import org.apache.dubbo.rpc.cluster.router.condition.config.model.ConditionRouterRule; import org.apache.dubbo.rpc.cluster.router.condition.config.model.ConditionRuleParser; +import org.apache.dubbo.rpc.cluster.router.condition.config.model.MultiDestConditionRouterRule; import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter; import org.apache.dubbo.rpc.cluster.router.state.BitList; import org.apache.dubbo.rpc.cluster.router.state.TailStateRouter; @@ -52,8 +55,11 @@ public abstract class ListenableStateRouter extends AbstractStateRouter im private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(ListenableStateRouter.class); - private volatile ConditionRouterRule routerRule; + private volatile AbstractRouterRule routerRule; private volatile List> conditionRouters = Collections.emptyList(); + + // for v3.1 + private volatile List> multiDestConditionRouters = Collections.emptyList(); private final String ruleKey; public ListenableStateRouter(URL url, String ruleKey) { @@ -73,6 +79,8 @@ public synchronized void process(ConfigChangedEvent event) { if (event.getChangeType().equals(ConfigChangeType.DELETED)) { routerRule = null; conditionRouters = Collections.emptyList(); + // for v3.1 + multiDestConditionRouters = Collections.emptyList(); } else { try { routerRule = ConditionRuleParser.parse(event.getContent()); @@ -99,7 +107,8 @@ public BitList> doRoute( Holder> nodeHolder, Holder messageHolder) throws RpcException { - if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) { + if (CollectionUtils.isEmpty(invokers) + || (conditionRouters.size() == 0 && multiDestConditionRouters.size() == 0)) { if (needToPrintMessage) { messageHolder.set( "Directly return. Reason: Invokers from previous router is empty or conditionRouters is empty."); @@ -112,7 +121,15 @@ public BitList> doRoute( if (needToPrintMessage) { resultMessage = new StringBuilder(); } - for (AbstractStateRouter router : conditionRouters) { + + List> routers; + if (routerRule instanceof MultiDestConditionRouterRule) { + routers = multiDestConditionRouters; + } else { + routers = conditionRouters; + } + + for (AbstractStateRouter router : routers) { invokers = router.route(invokers, url, invocation, needToPrintMessage, nodeHolder); if (needToPrintMessage) { resultMessage.append(messageHolder.get()); @@ -135,15 +152,31 @@ private boolean isRuleRuntime() { return routerRule != null && routerRule.isValid() && routerRule.isRuntime(); } - private void generateConditions(ConditionRouterRule rule) { - if (rule != null && rule.isValid()) { - this.conditionRouters = rule.getConditions().stream() - .map(condition -> - new ConditionStateRouter(getUrl(), condition, rule.isForce(), rule.isEnabled())) - .collect(Collectors.toList()); + private void generateConditions(AbstractRouterRule rule) { + if (rule == null || !rule.isValid()) { + return; + } + + if (rule instanceof ConditionRouterRule) { + this.conditionRouters = ((ConditionRouterRule) rule) + .getConditions().stream() + .map(condition -> + new ConditionStateRouter(getUrl(), condition, rule.isForce(), rule.isEnabled())) + .collect(Collectors.toList()); + for (ConditionStateRouter conditionRouter : this.conditionRouters) { conditionRouter.setNextRouter(TailStateRouter.getInstance()); } + } else if (rule instanceof MultiDestConditionRouterRule) { + this.multiDestConditionRouters = ((MultiDestConditionRouterRule) rule) + .getConditions().stream() + .map(condition -> new MultiDestConditionRouter( + getUrl(), condition, rule.isForce(), rule.isEnabled())) + .collect(Collectors.toList()); + + for (MultiDestConditionRouter conditionRouter : this.multiDestConditionRouters) { + conditionRouter.setNextRouter(TailStateRouter.getInstance()); + } } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRouterRule.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRouterRule.java index 7d9c936c3f8..08469da0855 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRouterRule.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRouterRule.java @@ -28,7 +28,7 @@ public class ConditionRouterRule extends AbstractRouterRule { private List conditions; @SuppressWarnings("unchecked") - public static ConditionRouterRule parseFromMap(Map map) { + public static AbstractRouterRule parseFromMap(Map map) { ConditionRouterRule conditionRouterRule = new ConditionRouterRule(); conditionRouterRule.parseFromMap0(map); diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRuleParser.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRuleParser.java index 9aa80f908bb..297acb7d1d0 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRuleParser.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionRuleParser.java @@ -16,7 +16,10 @@ */ package org.apache.dubbo.rpc.cluster.router.condition.config.model; +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule; import java.util.Map; @@ -24,6 +27,10 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.SafeConstructor; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_RULE_PARSING; +import static org.apache.dubbo.rpc.cluster.Constants.CONFIG_VERSION_KEY; +import static org.apache.dubbo.rpc.cluster.Constants.RULE_VERSION_V31; + /** * %YAML1.2 * @@ -40,13 +47,36 @@ */ public class ConditionRuleParser { - public static ConditionRouterRule parse(String rawRule) { + private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(ConditionRuleParser.class); + + public static AbstractRouterRule parse(String rawRule) { + AbstractRouterRule rule; Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions())); Map map = yaml.load(rawRule); - ConditionRouterRule rule = ConditionRouterRule.parseFromMap(map); - rule.setRawRule(rawRule); - if (CollectionUtils.isEmpty(rule.getConditions())) { - rule.setValid(false); + String confVersion = (String) map.get(CONFIG_VERSION_KEY); + + if (confVersion != null && confVersion.toLowerCase().startsWith(RULE_VERSION_V31)) { + rule = MultiDestConditionRouterRule.parseFromMap(map); + if (CollectionUtils.isEmpty(((MultiDestConditionRouterRule) rule).getConditions())) { + rule.setValid(false); + } + } else if (confVersion != null && confVersion.compareToIgnoreCase(RULE_VERSION_V31) > 0) { + logger.warn( + CLUSTER_FAILED_RULE_PARSING, + "Invalid condition config version number.", + "", + "Ignore this configuration. Only " + RULE_VERSION_V31 + " and below are supported in this release"); + rule = null; + } else { + // for under v3.1 + rule = ConditionRouterRule.parseFromMap(map); + if (CollectionUtils.isEmpty(((ConditionRouterRule) rule).getConditions())) { + rule.setValid(false); + } + } + + if (rule != null) { + rule.setRawRule(rawRule); } return rule; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionSubSet.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionSubSet.java new file mode 100644 index 00000000000..3f1577222d2 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/ConditionSubSet.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition.config.model; + +import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcher; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.dubbo.rpc.cluster.Constants.DefaultRouteConditionSubSetWeight; + +public class ConditionSubSet { + private Map condition = new HashMap<>(); + private Integer subSetWeight; + + public ConditionSubSet() {} + + public ConditionSubSet(Map condition, Integer subSetWeight) { + this.condition = condition; + this.subSetWeight = subSetWeight; + if (subSetWeight <= 0) { + this.subSetWeight = DefaultRouteConditionSubSetWeight; + } + } + + public Map getCondition() { + return condition; + } + + public void setCondition(Map condition) { + this.condition = condition; + } + + public Integer getSubSetWeight() { + return subSetWeight; + } + + public void setSubSetWeight(int subSetWeight) { + this.subSetWeight = subSetWeight; + } + + @Override + public String toString() { + return "ConditionSubSet{" + "cond=" + condition + ", subSetWeight=" + subSetWeight + '}'; + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/Destination.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/Destination.java new file mode 100644 index 00000000000..f5bb74c7e2c --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/Destination.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition.config.model; + +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +public class Destination { + private int weight; + private BitList> invokers; + + Destination(int weight, BitList> invokers) { + this.weight = weight; + this.invokers = invokers; + } + + public int getWeight() { + return weight; + } + + public void setWeight(int weight) { + this.weight = weight; + } + + public BitList> getInvokers() { + return invokers; + } + + public void setInvokers(BitList> invokers) { + this.invokers = invokers; + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/DestinationSet.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/DestinationSet.java new file mode 100644 index 00000000000..42f7eef2a29 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/DestinationSet.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition.config.model; + +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +public class DestinationSet { + private final List> destinations; + private long weightSum; + private final ThreadLocalRandom random; + + public DestinationSet() { + this.destinations = new ArrayList<>(); + this.weightSum = 0; + this.random = ThreadLocalRandom.current(); + } + + public void addDestination(int weight, BitList> invokers) { + destinations.add(new Destination(weight, invokers)); + weightSum += weight; + } + + public BitList> randDestination() { + if (destinations.size() == 1) { + return destinations.get(0).getInvokers(); + } + + long sum = random.nextLong(weightSum); + for (Destination destination : destinations) { + sum -= destination.getWeight(); + if (sum <= 0) { + return destination.getInvokers(); + } + } + return BitList.emptyList(); + } + + public List> getDestinations() { + return destinations; + } + + public long getWeightSum() { + return weightSum; + } + + public void setWeightSum(long weightSum) { + this.weightSum = weightSum; + } + + public Random getRandom() { + return random; + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/MultiDestCondition.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/MultiDestCondition.java new file mode 100644 index 00000000000..b6c20893e34 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/MultiDestCondition.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition.config.model; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MultiDestCondition { + private Map from = new HashMap<>(); + private List> to = new ArrayList<>(); + + public Map getFrom() { + return from; + } + + public void setFrom(Map from) { + this.from = from; + } + + public List> getTo() { + return to; + } + + public void setTo(List> to) { + this.to = to; + } + + @Override + public String toString() { + return "MultiDestCondition{" + "from=" + from + ", to=" + to + '}'; + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/MultiDestConditionRouterRule.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/MultiDestConditionRouterRule.java new file mode 100644 index 00000000000..317b1da243e --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/model/MultiDestConditionRouterRule.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition.config.model; + +import org.apache.dubbo.common.utils.JsonUtils; +import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.dubbo.rpc.cluster.Constants.CONDITIONS_KEY; + +public class MultiDestConditionRouterRule extends AbstractRouterRule { + + private List conditions; + + public static AbstractRouterRule parseFromMap(Map map) { + + MultiDestConditionRouterRule multiDestConditionRouterRule = new MultiDestConditionRouterRule(); + multiDestConditionRouterRule.parseFromMap0(map); + List> conditions = (List>) map.get(CONDITIONS_KEY); + List multiDestConditions = new ArrayList<>(); + + for (Map condition : conditions) { + multiDestConditions.add((MultiDestCondition) JsonUtils.convertObject(condition, MultiDestCondition.class)); + } + multiDestConditionRouterRule.setConditions(multiDestConditions); + + return multiDestConditionRouterRule; + } + + public List getConditions() { + return conditions; + } + + public void setConditions(List conditions) { + this.conditions = conditions; + } +} diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConditionStateRouterTestV31.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConditionStateRouterTestV31.java new file mode 100644 index 00000000000..e4db2abf87f --- /dev/null +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConditionStateRouterTestV31.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.condition.config; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; +import org.apache.dubbo.common.utils.Holder; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule; +import org.apache.dubbo.rpc.cluster.router.MockInvoker; +import org.apache.dubbo.rpc.cluster.router.condition.config.model.ConditionRuleParser; +import org.apache.dubbo.rpc.cluster.router.condition.config.model.MultiDestConditionRouterRule; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConditionStateRouterTestV31 { + + private static BitList> invokers; + + @BeforeAll + public static void setUp() { + + List providerUrls = Arrays.asList( + "dubbo://127.0.0.1/com.foo.BarService", + "dubbo://127.0.0.1/com.foo.BarService", + "dubbo://127.0.0.1/com.foo.BarService?env=normal", + "dubbo://127.0.0.1/com.foo.BarService?env=normal", + "dubbo://127.0.0.1/com.foo.BarService?env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService", + "dubbo://dubbo.apache.org/com.foo.BarService", + "dubbo://dubbo.apache.org/com.foo.BarService?env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal"); + + List> invokerList = providerUrls.stream() + .map(url -> new MockInvoker(URL.valueOf(url))) + .collect(Collectors.toList()); + + invokers = new BitList<>(invokerList); + } + + @Test + public void testParseRawRule() { + String config = + "configVersion: v3.1\n" + "scope: service\n" + "force: false\n" + "runtime: true\n" + "enabled: true\n" + + "key: shop\n" + "conditions:\n" + " - from:\n" + " match:\n" + " to:\n" + + " - match: region=$region & version=v1\n" + + " - match: region=$region & version=v2\n" + " weight: 200\n" + + " - match: region=$region & version=v3\n" + " weight: 300\n" + " - from:\n" + + " match: region=beijing & version=v1\n" + " to:\n" + + " - match: env=$env & region=beijing\n"; + + AbstractRouterRule routerRule = ConditionRuleParser.parse(config); + Assertions.assertInstanceOf(MultiDestConditionRouterRule.class, routerRule); + MultiDestConditionRouterRule rule = (MultiDestConditionRouterRule) routerRule; + Assertions.assertEquals(rule.getConditions().size(), 2); + Assertions.assertEquals(rule.getConditions().get(0).getTo().size(), 3); + Assertions.assertEquals(rule.getConditions().get(1).getTo().size(), 1); + System.out.println("rule.getConditions() = " + rule.getConditions()); + } + + @Test + public void testMultiplyConditionRoute() { + + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: com.foo.BarService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env!=gray\n" + + " weight: 100"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing&version=v1"), + invocation, + false, + new Holder<>()); + + int count = 0; + for (Invoker invoker : invokers) { + String url = invoker.getUrl().toString(); + if (url.contains("env") && !url.contains("gray")) { + count++; + } + } + Assertions.assertEquals(count, result.size()); + } + + @Test + public void testRemoveDuplicatesCondition() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env!=gray\n" + + " weight: 100\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env!=gray\n" + + " weight: 100"; + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + + int count = 0; + for (Invoker invoker : invokers) { + String url = invoker.getUrl().toString(); + if (url.contains("env") && !url.contains("gray")) { + count++; + } + } + Assertions.assertEquals(count, result.size()); + } + + @Test + public void testConsequentCondition() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env!=gray\n" + + " weight: 100\n" + + " - from:\n" + + " match: region=beijing\n" + + " to:\n" + + " - match: region=beijing\n" + + " weight: 100\n" + + " - from:\n" + + " to:\n" + + " - match: host!=127.0.0.1"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + + int count = 0; + for (Invoker invoker : invokers) { + String url = invoker.getUrl().toString(); + if ((url.contains("env") && !url.contains("gray")) + && url.contains("region=beijing") + && !url.contains("127.0.0.1")) { + count++; + } + } + Assertions.assertEquals(count, result.size()); + } + + @Test + public void testUnMatchCondition() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: env!=gray\n" + + " to:\n" + + " - match: env=gray\n" + + " weight: 100\n" + + " - from:\n" + + " match: region!=beijing\n" + + " to:\n" + + " - match: region=beijing\n" + + " weight: 100\n" + + " - from:\n" + + " to:\n" + + " - match: host!=127.0.0.1"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + + int count = 0; + for (Invoker invoker : invokers) { + String url = invoker.getUrl().toString(); + if (!url.contains("127.0.0.1")) { + count++; + } + } + Assertions.assertEquals(count, result.size()); + } + + @Test + public void testMatchAndRouteZero() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: true\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env=ErrTag\n" + + " weight: 100\n" + + " - from:\n" + + " match: region!=beijing\n" + + " to:\n" + + " - match: region=beijing\n" + + " weight: 100\n" + + " - from:\n" + + " to:\n" + + " - match: host!=127.0.0.1"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + Assertions.assertEquals(0, result.size()); + } + + @Test + public void testMatchRouteZeroAndIgnore() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: region=beijing\n" + + " to:\n" + + " - match: region!=beijing\n" + + " weight: 100\n" + + " - from:\n" + + " to:\n" + + " - match: host!=127.0.0.1\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env=ErrTag\n" + + " weight: 100"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + + int count = 0; + for (Invoker invoker : invokers) { + String url = invoker.getUrl().toString(); + if ((url.contains("region") && !url.contains("beijing") && !url.contains("127.0.0.1"))) { + count++; + } + } + Assertions.assertEquals(count, result.size()); + } + + @Test + public void testTrafficDisabledAndIgnoreConditionRouteForce() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: host=127.0.0.1\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env!=gray\n" + + " weight: 100\n" + + " - to:\n" + + " - match: region!=beijing"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + + Assertions.assertEquals(0, result.size()); + } + + @Test + public void testMultiplyDestination() { + String rawRule = "configVersion: v3.1\n" + "scope: service\n" + + "key: org.apache.dubbo.samples.CommentService\n" + + "force: false\n" + + "runtime: true\n" + + "enabled: true\n" + + "conditions:\n" + + " - from:\n" + + " match: env=gray\n" + + " to:\n" + + " - match: env!=gray\n" + + " weight: 100\n" + + " - match: env=gray\n" + + " weight: 900\n" + + " - from:\n" + + " match: region=beijing\n" + + " to:\n" + + " - match: region!=beijing\n" + + " weight: 100\n" + + " - match: region=beijing\n" + + " weight: 200"; + + ServiceStateRouter router = new ServiceStateRouter<>( + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing")); + router.process(new ConfigChangedEvent("com.foo.BarService", "", rawRule, ConfigChangeType.ADDED)); + + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("getComment"); + + Map actualDistribution = new HashMap<>(); + for (int i = 0; i < 1000; i++) { + BitList> result = router.route( + invokers.clone(), + URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation, + false, + new Holder<>()); + + actualDistribution.put(result.size(), actualDistribution.getOrDefault(result.size(), 0) + 1); + } + System.out.println("actualDistribution = " + actualDistribution); + int sum = 0; + for (Map.Entry entry : actualDistribution.entrySet()) { + sum += entry.getValue(); + } + assertEquals(actualDistribution.size(), 4); // 8 6 4 2 + Assertions.assertNotNull(actualDistribution.get(8)); + Assertions.assertNotNull(actualDistribution.get(6)); + Assertions.assertNotNull(actualDistribution.get(4)); + Assertions.assertNotNull(actualDistribution.get(2)); + assertEquals(sum, 1000); + } +}