Skip to content

Commit

Permalink
[3.0] Multi Instances Related Refactor (#8737)
Browse files Browse the repository at this point in the history
* Refactor some ExtensionLoader usage

* Fix ut

* Refactor some ExtensionLoader usage

* Introduce Initializer for ScopeModel

* Fix ut

* Fix ut

* Add TCCL Changer

* remove some default model usage

* remove some default model usage

* Fix NPE

* Remove Hessian2 TL

* Fix NPE

* Fix NPE

* Fix Shade

* Fix DubboBootstrap init

* Fix Metadata Serialize

* Fix not support multi classloader

* Add Multi ClassLoader Test case

* Fix UT

* add ant

* Fix UT

* Fix Merge Directory
  • Loading branch information
AlbumenJ authored Sep 10, 2021
1 parent 1b100f8 commit 1dfb114
Show file tree
Hide file tree
Showing 116 changed files with 1,152 additions and 478 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

/**
* Cluster. (SPI, Singleton, ThreadSafe)
Expand All @@ -46,14 +47,14 @@ public interface Cluster {
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;

static Cluster getCluster(String name) {
return getCluster(name, true);
static Cluster getCluster(ScopeModel scopeModel, String name) {
return getCluster(scopeModel, name, true);
}

static Cluster getCluster(String name, boolean wrap) {
static Cluster getCluster(ScopeModel scopeModel, String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
name = Cluster.DEFAULT;
}
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
return ScopeModelUtil.getApplicationModel(scopeModel).getExtensionLoader(Cluster.class).getExtension(name, wrap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.rpc.cluster.merger.MergerFactory;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ScopeModelInitializer;

public class ClusterScopeModelInitializer implements ScopeModelInitializer {
@Override
public void initializeFrameworkModel(FrameworkModel frameworkModel) {

}

@Override
public void initializeApplicationModel(ApplicationModel applicationModel) {
ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
beanFactory.registerBean(MergerFactory.class);
beanFactory.registerBean(ClusterUtils.class);
}

@Override
public void initializeModuleModel(ModuleModel moduleModel) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.CollectionUtils;

import java.util.ArrayList;
Expand Down Expand Up @@ -74,7 +73,7 @@ static Optional<List<Configurator>> toConfigurators(List<URL> urls) {
return Optional.empty();
}

ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
ConfiguratorFactory configuratorFactory = urls.get(0).getOrDefaultApplicationModel().getExtensionLoader(ConfiguratorFactory.class)
.getAdaptiveExtension();

List<Configurator> configurators = new ArrayList<>(urls.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
Expand Down Expand Up @@ -71,8 +70,7 @@ public class RouterChain<T> {

private List<StateRouter> builtinStateRouters = Collections.emptyList();
private List<StateRouter> stateRouters = Collections.emptyList();
private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension();
private final ExecutorRepository executorRepository;

protected URL url;

Expand All @@ -90,8 +88,10 @@ public static <T> RouterChain<T> buildChain(URL url) {
}

private RouterChain(URL url) {
executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension();
loopPool = executorRepository.nextExecutorExecutor();
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
List<RouterFactory> extensionFactories = url.getOrDefaultApplicationModel().getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, ROUTER_KEY);

List<Router> routers = extensionFactories.stream()
Expand All @@ -101,8 +101,8 @@ private RouterChain(URL url) {

initWithRouters(routers);

List<StateRouterFactory> extensionStateRouterFactories = ExtensionLoader.getExtensionLoader(
StateRouterFactory.class)
List<StateRouterFactory> extensionStateRouterFactories = url.getOrDefaultApplicationModel()
.getExtensionLoader(StateRouterFactory.class)
.getActivateExtension(url, STATE_ROUTER_KEY);

List<StateRouter> stateRouters = extensionStateRouterFactories.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -84,7 +85,8 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromR
}

// remove some local only parameters
this.queryMap = ClusterUtils.mergeLocalParams(queryMap);
ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap);

if (consumerUrl == null) {
String host = StringUtils.isNotEmpty(queryMap.get("register.ip")) ? queryMap.get("register.ip") : this.url.getHost();
Expand All @@ -98,7 +100,7 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromR
.setPath(path == null ? queryMap.get(INTERFACE_KEY) : path);
if (isUrlFromRegistry) {
// reserve parameters if url is already a consumer url
consumerUrlFrom = consumerUrlFrom.clearParameters();
consumerUrlFrom = consumerUrlFrom.clearParameters().setServiceModel(url.getServiceModel()).setScopeModel(url.getScopeModel());
}
this.consumerUrl = consumerUrlFrom.addParameters(queryMap).removeAttribute(MONITOR_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.Map;
import java.util.Set;
Expand All @@ -49,6 +50,12 @@
@Activate(group = CONSUMER, order = -10000)
public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener {

private ApplicationModel applicationModel;

public ConsumerContextFilter(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getServiceContext()
Expand All @@ -61,7 +68,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
((RpcInvocation) invocation).setInvoker(invoker);
}

ExtensionLoader<PenetrateAttachmentSelector> selectorExtensionLoader = ExtensionLoader.getExtensionLoader(PenetrateAttachmentSelector.class);
ExtensionLoader<PenetrateAttachmentSelector> selectorExtensionLoader = applicationModel.getExtensionLoader(PenetrateAttachmentSelector.class);
Set<String> supportedSelectors = selectorExtensionLoader.getSupportedExtensions();
if (CollectionUtils.isNotEmpty(supportedSelectors)) {
for (String supportedSelector : supportedSelectors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.ZoneDetector;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import org.apache.dubbo.rpc.model.ApplicationModel;

import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;
Expand All @@ -39,12 +40,18 @@
@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware")
public class ZoneAwareFilter implements ClusterFilter {

private ApplicationModel applicationModel;

public ZoneAwareFilter(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getClientAttachment();
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
ExtensionLoader<ZoneDetector> loader = ExtensionLoader.getExtensionLoader(ZoneDetector.class);
ExtensionLoader<ZoneDetector> loader = applicationModel.getExtensionLoader(ZoneDetector.class);
if (StringUtils.isEmpty(zone) && loader.hasExtension("default")) {
ZoneDetector detector = loader.getExtension("default");
zone = detector.getZoneOfCurrentRequest(invocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@

package org.apache.dubbo.rpc.cluster.merger;

import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.rpc.cluster.Merger;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MergerFactory {
public class MergerFactory implements ScopeModelAware {

private static final ConcurrentMap<Class<?>, Merger<?>> MERGER_CACHE =
new ConcurrentHashMap<Class<?>, Merger<?>>();
private ConcurrentMap<Class<?>, Merger<?>> MERGER_CACHE = new ConcurrentHashMap<Class<?>, Merger<?>>();
private ScopeModel scopeModel;

@Override
public void setScopeModel(ScopeModel scopeModel) {
this.scopeModel = scopeModel;
}

/**
* Find the merger according to the returnType class, the merger will
Expand All @@ -38,7 +44,7 @@ public class MergerFactory {
* @return the merger which merges an array of returnType into one, return null if not exist
* @throws IllegalArgumentException if returnType is null
*/
public static <T> Merger<T> getMerger(Class<T> returnType) {
public <T> Merger<T> getMerger(Class<T> returnType) {
if (returnType == null) {
throw new IllegalArgumentException("returnType is null");
}
Expand All @@ -64,11 +70,11 @@ public static <T> Merger<T> getMerger(Class<T> returnType) {
return result;
}

static void loadMergers() {
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
private void loadMergers() {
Set<String> names = scopeModel.getExtensionLoader(Merger.class)
.getSupportedExtensions();
for (String name : names) {
Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
Merger m = scopeModel.getExtensionLoader(Merger.class).getExtension(name);
MERGER_CACHE.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.cluster.router;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository;

Expand All @@ -29,7 +28,7 @@ public abstract class AbstractRouter implements Router {
private GovernanceRuleRepository ruleRepository;

public AbstractRouter(URL url) {
this.ruleRepository = ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension();
this.ruleRepository = url.getOrDefaultApplicationModel().getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension();
this.url = url;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
*/
package org.apache.dubbo.rpc.cluster.router.state;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/***
* The abstract class of StateRoute.
* @since 3.0
Expand All @@ -42,7 +41,7 @@ public abstract class AbstractStateRouter implements StateRouter {
protected GovernanceRuleRepository ruleRepository;

public AbstractStateRouter(URL url, RouterChain chain) {
this.ruleRepository = ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension();
this.ruleRepository = url.getOrDefaultApplicationModel().getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension();
this.chain = chain;
this.url = url;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
Expand All @@ -32,6 +31,8 @@
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.ArrayList;
Expand Down Expand Up @@ -338,14 +339,15 @@ protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
* @return LoadBalance instance. if not need init, return null.
*/
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
ApplicationModel applicationModel = ScopeModelUtil.getApplicationModel(invocation.getModuleModel());
if (CollectionUtils.isNotEmpty(invokers)) {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(
return applicationModel.getExtensionLoader(LoadBalance.class).getExtension(
invokers.get(0).getUrl().getMethodParameter(
RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE
)
);
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
return applicationModel.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}

Expand Down
Loading

0 comments on commit 1dfb114

Please sign in to comment.