diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index 11e7e82fa40..3583b15f1cc 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -20,12 +20,16 @@ import org.apache.dubbo.common.Version; import org.apache.dubbo.common.config.Configuration; import org.apache.dubbo.common.config.ConfigurationUtils; +import org.apache.dubbo.common.constants.LoggerCodeConstants; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.metrics.event.MetricsEventBus; +import org.apache.dubbo.metrics.model.key.MetricsKey; +import org.apache.dubbo.metrics.registry.event.RegistryEvent; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcContext; @@ -33,6 +37,7 @@ import org.apache.dubbo.rpc.cluster.Directory; import org.apache.dubbo.rpc.cluster.Router; import org.apache.dubbo.rpc.cluster.RouterChain; +import org.apache.dubbo.rpc.cluster.SingleRouterChain; import org.apache.dubbo.rpc.cluster.router.state.BitList; import org.apache.dubbo.rpc.cluster.support.ClusterUtils; import org.apache.dubbo.rpc.model.ApplicationModel; @@ -40,6 +45,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -50,6 +56,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_RECONNECT_TASK_PERIOD; @@ -125,6 +132,8 @@ public abstract class AbstractDirectory implements Directory { */ private final int reconnectTaskPeriod; + private ApplicationModel applicationModel; + public AbstractDirectory(URL url) { this(url, null, false); } @@ -150,7 +159,7 @@ public AbstractDirectory(URL url, RouterChain routerChain, boolean isUrlFromR } // remove some local only parameters - ApplicationModel applicationModel = url.getOrDefaultApplicationModel(); + applicationModel = url.getOrDefaultApplicationModel(); this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap); if (consumerUrl == null) { @@ -176,36 +185,58 @@ public AbstractDirectory(URL url, RouterChain routerChain, boolean isUrlFromR this.reconnectTaskTryCount = configuration.getInt(RECONNECT_TASK_TRY_COUNT, DEFAULT_RECONNECT_TASK_TRY_COUNT); this.reconnectTaskPeriod = configuration.getInt(RECONNECT_TASK_PERIOD, DEFAULT_RECONNECT_TASK_PERIOD); setRouterChain(routerChain); + } @Override public List> list(Invocation invocation) throws RpcException { if (destroyed) { - throw new RpcException("Directory of type " + this.getClass().getSimpleName() + " already destroyed for service " + getConsumerUrl().getServiceKey() + " from registry " + getUrl()); + throw new RpcException("Directory of type " + this.getClass().getSimpleName() + " already destroyed for service " + getConsumerUrl().getServiceKey() + " from registry " + getUrl()); } BitList> availableInvokers; - // use clone to avoid being modified at doList(). - if (invokersInitialized) { - availableInvokers = validInvokers.clone(); - } else { - availableInvokers = invokers.clone(); - } + SingleRouterChain singleChain = null; + try { + try { + if (routerChain != null) { + routerChain.getLock().readLock().lock(); + } + // use clone to avoid being modified at doList(). + if (invokersInitialized) { + availableInvokers = validInvokers.clone(); + } else { + availableInvokers = invokers.clone(); + } - List> routedResult = doList(availableInvokers, invocation); - if (routedResult.isEmpty()) { - // 2-2 - No provider available. - - logger.warn(CLUSTER_NO_VALID_PROVIDER, "provider server or registry center crashed", "", - "No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey() - + " All validInvokers' size: " + validInvokers.size() - + " All routed invokers' size: " + routedResult.size() - + " All invokers' size: " + invokers.size() - + " from registry " + getUrl().getAddress() - + " on the consumer " + NetUtils.getLocalHost() - + " using the dubbo version " + Version.getVersion() + "."); + if (routerChain != null) { + singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation); + singleChain.getLock().readLock().lock(); + } + } finally { + if (routerChain != null) { + routerChain.getLock().readLock().unlock(); + } + } + + List> routedResult = doList(singleChain, availableInvokers, invocation); + if (routedResult.isEmpty()) { + // 2-2 - No provider available. + + logger.warn(CLUSTER_NO_VALID_PROVIDER, "provider server or registry center crashed", "", + "No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey() + + " All validInvokers' size: " + validInvokers.size() + + " All routed invokers' size: " + routedResult.size() + + " All invokers' size: " + invokers.size() + + " from registry " + getUrl().getAddress() + + " on the consumer " + NetUtils.getLocalHost() + + " using the dubbo version " + Version.getVersion() + "."); + } + return Collections.unmodifiableList(routedResult); + } finally { + if (singleChain != null) { + singleChain.getLock().readLock().unlock(); + } } - return Collections.unmodifiableList(routedResult); } @Override @@ -260,6 +291,9 @@ public void addInvalidateInvoker(Invoker invoker) { invokersToReconnect.add(invoker); // 3. try start check connectivity task checkConnectivity(); + + logger.info("The invoker " + invoker.getUrl() + " has been added to invalidate list due to connectivity problem. " + + "Will trying to reconnect to it in the background."); } } @@ -318,6 +352,7 @@ public void checkConnectivity() { } }, reconnectTaskPeriod, TimeUnit.MILLISECONDS); } + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); } /** @@ -331,6 +366,7 @@ public void refreshInvoker() { if (invokersInitialized) { refreshInvokerInternal(); } + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); } private synchronized void refreshInvokerInternal() { @@ -359,6 +395,7 @@ public void addDisabledInvoker(Invoker invoker) { removeValidInvoker(invoker); logger.info("Disable service address: " + invoker.getUrl() + "."); } + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); } @Override @@ -371,6 +408,22 @@ public void recoverDisabledInvoker(Invoker invoker) { } } + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); + } + + protected final void refreshRouter(BitList> newlyInvokers, Runnable switchAction) { + try { + routerChain.setInvokers(newlyInvokers.clone(), switchAction); + } catch (Throwable t) { + logger.error(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Error occurred when refreshing router chain. " + + "The addresses from notification: " + + newlyInvokers.stream() + .map(Invoker::getUrl) + .map(URL::getAddress) + .collect(Collectors.joining(", ")), t); + + throw t; + } } /** @@ -411,6 +464,8 @@ protected void setInvokers(BitList> invokers) { this.invokers = invokers; refreshInvokerInternal(); this.invokersInitialized = true; + + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); } protected void destroyInvokers() { @@ -421,17 +476,84 @@ protected void destroyInvokers() { } private boolean addValidInvoker(Invoker invoker) { + boolean result; synchronized (this.validInvokers) { - return this.validInvokers.add(invoker); + result = this.validInvokers.add(invoker); } + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); + return result; } private boolean removeValidInvoker(Invoker invoker) { + boolean result; synchronized (this.validInvokers) { - return this.validInvokers.remove(invoker); + result = this.validInvokers.remove(invoker); } + MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary())); + return result; + } + + protected abstract List> doList(SingleRouterChain singleRouterChain, + BitList> invokers, Invocation invocation) throws RpcException; + + protected String joinValidInvokerAddresses() { + BitList> validInvokers = getValidInvokers().clone(); + if (validInvokers.isEmpty()) { + return "empty"; + } + return validInvokers.stream() + .limit(5) + .map(Invoker::getUrl) + .map(URL::getAddress) + .collect(Collectors.joining(",")); + } + + private Map> getSummary() { + Map> summaryMap = new HashMap<>(); + + summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_VALID, groupByServiceKey(getValidInvokers())); + summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_DISABLE, groupByServiceKey(getDisabledInvokers())); + summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_TO_RECONNECT, groupByServiceKey(getInvokersToReconnect())); + summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_ALL, groupByServiceKey(getInvokers())); + return summaryMap; } - protected abstract List> doList(BitList> invokers, Invocation invocation) throws RpcException; + private Map groupByServiceKey(Collection> invokers) { + Map serviceNumMap = new HashMap<>(); + for (Invoker invoker : invokers) { + if (invoker.getClass().getSimpleName().contains("Mockito")) { + return serviceNumMap; + } + } + if (invokers.size() > 0) { + serviceNumMap = invokers.stream().filter(invoker -> invoker.getInterface() != null).collect(Collectors.groupingBy(invoker -> invoker.getInterface().getName(), Collectors.reducing(0, e -> 1, Integer::sum))); + } + + return serviceNumMap; + } + + @Override + public String toString() { + return "Directory(" + + "invokers: " + invokers.size() + "[" + + invokers.stream() + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + + ", validInvokers: " + validInvokers.size() + "[" + + validInvokers.stream() + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + + ", invokersToReconnect: " + invokersToReconnect.size() + "[" + + invokersToReconnect.stream() + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + + ')'; + } }