Skip to content

Commit

Permalink
Fix change
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Jul 6, 2023
1 parent 9c35548 commit 1806e77
Showing 1 changed file with 146 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,32 @@
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.registry.event.RegistryEvent;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.SingleRouterChain;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -50,6 +56,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_RECONNECT_TASK_PERIOD;
Expand Down Expand Up @@ -125,6 +132,8 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
*/
private final int reconnectTaskPeriod;

private ApplicationModel applicationModel;

public AbstractDirectory(URL url) {
this(url, null, false);
}
Expand All @@ -150,7 +159,7 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromR
}

// remove some local only parameters
ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
applicationModel = url.getOrDefaultApplicationModel();
this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap);

if (consumerUrl == null) {
Expand All @@ -176,36 +185,58 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromR
this.reconnectTaskTryCount = configuration.getInt(RECONNECT_TASK_TRY_COUNT, DEFAULT_RECONNECT_TASK_TRY_COUNT);
this.reconnectTaskPeriod = configuration.getInt(RECONNECT_TASK_PERIOD, DEFAULT_RECONNECT_TASK_PERIOD);
setRouterChain(routerChain);

}

@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory of type " + this.getClass().getSimpleName() + " already destroyed for service " + getConsumerUrl().getServiceKey() + " from registry " + getUrl());
throw new RpcException("Directory of type " + this.getClass().getSimpleName() + " already destroyed for service " + getConsumerUrl().getServiceKey() + " from registry " + getUrl());
}

BitList<Invoker<T>> availableInvokers;
// use clone to avoid being modified at doList().
if (invokersInitialized) {
availableInvokers = validInvokers.clone();
} else {
availableInvokers = invokers.clone();
}
SingleRouterChain<T> singleChain = null;
try {
try {
if (routerChain != null) {
routerChain.getLock().readLock().lock();
}
// use clone to avoid being modified at doList().
if (invokersInitialized) {
availableInvokers = validInvokers.clone();
} else {
availableInvokers = invokers.clone();
}

List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
if (routedResult.isEmpty()) {
// 2-2 - No provider available.

logger.warn(CLUSTER_NO_VALID_PROVIDER, "provider server or registry center crashed", "",
"No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
+ " All validInvokers' size: " + validInvokers.size()
+ " All routed invokers' size: " + routedResult.size()
+ " All invokers' size: " + invokers.size()
+ " from registry " + getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ".");
if (routerChain != null) {
singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);
singleChain.getLock().readLock().lock();
}
} finally {
if (routerChain != null) {
routerChain.getLock().readLock().unlock();
}
}

List<Invoker<T>> routedResult = doList(singleChain, availableInvokers, invocation);
if (routedResult.isEmpty()) {
// 2-2 - No provider available.

logger.warn(CLUSTER_NO_VALID_PROVIDER, "provider server or registry center crashed", "",
"No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
+ " All validInvokers' size: " + validInvokers.size()
+ " All routed invokers' size: " + routedResult.size()
+ " All invokers' size: " + invokers.size()
+ " from registry " + getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ".");
}
return Collections.unmodifiableList(routedResult);
} finally {
if (singleChain != null) {
singleChain.getLock().readLock().unlock();
}
}
return Collections.unmodifiableList(routedResult);
}

@Override
Expand Down Expand Up @@ -260,6 +291,9 @@ public void addInvalidateInvoker(Invoker<T> invoker) {
invokersToReconnect.add(invoker);
// 3. try start check connectivity task
checkConnectivity();

logger.info("The invoker " + invoker.getUrl() + " has been added to invalidate list due to connectivity problem. " +
"Will trying to reconnect to it in the background.");
}
}

Expand Down Expand Up @@ -318,6 +352,7 @@ public void checkConnectivity() {
}
}, reconnectTaskPeriod, TimeUnit.MILLISECONDS);
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}

