Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unregister when client destroyed(referenceconfig#destroy) #3295

Merged
merged 10 commits into from
Jan 23, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
Expand Down Expand Up @@ -90,6 +90,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify

private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value

private volatile URL registeredConsumerUrl;

/**
* override rules
* Priority: override>-D>consumer>provider
Expand Down Expand Up @@ -158,6 +160,15 @@ public void destroy() {
if (isDestroyed()) {
return;
}

// unregister.
try {
if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
registry.unregister(getRegisteredConsumerUrl());
}
} catch (Throwable t) {
logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
}
// unsubscribe.
try {
if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
Expand Down Expand Up @@ -565,6 +576,14 @@ public URL getUrl() {
return this.overrideDirectoryUrl;
}

public URL getRegisteredConsumerUrl() {
return registeredConsumerUrl;
}

public void setRegisteredConsumerUrl(URL registeredConsumerUrl) {
this.registeredConsumerUrl = registeredConsumerUrl;
}

@Override
public boolean isAvailable() {
if (isDestroyed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
Expand All @@ -383,7 +384,7 @@ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type
return invoker;
}

private URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
public URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY,
CHECK_KEY, String.valueOf(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ private synchronized void initConnectStatusCheckCommand() {
@Override
public void run() {
try {
if (cancelFutureIfOffline()) return;

if (!isConnected()) {
connect();
} else {
Expand All @@ -173,7 +175,29 @@ public void run() {
}
}
}

private boolean cancelFutureIfOffline() {
/**
* If the provider service is detected offline,
* the client should not attempt to connect again.
*
* issue: https://github.com/apache/incubator-dubbo/issues/3158
*/
if(isClosed()) {
ScheduledFuture<?> future = reconnectExecutorFuture;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you can use the ExecutorUtil#cancelSurtureFuture

if(future != null && !future.isCancelled()){
/**
* Client has been destroyed and
* scheduled task should be cancelled.
*/
future.cancel(true);
}
return true;
}
return false;
}
};

reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
Expand Down Expand Up @@ -345,14 +369,14 @@ public void reconnect() throws RemotingException {
@Override
public void close() {
try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
super.close();
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
Expand Down