diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java index f87b25e60db..ed35be62789 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java @@ -543,11 +543,7 @@ private void createInvokerForLocal(Map referenceParameters) { URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName(), referenceParameters); url = url.setScopeModel(getScopeModel()); url = url.setServiceModel(consumerModel); - Invoker withFilter = protocolSPI.refer(interfaceClass, url); - // Local Invoke ( Support Cluster Filter / Filter ) - List> invokers = new ArrayList<>(); - invokers.add(withFilter); - invoker = Cluster.getCluster(url.getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(url, invokers), true); + invoker = protocolSPI.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using in jvm service " + interfaceClass.getName()); diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java index 7d348836290..41943fc5872 100644 --- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java +++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java @@ -477,10 +477,9 @@ public void testCreateInvokerForLocalRefer() { .initialize(); referenceConfig.init(); - Assertions.assertTrue(referenceConfig.getInvoker() instanceof MockClusterInvoker); - Invoker withFilter = ((MockClusterInvoker) referenceConfig.getInvoker()).getDirectory().getAllInvokers().get(0); - Assertions.assertTrue(withFilter instanceof ListenerInvokerWrapper); - Assertions.assertTrue(((ListenerInvokerWrapper) withFilter).getInvoker() instanceof InjvmInvoker); + Invoker withFilter = ((ListenerInvokerWrapper) referenceConfig.getInvoker()).getInvoker(); + withFilter = ((MockClusterInvoker) withFilter).getDirectory().getAllInvokers().get(0); + Assertions.assertTrue(withFilter instanceof InjvmInvoker); URL url = withFilter.getUrl(); Assertions.assertEquals("application1", url.getParameter("application")); Assertions.assertEquals("value1", url.getParameter("key1")); diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java index 7a24a407794..a28e8ad0430 100644 --- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java +++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java @@ -263,7 +263,7 @@ public List getEffectReferenceRegistryURLs() { protected static class MultipleNotifyListenerWrapper implements NotifyListener { - Map registryMap = new ConcurrentHashMap(4); + Map registryMap = new ConcurrentHashMap<>(4); NotifyListener sourceNotifyListener; public MultipleNotifyListenerWrapper(NotifyListener sourceNotifyListener) { diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java index c06a815fdb6..71587e6dd59 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java @@ -42,7 +42,6 @@ import java.lang.reflect.Type; import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -63,7 +62,7 @@ public class InjvmInvoker extends AbstractInvoker { private final String key; - private final Map> exporterMap; + private final Exporter exporter; private final ExecutorRepository executorRepository; @@ -71,10 +70,10 @@ public class InjvmInvoker extends AbstractInvoker { private final boolean shouldIgnoreSameModule; - InjvmInvoker(Class type, URL url, String key, Map> exporterMap) { + InjvmInvoker(Class type, URL url, String key, Exporter exporter) { super(type, url); this.key = key; - this.exporterMap = exporterMap; + this.exporter = exporter; this.executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); this.paramDeepCopyUtil = url.getOrDefaultFrameworkModel().getExtensionLoader(ParamDeepCopyUtil.class) .getExtension(url.getParameter(CommonConstants.INJVM_COPY_UTIL_KEY, DefaultParamDeepCopyUtil.NAME)); @@ -83,7 +82,6 @@ public class InjvmInvoker extends AbstractInvoker { @Override public boolean isAvailable() { - InjvmExporter exporter = (InjvmExporter) exporterMap.get(key); if (exporter == null) { return false; } else { @@ -93,7 +91,6 @@ public boolean isAvailable() { @Override public Result doInvoke(Invocation invocation) throws Throwable { - Exporter exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java index 75a51f37be2..fdc9e101f62 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java @@ -18,19 +18,29 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.rpc.Exporter; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.Cluster; +import org.apache.dubbo.rpc.cluster.ClusterInvoker; +import org.apache.dubbo.rpc.cluster.directory.StaticDirectory; +import org.apache.dubbo.rpc.cluster.support.MergeableCluster; import org.apache.dubbo.rpc.model.ScopeModel; import org.apache.dubbo.rpc.protocol.AbstractProtocol; import org.apache.dubbo.rpc.support.ProtocolUtils; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static org.apache.dubbo.common.constants.CommonConstants.BROADCAST_CLUSTER; import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; import static org.apache.dubbo.rpc.Constants.GENERIC_KEY; import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL; import static org.apache.dubbo.rpc.Constants.SCOPE_KEY; @@ -69,7 +79,7 @@ static Exporter getExporter(Map> map, URL key) { if (result == null) { return null; } else if (ProtocolUtils.isGeneric( - result.getInvoker().getUrl().getParameter(GENERIC_KEY))) { + result.getInvoker().getUrl().getParameter(GENERIC_KEY))) { return null; } else { return result; @@ -88,7 +98,15 @@ public Exporter export(Invoker invoker) throws RpcException { @Override public Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException { - return new InjvmInvoker(serviceType, url, url.getServiceKey(), exporterMap); + // group="a,b" or group="*" + String group = url.getParameter(GROUP_KEY); + if (StringUtils.isNotEmpty(group)) { + if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { + return doCreateInvoker(url, Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), serviceType); + } + } + Cluster cluster = Cluster.getCluster(url.getScopeModel(), url.getParameter(CLUSTER_KEY)); + return doCreateInvoker(url, cluster, serviceType); } public boolean isInjvmRefer(URL url) { @@ -116,4 +134,34 @@ public boolean isInjvmRefer(URL url) { return false; } } + + @SuppressWarnings({"unchecked", "rawtypes"}) + protected ClusterInvoker doCreateInvoker(URL url, Cluster cluster, Class type) { + StaticDirectory directory = new StaticDirectory(url, getInvokers(exporterMap, url, type)); + return (ClusterInvoker) cluster.join(directory, true); + } + + private List> getInvokers(Map> map, URL url, Class type) { + List> result = new ArrayList<>(); + + if (!url.getServiceKey().contains("*")) { + Exporter exporter = map.get(url.getServiceKey()); + InjvmInvoker invoker = new InjvmInvoker<>(type, url, url.getServiceKey(), exporter); + result.add(invoker); + } else { + if (CollectionUtils.isNotEmptyMap(map)) { + for (Exporter exporter : map.values()) { + if (UrlUtils.isServiceKeyMatch(url, exporter.getInvoker().getUrl())) { + URL providerUrl = exporter.getInvoker().getUrl(); + URL consumerUrl = url.addParameter(GROUP_KEY, providerUrl.getGroup()) + .addParameter(VERSION_KEY, providerUrl.getVersion()); + InjvmInvoker invoker = new InjvmInvoker<>(type, consumerUrl, consumerUrl.getServiceKey(), exporter); + result.add(invoker); + } + } + } + } + + return result; + } } diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello1ServiceImpl.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello1ServiceImpl.java new file mode 100644 index 00000000000..ee5b7737e21 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello1ServiceImpl.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.injvm; + + +import java.util.ArrayList; +import java.util.List; + +public class Hello1ServiceImpl implements HelloService { + + @Override + public List hellos() { + List res = new ArrayList<>(); + res.add("consumer-hello-1"); + return res; + } +} diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello2ServiceImpl.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello2ServiceImpl.java new file mode 100644 index 00000000000..8937f5096cb --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello2ServiceImpl.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.rpc.protocol.injvm; + + +import java.util.ArrayList; +import java.util.List; + + +public class Hello2ServiceImpl implements HelloService { + + @Override + public List hellos() { + List res = new ArrayList<>(); + res.add("consumer-hello-2"); + return res; + } +} diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/HelloService.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/HelloService.java new file mode 100644 index 00000000000..cd6bc45c62d --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/HelloService.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.rpc.protocol.injvm; + +import java.util.List; + +public interface HelloService { + List hellos(); +} diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java index 4e57d054b45..d7dd373591d 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java @@ -29,18 +29,16 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.*; import static org.apache.dubbo.rpc.Constants.ASYNC_KEY; import static org.apache.dubbo.rpc.Constants.GENERIC_KEY; import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL; import static org.apache.dubbo.rpc.Constants.SCOPE_KEY; import static org.apache.dubbo.rpc.Constants.SCOPE_LOCAL; import static org.apache.dubbo.rpc.Constants.SCOPE_REMOTE; +import static org.apache.dubbo.rpc.Constants.MERGER_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -76,7 +74,7 @@ public void testLocalProtocol() throws Exception { assertEquals(service.getSize(new String[]{"", "", ""}), 3); service.invoke("injvm://127.0.0.1/TestService", "invoke"); - InjvmInvoker injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new HashMap<>()); + InjvmInvoker injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, null); assertFalse(injvmInvoker.isAvailable()); } @@ -137,4 +135,36 @@ public void testLocalProtocolAsync() throws Exception { assertNull(service.getAsyncResult()); } + @Test + public void testLocalProtocolForMergeResult() throws Exception { + HelloService helloService1 = new Hello1ServiceImpl(); + URL url = URL.valueOf("injvm://127.0.0.1/HelloService") + .addParameter(INTERFACE_KEY, HelloService.class.getName()) + .addParameter(APPLICATION_KEY, "consumer") + .addParameter(GROUP_KEY, "g1"); + Invoker invoker1 = proxy.getInvoker(helloService1, HelloService.class, url); + assertTrue(invoker1.isAvailable()); + Exporter exporter1 = protocol.export(invoker1); + exporters.add(exporter1); + + URL url2 = URL.valueOf("injvm://127.0.0.1/HelloService") + .addParameter(INTERFACE_KEY, HelloService.class.getName()) + .addParameter(APPLICATION_KEY, "consumer") + .addParameter(GROUP_KEY, "g2"); + HelloService helloService2 = new Hello2ServiceImpl(); + Invoker invoker2 = proxy.getInvoker(helloService2, HelloService.class, url2); + assertTrue(invoker2.isAvailable()); + Exporter exporter2 = protocol.export(invoker2); + exporters.add(exporter2); + + + URL referUrl = URL.valueOf("injvm://127.0.0.1/HelloService") + .addParameter(INTERFACE_KEY, HelloService.class.getName()) + .addParameter(APPLICATION_KEY, "consumer") + .addParameter(GROUP_KEY, "*") + .addParameter(MERGER_KEY, "list"); + List list = proxy.getProxy(protocol.refer(HelloService.class, referUrl)).hellos(); + assertEquals(2, list.size()); + } + }