/**
Expand All @@ -331,6 +366,7 @@ public void refreshInvoker() {
if (invokersInitialized) {
refreshInvokerInternal();
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}

private synchronized void refreshInvokerInternal() {
Expand Down Expand Up @@ -359,6 +395,7 @@ public void addDisabledInvoker(Invoker<T> invoker) {
removeValidInvoker(invoker);
logger.info("Disable service address: " + invoker.getUrl() + ".");
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}

@Override
Expand All @@ -371,6 +408,22 @@ public void recoverDisabledInvoker(Invoker<T> invoker) {

}
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}

protected final void refreshRouter(BitList<Invoker<T>> newlyInvokers, Runnable switchAction) {
try {
routerChain.setInvokers(newlyInvokers.clone(), switchAction);
} catch (Throwable t) {
logger.error(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Error occurred when refreshing router chain. " +
"The addresses from notification: " +
newlyInvokers.stream()
.map(Invoker::getUrl)
.map(URL::getAddress)
.collect(Collectors.joining(", ")), t);

throw t;
}
}

/**
Expand Down Expand Up @@ -411,6 +464,8 @@ protected void setInvokers(BitList<Invoker<T>> invokers) {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;

MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}

protected void destroyInvokers() {
Expand All @@ -421,17 +476,84 @@ protected void destroyInvokers() {
}

private boolean addValidInvoker(Invoker<T> invoker) {
boolean result;
synchronized (this.validInvokers) {
return this.validInvokers.add(invoker);
result = this.validInvokers.add(invoker);
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
return result;
}

private boolean removeValidInvoker(Invoker<T> invoker) {
boolean result;
synchronized (this.validInvokers) {
return this.validInvokers.remove(invoker);
result = this.validInvokers.remove(invoker);
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
return result;
}

protected abstract List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain,
BitList<Invoker<T>> invokers, Invocation invocation) throws RpcException;

protected String joinValidInvokerAddresses() {
BitList<Invoker<T>> validInvokers = getValidInvokers().clone();
if (validInvokers.isEmpty()) {
return "empty";
}
return validInvokers.stream()
.limit(5)
.map(Invoker::getUrl)
.map(URL::getAddress)
.collect(Collectors.joining(","));
}

private Map<MetricsKey, Map<String, Integer>> getSummary() {
Map<MetricsKey, Map<String, Integer>> summaryMap = new HashMap<>();

summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_VALID, groupByServiceKey(getValidInvokers()));
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_DISABLE, groupByServiceKey(getDisabledInvokers()));
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_TO_RECONNECT, groupByServiceKey(getInvokersToReconnect()));
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_ALL, groupByServiceKey(getInvokers()));
return summaryMap;
}

protected abstract List<Invoker<T>> doList(BitList<Invoker<T>> invokers, Invocation invocation) throws RpcException;
private Map<String, Integer> groupByServiceKey(Collection<Invoker<T>> invokers) {

Map<String, Integer> serviceNumMap = new HashMap<>();
for (Invoker<T> invoker : invokers) {
if (invoker.getClass().getSimpleName().contains("Mockito")) {
return serviceNumMap;
}
}
if (invokers.size() > 0) {
serviceNumMap = invokers.stream().filter(invoker -> invoker.getInterface() != null).collect(Collectors.groupingBy(invoker -> invoker.getInterface().getName(), Collectors.reducing(0, e -> 1, Integer::sum)));
}

return serviceNumMap;
}

@Override
public String toString() {
return "Directory(" +
"invokers: " + invokers.size() + "[" +
invokers.stream()
.map(Invoker::getUrl)
.map(URL::getAddress)
.limit(3)
.collect(Collectors.joining(", ")) + "]" +
", validInvokers: " + validInvokers.size() + "[" +
validInvokers.stream()
.map(Invoker::getUrl)
.map(URL::getAddress)
.limit(3)
.collect(Collectors.joining(", ")) + "]" +
", invokersToReconnect: " + invokersToReconnect.size() + "[" +
invokersToReconnect.stream()
.map(Invoker::getUrl)
.map(URL::getAddress)
.limit(3)
.collect(Collectors.joining(", ")) + "]" +
')';
}
}

0 comments on commit 1806e77

Please sign in to comment.