diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index c09973132473..58177909ae88 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -138,6 +138,11 @@ dubbo-remoting-http ${project.version} + + org.apache.dubbo + dubbo-remoting-etcd3 + ${project.version} + org.apache.dubbo dubbo-rpc-api @@ -218,6 +223,11 @@ dubbo-registry-redis ${project.version} + + org.apache.dubbo + dubbo-registry-etcd3 + ${project.version} + org.apache.dubbo dubbo-monitor-api diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 37d2a068e158..6dafe434a96f 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -108,6 +108,7 @@ 2.0 3.0.19.Final 8.5.31 + 0.3.0 1.7.25 1.2 @@ -343,6 +344,21 @@ tomcat-embed-logging-juli ${tomcat_embed_version} + + io.etcd + jetcd-core + ${jetcd_version} + + + io.netty + netty-codec-http2 + + + io.netty + netty-handler-proxy + + + org.slf4j diff --git a/dubbo-registry/dubbo-registry-etcd3/pom.xml b/dubbo-registry/dubbo-registry-etcd3/pom.xml new file mode 100644 index 000000000000..00b0ae53902a --- /dev/null +++ b/dubbo-registry/dubbo-registry-etcd3/pom.xml @@ -0,0 +1,53 @@ + + + + + 4.0.0 + + dubbo-registry + org.apache.dubbo + 2.7.1-SNAPSHOT + + + dubbo-registry-etcd3 + jar + ${project.artifactId} + The etcd3 registry module of Dubbo project + + + + org.apache.dubbo + dubbo-registry-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-common + ${project.parent.version} + + + org.apache.dubbo + dubbo-remoting-etcd3 + ${project.parent.version} + + + + + \ No newline at end of file diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java new file mode 100644 index 000000000000..504d521aaedd --- /dev/null +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.etcd; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.UrlUtils; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; +import org.apache.dubbo.remoting.etcd.ChildListener; +import org.apache.dubbo.remoting.etcd.EtcdClient; +import org.apache.dubbo.remoting.etcd.EtcdTransporter; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.option.Constants; +import org.apache.dubbo.remoting.etcd.option.OptionUtil; +import org.apache.dubbo.rpc.RpcException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + + +/** + * Support for ectd3 registry. + */ +public class EtcdRegistry extends FailbackRegistry { + + private final static Logger logger = LoggerFactory.getLogger(EtcdRegistry.class); + + private final static int DEFAULT_ETCD_PORT = 2379; + + private final static String DEFAULT_ROOT = "dubbo"; + + private final String root; + + private final Set anyServices = new ConcurrentHashSet(); + + private final ConcurrentMap> etcdListeners = new ConcurrentHashMap>(); + private final EtcdClient etcdClient; + private long expirePeriod; + + public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) { + super(url); + if (url.isAnyHost()) { + throw new IllegalStateException("registry address is invalid, actual: '" + url.getHost() + "'"); + } + String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); + if (!group.startsWith(Constants.PATH_SEPARATOR)) { + group = Constants.PATH_SEPARATOR + group; + } + this.root = group; + etcdClient = etcdTransporter.connect(url); + etcdClient.addStateListener(new StateListener() { + public void stateChanged(int state) { + if (state == CONNECTED) { + try { + recover(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } + }); + } + + protected static String appendDefaultPort(String address) { + if (address != null && address.length() > 0) { + int i = address.indexOf(':'); + if (i < 0) { + return address + ":" + DEFAULT_ETCD_PORT; + } else if (Integer.parseInt(address.substring(i + 1)) == 0) { + return address.substring(0, i + 1) + DEFAULT_ETCD_PORT; + } + } + return address; + } + + @Override + public void doRegister(URL url) { + try { + String path = toUrlPath(url); + if (url.getParameter(Constants.DYNAMIC_KEY, true)) { + etcdClient.createEphemeral(path); + return; + } + etcdClient.create(path); + } catch (Throwable e) { + throw new RpcException("Failed to register " + url + " to etcd " + getUrl() + + ", cause: " + (OptionUtil.isProtocolError(e) + ? "etcd3 registy maybe not supported yet or etcd3 registry not available." + : e.getMessage()), e); + } + } + + @Override + public void doUnregister(URL url) { + try { + String path = toUrlPath(url); + etcdClient.delete(path); + } catch (Throwable e) { + throw new RpcException("Failed to unregister " + url + " to etcd " + getUrl() + ", cause: " + e.getMessage(), e); + } + } + + @Override + public void doSubscribe(URL url, NotifyListener listener) { + try { + if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { + String root = toRootPath(); + + /** + * if we interesting all interfaces, + * we find current or create container for url, put or get only once. + */ + ConcurrentMap listeners = + Optional.ofNullable(etcdListeners.get(url)) + .orElseGet(() -> { + ConcurrentMap container, prev; + prev = etcdListeners.putIfAbsent(url, container = new ConcurrentHashMap<>()); + return prev != null ? prev : container; + }); + + /** + * if we have not interface watcher listener, + * we find current or create listener for current root, put or get only once. + */ + ChildListener interfaceListener = + Optional.ofNullable(listeners.get(listener)) + .orElseGet(() -> { + ChildListener childListener, prev; + prev = listeners.putIfAbsent(listener, childListener = new ChildListener() { + public void childChanged(String parentPath, List currentChildren) { + /** + * because etcd3 not support direct children watch events, + * we should filter not interface events. if we watch /dubbo + * and /dubbo/interface, when we put key-value pair {/dubbo/interface/hello hello}, + * we will got events in watching path /dubbo. + */ + for (String child : currentChildren) { + child = URL.decode(child); + if (!anyServices.contains(child)) { + anyServices.add(child); + /** + * if new interface event arrived, we watching direct children, + * eg: /dubbo/interface, /dubbo/interface and so on. + */ + subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, + Constants.CHECK_KEY, String.valueOf(false)), listener); + } + } + } + }); + return prev != null ? prev : childListener; + }); + + etcdClient.create(root); + /** + * first time, we want pull already interface and then watching direct children, + * eg: /dubbo/interface, /dubbo/interface and so on. + */ + List services = etcdClient.addChildListener(root, interfaceListener); + for (String service : services) { + service = URL.decode(service); + anyServices.add(service); + subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, + Constants.CHECK_KEY, String.valueOf(false)), listener); + } + } else { + List urls = new ArrayList(); + for (String path : toCategoriesPath(url)) { + + /** + * if we interesting special categories (providers, consumers, routers and so on), + * we find current or create container for url, put or get only once. + */ + ConcurrentMap listeners = + Optional.ofNullable(etcdListeners.get(url)) + .orElseGet(() -> { + ConcurrentMap container, prev; + prev = etcdListeners.putIfAbsent(url, + container = new ConcurrentHashMap()); + return prev != null ? prev : container; + }); + + /** + * if we have no category watcher listener, + * we find current or create listener for current category, put or get only once. + */ + ChildListener childListener = + Optional.ofNullable(listeners.get(listener)) + .orElseGet(() -> { + ChildListener watchListener, prev; + prev = listeners.putIfAbsent(listener, watchListener = new ChildListener() { + public void childChanged(String parentPath, List currentChildren) { + EtcdRegistry.this.notify(url, listener, + toUrlsWithEmpty(url, parentPath, currentChildren)); + } + }); + return prev != null ? prev : watchListener; + }); + + etcdClient.create(path); + /** + * first time, we want pull already category and then watching direct children, + * eg: /dubbo/interface/providers, /dubbo/interface/consumers and so on. + */ + List children = etcdClient.addChildListener(path, childListener); + if (children != null) { + final String watchPath = path; + urls.addAll(toUrlsWithEmpty(url, path, children)); + } + } + notify(url, listener, urls); + } + } catch (Throwable e) { + throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl() + + ", cause: " + (OptionUtil.isProtocolError(e) + ? "etcd3 registy maybe not supported yet or etcd3 registry not available." + : e.getMessage()), e); + } + } + + @Override + public void doUnsubscribe(URL url, NotifyListener listener) { + ConcurrentMap listeners = etcdListeners.get(url); + if (listeners != null) { + ChildListener etcdListener = listeners.get(listener); + if (etcdListener != null) { + // maybe url has many subscribe path + for (String path : toUnsubscribedPath(url)) { + etcdClient.removeChildListener(path, etcdListener); + } + } + } + } + + @Override + public boolean isAvailable() { + return etcdClient.isConnected(); + } + + @Override + public void destroy() { + super.destroy(); + try { + etcdClient.close(); + } catch (Exception e) { + logger.warn("Failed to close etcd client " + getUrl() + ", cause: " + e.getMessage(), e); + } + } + + protected String toRootDir() { + if (root.startsWith(Constants.PATH_SEPARATOR)) { + return root; + } + return Constants.PATH_SEPARATOR + root; + } + + protected String toRootPath() { + return root; + } + + protected String toServicePath(URL url) { + String name = url.getServiceInterface(); + if (Constants.ANY_VALUE.equals(name)) { + return toRootPath(); + } + return toRootDir() + Constants.PATH_SEPARATOR + URL.encode(name); + } + + protected String[] toCategoriesPath(URL url) { + String[] categroies; + if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { + categroies = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, + Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}; + } else { + categroies = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY}); + } + String[] paths = new String[categroies.length]; + for (int i = 0; i < categroies.length; i++) { + paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i]; + } + return paths; + } + + protected String toCategoryPath(URL url) { + return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); + } + + protected String toUrlPath(URL url) { + return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); + } + + protected List toUnsubscribedPath(URL url) { + List categories = new ArrayList<>(); + if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { + String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); + if (!group.startsWith(Constants.PATH_SEPARATOR)) { + group = Constants.PATH_SEPARATOR + group; + } + categories.add(group); + return categories; + } else { + for (String path : toCategoriesPath(url)) { + categories.add(path); + } + } + return categories; + } + + protected List toUrlsWithoutEmpty(URL consumer, List providers) { + List urls = new ArrayList(); + if (providers != null && providers.size() > 0) { + for (String provider : providers) { + provider = URL.decode(provider); + if (provider.contains(Constants.HTTP_SUBFIX_KEY)) { + URL url = URL.valueOf(provider); + if (UrlUtils.isMatch(consumer, url)) { + urls.add(url); + } + } + } + } + return urls; + } + + protected List toUrlsWithEmpty(URL consumer, String path, List providers) { + List urls = toUrlsWithoutEmpty(consumer, providers); + if (urls == null || urls.isEmpty()) { + int i = path.lastIndexOf('/'); + String category = i < 0 ? path : path.substring(i + 1); + URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category); + urls.add(empty); + } + return urls; + } +} diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java new file mode 100644 index 000000000000..187da199090b --- /dev/null +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistryFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.etcd; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.Registry; +import org.apache.dubbo.registry.support.AbstractRegistryFactory; +import org.apache.dubbo.remoting.etcd.EtcdTransporter; + +public class EtcdRegistryFactory extends AbstractRegistryFactory { + + private EtcdTransporter etcdTransporter; + + @Override + protected Registry createRegistry(URL url) { + return new EtcdRegistry(url, etcdTransporter); + } + + public void setEtcdTransporter(EtcdTransporter etcdTransporter) { + this.etcdTransporter = etcdTransporter; + } +} diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory new file mode 100644 index 000000000000..4a6d09c9f0e3 --- /dev/null +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory @@ -0,0 +1 @@ +etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory \ No newline at end of file diff --git a/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java b/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java new file mode 100644 index 000000000000..c23c1d23a0a1 --- /dev/null +++ b/dubbo-registry/dubbo-registry-etcd3/src/test/java/org/apache/dubbo/registry/etcd/EtcdRegistryTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.etcd; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.RegistryFactory; +import org.apache.dubbo.registry.support.AbstractRegistryFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +@Disabled +public class EtcdRegistryTest { + + String service = "org.apache.dubbo.internal.test.DemoServie"; + String outerService = "org.apache.dubbo.outer.test.OuterDemoServie"; + URL serviceUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2"); + URL serviceUrl2 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + service + "?methods=test1,test2,test3"); + URL serviceUrl3 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + "/" + outerService + "?methods=test1,test2"); + URL registryUrl = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService"); + URL consumerUrl = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2018" + "/" + service + "?methods=test1,test2"); + RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension(); + EtcdRegistry registry; + URL subscribe = new URL( + Constants.ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "", + Constants.INTERFACE_KEY, Constants.ANY_VALUE, + Constants.GROUP_KEY, Constants.ANY_VALUE, + Constants.VERSION_KEY, Constants.ANY_VALUE, + Constants.CLASSIFIER_KEY, Constants.ANY_VALUE, + Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + + Constants.CONSUMERS_CATEGORY + "," + + Constants.ROUTERS_CATEGORY + "," + + Constants.CONFIGURATORS_CATEGORY, + Constants.ENABLED_KEY, Constants.ANY_VALUE, + Constants.CHECK_KEY, String.valueOf(false)); + + @Test + public void test_register() { + + registry.register(serviceUrl); + Set registered = registry.getRegistered(); + Assertions.assertEquals(1, registered.size()); + Assertions.assertTrue(registered.contains(serviceUrl)); + + registry.unregister(serviceUrl); + } + + @Test + public void test_unregister() { + + registry.register(serviceUrl); + Set registered = registry.getRegistered(); + Assertions.assertTrue(registered.size() == 1); + Assertions.assertTrue(registered.contains(serviceUrl)); + + registry.unregister(serviceUrl); + + registered = registry.getRegistered(); + Assertions.assertTrue(registered.size() == 0); + } + + @Test + public void test_subscribe() { + + registry.register(serviceUrl); + + final AtomicReference notifiedUrl = new AtomicReference(); + registry.subscribe(consumerUrl, new NotifyListener() { + public void notify(List urls) { + notifiedUrl.set(urls.get(0)); + } + }); + Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString()); + Map> arg = registry.getSubscribed(); + Assertions.assertEquals(consumerUrl, arg.keySet().iterator().next()); + } + + @Test + public void test_subscribe_when_register() throws InterruptedException { + + Assertions.assertTrue(registry.getRegistered().size() == 0); + Assertions.assertTrue(registry.getSubscribed().size() == 0); + + CountDownLatch notNotified = new CountDownLatch(2); + + final AtomicReference notifiedUrl = new AtomicReference(); + registry.subscribe(consumerUrl, new NotifyListener() { + public void notify(List urls) { + notifiedUrl.set(urls.get(0)); + notNotified.countDown(); + } + }); + + registry.register(serviceUrl); + + Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS)); + + Assertions.assertEquals(serviceUrl.toFullString(), notifiedUrl.get().toFullString()); + Map> subscribed = registry.getSubscribed(); + Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next()); + } + + @Test + public void test_subscribe_when_register0() throws InterruptedException { + + Assertions.assertTrue(registry.getRegistered().size() == 0); + Assertions.assertTrue(registry.getSubscribed().size() == 0); + + CountDownLatch notNotified = new CountDownLatch(3); + ConcurrentHashMap notifiedUrls = new ConcurrentHashMap<>(); + registry.subscribe(consumerUrl, new NotifyListener() { + public void notify(List urls) { + if (urls != null && urls.size() > 0) { + if (!urls.get(0).getProtocol().equals("empty")) { + for (Iterator iterator = urls.iterator(); iterator.hasNext(); ) { + notifiedUrls.put(iterator.next(), true); + } + } + } + + notNotified.countDown(); + } + }); + + registry.register(serviceUrl); + registry.register(serviceUrl2); + + Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS)); + + Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl)); + Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2)); + Map> subscribed = registry.getSubscribed(); + Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next()); + } + + @Test + public void test_subscribe_when_register1() throws InterruptedException { + + Assertions.assertTrue(registry.getRegistered().size() == 0); + Assertions.assertTrue(registry.getSubscribed().size() == 0); + + CountDownLatch notNotified = new CountDownLatch(2); + + final AtomicReference notifiedUrls = new AtomicReference(); + registry.subscribe(consumerUrl, new NotifyListener() { + public void notify(List urls) { + notifiedUrls.set(urls.get(0)); + notNotified.countDown(); + } + }); + + registry.register(serviceUrl); + // register service3 should not trigger notify + registry.register(serviceUrl3); + + Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS)); + + Assertions.assertEquals(serviceUrl, notifiedUrls.get()); + Map> subscribed = registry.getSubscribed(); + Assertions.assertEquals(consumerUrl, subscribed.keySet().iterator().next()); + } + + @Test + public void test_subscribe_when_register2() throws InterruptedException { + + Assertions.assertTrue(registry.getRegistered().size() == 0); + Assertions.assertTrue(registry.getSubscribed().size() == 0); + + CountDownLatch notNotified = new CountDownLatch(3); + + ConcurrentHashMap notifiedUrls = new ConcurrentHashMap<>(); + + registry.subscribe(subscribe, new NotifyListener() { + public void notify(List urls) { + if (urls != null && urls.size() > 0) { + if (!urls.get(0).getProtocol().equals("empty")) { + for (Iterator iterator = urls.iterator(); iterator.hasNext(); ) { + notifiedUrls.put(iterator.next(), true); + } + notNotified.countDown(); + } + } + } + }); + + registry.register(serviceUrl); + registry.register(serviceUrl2); + // service3 interface is not equals server2 + registry.register(serviceUrl3); + + Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS)); + Assertions.assertTrue(notifiedUrls.size() == 3); + Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl)); + Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl2)); + Assertions.assertTrue(notifiedUrls.containsKey(serviceUrl3)); + } + + @Test + public void test_unsubscribe() throws InterruptedException { + + Assertions.assertTrue(registry.getRegistered().size() == 0); + Assertions.assertTrue(registry.getSubscribed().size() == 0); + + CountDownLatch notNotified = new CountDownLatch(2); + + final AtomicReference notifiedUrl = new AtomicReference(); + + NotifyListener listener = new NotifyListener() { + public void notify(List urls) { + if (urls != null) { + for (Iterator iterator = urls.iterator(); iterator.hasNext(); ) { + URL url = iterator.next(); + if (!url.getProtocol().equals("empty")) { + notifiedUrl.set(url); + notNotified.countDown(); + } + } + } + } + }; + registry.subscribe(consumerUrl, listener); + registry.unsubscribe(consumerUrl, listener); + + registry.register(serviceUrl); + + Assertions.assertFalse(notNotified.await(2, TimeUnit.SECONDS)); + // expect nothing happen + Assertions.assertTrue(notifiedUrl.get() == null); + } + + @BeforeEach + public void setUp() { + registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl); + Assertions.assertTrue(registry != null); + if (!registry.isAvailable()) { + AbstractRegistryFactory.destroyAll(); + registry = (EtcdRegistry) registryFactory.getRegistry(registryUrl); + } + } + + @AfterEach + public void tearDown() { + + registry.unregister(serviceUrl); + registry.unregister(serviceUrl2); + registry.unregister(serviceUrl3); + registry.unregister(subscribe); + + registry.destroy(); + } + + +} diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml index 93f64bfe8a49..c50270d1cd09 100644 --- a/dubbo-registry/pom.xml +++ b/dubbo-registry/pom.xml @@ -34,5 +34,6 @@ dubbo-registry-multicast dubbo-registry-zookeeper dubbo-registry-redis + dubbo-registry-etcd3 diff --git a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml new file mode 100644 index 000000000000..768052f6b1d6 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml @@ -0,0 +1,52 @@ + + + + + 4.0.0 + + dubbo-remoting + org.apache.dubbo + 2.7.1-SNAPSHOT + + dubbo-remoting-etcd3 + jar + ${project.artifactId} + The etcd3 remoting module of Dubbo project + + false + + + + org.apache.dubbo + dubbo-common + ${project.parent.version} + + + io.etcd + jetcd-core + + + io.netty + netty-all + + + + + \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java new file mode 100644 index 000000000000..f626202467e8 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd; + +public abstract class AbstractRetryPolicy implements RetryPolicy { + + private final int maxRetried; + + protected AbstractRetryPolicy(int maxRetried) { + this.maxRetried = maxRetried; + } + + public boolean shouldRetry(int retried, long elapsed, boolean sleep) { + if (retried < maxRetried) { + try { + if (sleep) { + Thread.sleep(getSleepTime(retried, elapsed)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return true; + } + return false; + } + + protected abstract long getSleepTime(int retried, long elapsed); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java new file mode 100644 index 000000000000..46a0af8ceed4 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd; + +import java.util.List; + +public interface ChildListener { + + void childChanged(String path, List children); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java new file mode 100644 index 000000000000..b1e765d3416e --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd; + +import org.apache.dubbo.common.URL; + +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public interface EtcdClient { + + /** + * save the specified path to the etcd registry. + * + * @param path the path to be saved + */ + void create(String path); + + /** + * save the specified path to the etcd registry. + * if node disconnect from etcd, it will be deleted + * automatically by etcd when sessian timeout. + * + * @param path the path to be saved + * @return the lease of current path. + */ + long createEphemeral(String path); + + /** + * remove the specified from etcd registry. + * + * @param path the path to be removed + */ + void delete(String path); + + /** + * find direct children directory, excluding path self, + * Never return null. + * + * @param path the path to be found direct children. + * @return direct children directory, contains zero element + * list if children directory not exists. + */ + List getChildren(String path); + + /** + * register children listener for specified path. + * + * @param path the path to be watched when children is added, delete or update. + * @param listener when children is changed , listener will be trigged. + * @return direct children directory, contains zero element + * list if children directory not exists. + */ + List addChildListener(String path, ChildListener listener); + + /** + * find watcher of the children listener for specified path. + * + * @param path the path to be watched when children is added, delete or update. + * @param listener when children is changed , listener will be trigged. + * @return watcher if find else null + */ + T getChildListener(String path, ChildListener listener); + + /** + * unregister children lister for specified path. + * + * @param path the path to be unwatched . + * @param listener when children is changed , lister will be trigged. + */ + void removeChildListener(String path, ChildListener listener); + + /** + * support connection notify if connection state was changed. + * + * @param listener if state changed, listener will be triggered. + */ + void addStateListener(StateListener listener); + + /** + * remove connection notify if connection state was changed. + * + * @param listener remove already registered listener, if listener + * not exists nothing happened. + */ + void removeStateListener(StateListener listener); + + /** + * test if current client is active. + * + * @return true if connection is active else false. + */ + boolean isConnected(); + + /** + * close current client and release all resourses. + */ + void close(); + + URL getUrl(); + + /*** + * create new lease from specified second ,it should be waiting if failed.

+ * + * @param second lease time (support second only). + * @return lease id from etcd + */ + long createLease(long second); + + /*** + * create new lease from specified ttl second before waiting specified timeout.

+ * + * @param ttl lease time (support second only). + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws CancellationException if this future was cancelled + * @throws ExecutionException if this future completed exceptionally + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the wait timed out + * @return lease id from etcd + */ + public long createLease(long ttl, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; + + /** + * revoke specified lease, any associated path will removed automatically. + * + * @param lease to be removed lease + */ + void revokeLease(long lease); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java new file mode 100644 index 000000000000..2c0befb0a4c7 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.Adaptive; +import org.apache.dubbo.common.extension.SPI; + +@SPI("jetcd") +public interface EtcdTransporter { + + @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) + EtcdClient connect(URL url); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java new file mode 100644 index 000000000000..b1fc5256b1cd --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd; + +public interface RetryPolicy { + + /** + * Whether retry is supported when operation fails. + * + * @param retried the number of times retried so far + * @param elapsed the elapsed time in millisecond since the operation was attempted + * @param sleep should be sleep + * @return true should be retry + */ + public boolean shouldRetry(int retried, long elapsed, boolean sleep); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java new file mode 100644 index 000000000000..435808373952 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd; + +public interface StateListener { + + int DISCONNECTED = 0; + + int CONNECTED = 1; + + void stateChanged(int connected); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java new file mode 100644 index 000000000000..979caeef82ed --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import com.google.protobuf.ByteString; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.api.Event; +import io.etcd.jetcd.api.KeyValue; +import io.etcd.jetcd.api.WatchCancelRequest; +import io.etcd.jetcd.api.WatchCreateRequest; +import io.etcd.jetcd.api.WatchGrpc; +import io.etcd.jetcd.api.WatchRequest; +import io.etcd.jetcd.api.WatchResponse; +import io.etcd.jetcd.common.exception.ClosedClientException; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.netty.util.internal.ConcurrentSet; +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.remoting.etcd.ChildListener; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.option.OptionUtil; +import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.util.stream.Collectors.toList; +import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8; + +/** + * etct3 client. + */ +public class JEtcdClient extends AbstractEtcdClient { + + private JEtcdClientWrapper clientWrapper; + private ScheduledExecutorService reconnectSchedule; + + private int delayPeriod; + private Logger logger = LoggerFactory.getLogger(JEtcdClient.class); + + public JEtcdClient(URL url) { + super(url); + try { + clientWrapper = new JEtcdClientWrapper(url); + clientWrapper.setConnectionStateListener((client, state) -> { + if (state == StateListener.CONNECTED) { + JEtcdClient.this.stateChanged(StateListener.CONNECTED); + } else if (state == StateListener.DISCONNECTED) { + JEtcdClient.this.stateChanged(StateListener.DISCONNECTED); + } + }); + delayPeriod = getUrl().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); + reconnectSchedule = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("auto-reconnect")); + clientWrapper.start(); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public void doCreatePersistent(String path) { + clientWrapper.createPersistent(path); + } + + @Override + public long doCreateEphemeral(String path) { + return clientWrapper.createEphemeral(path); + } + + @Override + public boolean checkExists(String path) { + return clientWrapper.checkExists(path); + } + + @Override + public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) { + return new EtcdWatcher(listener); + } + + @Override + public List addChildWatcherListener(String path, EtcdWatcher etcdWatcher) { + return etcdWatcher.forPath(path); + } + + @Override + public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) { + etcdWatcher.unwatch(); + } + + @Override + public List getChildren(String path) { + return clientWrapper.getChildren(path); + } + + @Override + public boolean isConnected() { + return clientWrapper.isConnected(); + } + + @Override + public long createLease(long second) { + return clientWrapper.createLease(second); + } + + @Override + public long createLease(long ttl, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return clientWrapper.createLease(ttl, timeout, unit); + } + + @Override + public void delete(String path) { + clientWrapper.delete(path); + } + + @Override + public void revokeLease(long lease) { + clientWrapper.revokeLease(lease); + } + + @Override + public void doClose() { + try { + reconnectSchedule.shutdownNow(); + } catch (Exception e) { + + } finally { + clientWrapper.doClose(); + } + } + + public class EtcdWatcher implements StreamObserver { + + protected WatchGrpc.WatchStub watchStub; + protected StreamObserver watchRequest; + protected long watchId; + protected String path; + protected Throwable throwable; + protected Set urls = new ConcurrentSet<>(); + private ChildListener listener; + + public EtcdWatcher(ChildListener listener) { + this.listener = listener; + } + + @Override + public void onNext(WatchResponse response) { + + // prevents grpc on sending watchResponse to a closed watch client. + if (!isConnected()) { + return; + } + + watchId = response.getWatchId(); + + if (listener != null) { + int modified = 0; + String service = null; + Iterator iterator = response.getEventsList().iterator(); + while (iterator.hasNext()) { + Event event = iterator.next(); + switch (event.getType()) { + case PUT: { + if (((service = find(event)) != null) + && safeUpdate(service, true)) modified++; + break; + } + case DELETE: { + if (((service = find(event)) != null) + && safeUpdate(service, false)) modified++; + break; + } + default: + break; + } + } + if (modified > 0) { + listener.childChanged(path, new ArrayList<>(urls)); + } + + } + } + + @Override + public void onError(Throwable e) { + tryReconnect(e); + } + + public void unwatch() { + + // prevents grpc on sending watchResponse to a closed watch client. + if (!isConnected()) { + return; + } + + try { + this.listener = null; + if (watchRequest != null) { + WatchCancelRequest watchCancelRequest = + WatchCancelRequest.newBuilder().setWatchId(watchId).build(); + WatchRequest cancelRequest = WatchRequest.newBuilder() + .setCancelRequest(watchCancelRequest).build(); + this.watchRequest.onNext(cancelRequest); + } + } catch (Exception ignored) { + logger.warn("Failed to cancel watch for path '" + path + "'", ignored); + } + } + + public List forPath(String path) { + + if (!isConnected()) { + throw new ClosedClientException("watch client has been closed, path '" + path + "'"); + } + + if (this.path != null) { + if (this.path.equals(path)) { + return clientWrapper.getChildren(path); + } + unwatch(); + } + + this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel()); + this.watchRequest = watchStub.watch(this); + this.path = path; + this.watchRequest.onNext(nextRequest()); + + List children = clientWrapper.getChildren(path); + + /** + * caching the current service + */ + if (!children.isEmpty()) { + this.urls.addAll(filterChildren(children)); + } + + return new ArrayList<>(urls); + } + + private boolean safeUpdate(String service, boolean add) { + synchronized (this) { + /** + * If the collection already contains the specified service, do nothing + */ + return add ? this.urls.add(service) : this.urls.remove(service); + } + } + + private String find(Event event) { + KeyValue keyValue = event.getKv(); + String key = keyValue.getKey().toStringUtf8(); + + int len = path.length(), index = len, count = 0; + if (key.length() >= index) { + for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { + if (count++ > 1) break; + } + } + + /** + * if children changed , we should refresh invokers + */ + if (count == 1) { + /** + * remove prefix + */ + return key.substring(len + 1); + } + + return null; + } + + private List filterChildren(List children) { + if (children == null) return Collections.emptyList(); + if (children.size() <= 0) return children; + final int len = path.length(); + return children.stream().parallel() + .filter(child -> { + int index = len, count = 0; + if (child.length() > len) { + for (; (index = child.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { + if (count++ > 1) break; + } + } + return count == 1; + }) + .map(child -> child.substring(len + 1)) + .collect(toList()); + } + + /** + * create new watching request for current path. + */ + protected WatchRequest nextRequest() { + + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFromUtf8(path)) + .setRangeEnd(ByteString.copyFrom( + OptionUtil.prefixEndOf(ByteSequence.from(path, UTF_8)).getBytes())) + .setProgressNotify(true); + + return WatchRequest.newBuilder().setCreateRequest(builder).build(); + } + + public void tryReconnect(Throwable e) { + + this.throwable = e; + + logger.error("watcher client has error occurred, current path '" + path + "'", e); + + // prevents grpc on sending error to a closed watch client. + if (!isConnected()) { + return; + } + + + Status status = Status.fromThrowable(e); + // system may be recover later, current connect won't be lost + if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) { + reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS); + return; + } + // reconnect with a delay; avoiding immediate retry on a long connection downtime. + reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS); + } + + protected synchronized void reconnect() { + this.closeWatchRequest(); + this.recreateWatchRequest(); + } + + protected void recreateWatchRequest() { + if (watchRequest == null) { + this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel()); + this.watchRequest = watchStub.watch(this); + } + this.watchRequest.onNext(nextRequest()); + this.throwable = null; + logger.warn("watch client retried connect for path '" + path + "', connection status : " + isConnected()); + } + + protected void closeWatchRequest() { + if (this.watchRequest == null) { + return; + } + this.watchRequest.onCompleted(); + this.watchRequest = null; + } + + @Override + public void onCompleted() { + // do not touch this method, if you want terminate this stream. + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java new file mode 100644 index 000000000000..e563cc2e822d --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -0,0 +1,706 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.ClientBuilder; +import io.etcd.jetcd.CloseableClient; +import io.etcd.jetcd.Observers; +import io.etcd.jetcd.common.exception.ErrorCode; +import io.etcd.jetcd.common.exception.EtcdException; +import io.etcd.jetcd.lease.LeaseKeepAliveResponse; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import io.grpc.util.RoundRobinLoadBalancerFactory; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.remoting.etcd.RetryPolicy; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.option.Constants; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.stream.Collectors.toList; + +public class JEtcdClientWrapper { + + private Logger logger = LoggerFactory.getLogger(JEtcdClientWrapper.class); + + private final URL url; + private volatile Client client; + private volatile boolean started = false; + private volatile boolean connectState = false; + private ScheduledFuture future; + private ScheduledExecutorService reconnectNotify; + private AtomicReference channel; + + private ConnectionStateListener connectionStateListener; + + private long expirePeriod; + + private CompletableFuture completableFuture; + + private RetryPolicy retryPolicy; + + private RuntimeException failed; + + private final ScheduledFuture retryFuture; + private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true)); + + private final Set failedRegistered = new ConcurrentHashSet(); + + private final Set registeredPaths = new ConcurrentHashSet<>(); + private volatile CloseableClient keepAlive = null; + + /** + * Support temporary nodes to reuse the same lease + */ + private volatile long globalLeaseId; + + private volatile boolean cancelKeepAlive = false; + + public static final Charset UTF_8 = Charset.forName("UTF-8"); + + public JEtcdClientWrapper(URL url) { + this.url = url; + this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_KEEPALIVE_TIMEOUT) / 1000; + if (expirePeriod <= 0) { + this.expirePeriod = Constants.DEFAULT_KEEPALIVE_TIMEOUT / 1000; + } + this.channel = new AtomicReference<>(); + this.completableFuture = CompletableFuture.supplyAsync(() -> prepareClient(url)); + this.reconnectNotify = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("reconnectNotify", true)); + this.retryPolicy = new RetryNTimes(1, 1000, TimeUnit.MILLISECONDS); + + this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url); + int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); + this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { + public void run() { + try { + retry(); + } catch (Throwable t) { + logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); + } + } + }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); + } + + private Client prepareClient(URL url) { + + int maxInboudSize = DEFAULT_INBOUT_SIZE; + if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY))) { + maxInboudSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY)); + } + + ClientBuilder clientBuilder = Client.builder() + .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) + .endpoints(endPoints(url.getBackupAddress())) + .maxInboundMessageSize(maxInboudSize); + + return clientBuilder.build(); + } + + public Client getClient() { + return client; + } + + /** + * try to get current connected channel. + * + * @return connected channel. + */ + public ManagedChannel getChannel() { + if (channel.get() == null || (channel.get().isShutdown() || channel.get().isTerminated())) { + channel.set(newChannel(client)); + } + return channel.get(); + } + + /** + * find direct children directory, excluding path self, + * Never return null. + * + * @param path the path to be found direct children. + * @return direct children directory, contains zero element + * list if children directory not exists. + */ + public List getChildren(String path) { + try { + return RetryLoops.invokeWithRetry( + new Callable>() { + @Override + public List call() throws Exception { + requiredNotNull(client, failed); + int len = path.length(); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8), + GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getKvs().stream().parallel() + .filter(pair -> { + String key = pair.getKey().toString(UTF_8); + int index = len, count = 0; + if (key.length() > len) { + for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { + if (count++ > 1) break; + } + } + return count == 1; + }) + .map(pair -> pair.getKey().toString(UTF_8)) + .collect(toList()); + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public boolean isConnected() { + return ConnectivityState.READY == (getChannel().getState(false)) + || ConnectivityState.IDLE == (getChannel().getState(false)); + } + + public long createLease(long second) { + try { + return RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Long call() throws Exception { + requiredNotNull(client, failed); + return client.getLeaseClient() + .grant(second) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getID(); + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public void revokeLease(long lease) { + try { + RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Void call() throws Exception { + requiredNotNull(client, failed); + client.getLeaseClient() + .revoke(lease) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return null; + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public long createLease(long ttl, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + if (timeout <= 0) { + return createLease(ttl); + } + + requiredNotNull(client, failed); + return client.getLeaseClient() + .grant(ttl) + .get(timeout, unit).getID(); + } + + + /** + * try to check if path exists. + */ + public boolean checkExists(String path) { + try { + return RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Boolean call() throws Exception { + requiredNotNull(client, failed); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getCount() > 0; + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + /** + * only internal use only, maybe change in the future + */ + protected Long find(String path) { + try { + return RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Long call() throws Exception { + requiredNotNull(client, failed); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getKvs().stream() + .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8))) + .findFirst().getAsLong(); + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public void createPersistent(String path) { + try { + RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Void call() throws Exception { + requiredNotNull(client, failed); + client.getKVClient() + .put(ByteSequence.from(path, UTF_8), + ByteSequence.from(String.valueOf(path.hashCode()), UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return null; + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + /** + * create new ephemeral path save to etcd . + * if node disconnect from etcd, it will be deleted + * automatically by etcd when sessian timeout. + * + * @param path the path to be saved + * @return the lease of current path. + */ + public long createEphemeral(String path) { + try { + return RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Long call() throws Exception { + requiredNotNull(client, failed); + + keepAlive(); + registeredPaths.add(path); + client.getKVClient() + .put(ByteSequence.from(path, UTF_8) + , ByteSequence.from(String.valueOf(globalLeaseId), UTF_8) + , PutOption.newBuilder().withLeaseId(globalLeaseId).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return globalLeaseId; + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + // easy for mock + public void keepAlive(long lease) { + this.keepAlive(lease, null); + } + + private void keepAlive(long lease, Consumer onFailed) { + final StreamObserver observer = new Observers.Builder() + .onError((e) -> { + if (e instanceof EtcdException) { + EtcdException error = (EtcdException) e; + /** + * ttl has expired + */ + if (error.getErrorCode() == ErrorCode.NOT_FOUND) { + keepAlive0(onFailed); + } + } + }).onCompleted(() -> { + /** + * deadline reached. + */ + keepAlive0(onFailed); + }).build(); + + /** + * If there is already a keepalive, cancel first + */ + cancelKeepAlive(); + + /** + * create and set new keepAlive to globalKeepAliveRef + */ + this.keepAlive = client.getLeaseClient().keepAlive(lease, observer); + } + + private void keepAlive() throws Exception { + if (keepAlive == null) { + synchronized (this) { + if (keepAlive == null) { + this.globalLeaseId = client.getLeaseClient() + .grant(expirePeriod) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getID(); + /** + * If the keepAlive expires, the registration will be re-attempted + */ + keepAlive(globalLeaseId, (NULL) -> recovery()); + } + } + } + } + + private void keepAlive0(Consumer onFailed) { + if (onFailed != null) { + + /** + * The following two scenarios will cause the keep-alive failureļ¼š + * + * 1. Service is offline + * 2. Local deadline check expired + * + * The multiplex lease cannot update the local deadline, + * causing the extreme scene service to be dropped. + * + */ + try { + if (logger.isWarnEnabled()) { + logger.warn("Failed to keep alive for global lease, waiting for retry again."); + } + onFailed.accept(null); + } catch (Exception ignored) { + logger.warn("Failed to recover from global lease expired or lease deadline exceeded.", ignored); + } + } + } + + private void recovery() { + + /** + * The client is processing reconnection + */ + if (cancelKeepAlive) return; + + cancelKeepAlive(); + + try { + Set ephemeralPaths = new HashSet(registeredPaths); + if (!ephemeralPaths.isEmpty()) { + for (String path : ephemeralPaths) { + try { + + /** + * The client is processing reconnection, + * cancel remaining service registration + */ + if (cancelKeepAlive) return; + + createEphemeral(path); + failedRegistered.remove(path); + } catch (Exception ignored) { + /** + * waiting for retry again + */ + failedRegistered.add(path); + } + } + } + } catch (Throwable t) { + logger.warn("Unexpected error, failed to recover from global lease expired or deadline exceeded.", t); + } + } + + public void delete(String path) { + try { + RetryLoops.invokeWithRetry( + new Callable() { + @Override + public Void call() throws Exception { + requiredNotNull(client, failed); + client.getKVClient() + .delete(ByteSequence.from(path, UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + registeredPaths.remove(path); + return null; + } + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } finally { + /** + * Cancel retry + */ + failedRegistered.remove(path); + } + } + + public String[] endPoints(String backupAddress) { + String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR); + return Arrays.stream(endpoints) + .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1 + ? address + : Constants.HTTP_KEY + address) + .collect(toList()) + .toArray(new String[0]); + } + + /** + * because jetcd's connection change callback not supported yet, we must + * loop to test if connect or disconnect event happend or not. It will be changed + * in the future if we found better choice. + */ + public void start() { + if (!started) { + try { + this.client = completableFuture.get(expirePeriod, TimeUnit.SECONDS); + this.connectState = isConnected(); + this.started = true; + } catch (Throwable t) { + logger.error("Timeout! etcd3 server can not be connected in : " + expirePeriod + " seconds! url: " + url, t); + + completableFuture.whenComplete((c, e) -> { + this.client = c; + if (e != null) { + logger.error("Got an exception when trying to create etcd3 instance, can not connect to etcd3 server, url: " + url, e); + } + }); + + } + + try { + this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + boolean connected = isConnected(); + if (connectState != connected) { + int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED; + if (connectionStateListener != null) { + if (connected) { + clearKeepAlive(); + } + connectionStateListener.stateChanged(getClient(), notifyState); + cancelKeepAlive = false; + } + connectState = connected; + } + } + + }, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + logger.error("monitor reconnect status failed.", t); + } + } + } + + private void cancelKeepAlive() { + try { + if (keepAlive != null) { + keepAlive.close(); + } + } finally { + // help for gc + keepAlive = null; + } + } + + private synchronized void clearKeepAlive() { + cancelKeepAlive = true; + registeredPaths.clear(); + failedRegistered.clear(); + cancelKeepAlive(); + } + + protected void doClose() { + + try { + cancelKeepAlive = true; + revokeLease(this.globalLeaseId); + } catch (Exception e) { + logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e); + } + + try { + if (started && future != null) { + started = false; + future.cancel(true); + reconnectNotify.shutdownNow(); + } + } catch (Exception e) { + logger.warn("stop reconnect Notify failed, registry: " + url, e); + } + + try { + retryFuture.cancel(true); + retryExecutor.shutdownNow(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + + if (getClient() != null) getClient().close(); + } + + /** + * try get client's shared channel, becase all fields is private on jetcd, + * we must using it by reflect, in the future, jetcd may provider better tools. + * + * @param client get channel from current client + * @return current connection channel + */ + private ManagedChannel newChannel(Client client) { + try { + Field connectionField = client.getClass().getDeclaredField("connectionManager"); + if (!connectionField.isAccessible()) { + connectionField.setAccessible(true); + } + Object connection = connectionField.get(client); + Method channel = connection.getClass().getDeclaredMethod("getChannel"); + if (!channel.isAccessible()) { + channel.setAccessible(true); + } + return (ManagedChannel) channel.invoke(connection); + } catch (Exception e) { + throw new RuntimeException("Failed to obtain connection channel from " + url.getBackupAddress(), e); + } + } + + public ConnectionStateListener getConnectionStateListener() { + return connectionStateListener; + } + + public void setConnectionStateListener(ConnectionStateListener connectionStateListener) { + this.connectionStateListener = connectionStateListener; + } + + public static void requiredNotNull(Object obj, RuntimeException exeception) { + if (obj == null) { + throw exeception; + } + } + + private void retry() { + if (!failedRegistered.isEmpty()) { + Set failed = new HashSet(failedRegistered); + if (!failed.isEmpty()) { + + if (cancelKeepAlive) return; + + if (logger.isWarnEnabled()) { + logger.warn("Retry failed register(keep alive) for path '" + failed + + "', path size: " + failed.size()); + } + try { + for (String path : failed) { + try { + + /** + * Is it currently reconnecting ? + */ + if (cancelKeepAlive) return; + + createEphemeral(path); + failedRegistered.remove(path); + } catch (Throwable t) { + logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + t.getMessage(), t); + } + } + } catch (Throwable t) { + logger.warn("Failed to retry register(keep alive) for path '" + failed + "', waiting for again, cause: " + t.getMessage(), t); + } + } + } + } + + public interface ConnectionStateListener { + /** + * Called when there is a state change in the connection + * + * @param client the client + * @param newState the new state + */ + public void stateChanged(Client client, int newState); + } + + /** + * default request timeout + */ + public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout(); + + public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024; + + public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size"; + + public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout"; + + private static int obtainRequestTimeout() { + if (StringUtils.isNotEmpty(System.getProperty(ETCD_REQUEST_TIMEOUT_KEY))) { + return Integer.valueOf(System.getProperty(ETCD_REQUEST_TIMEOUT_KEY)); + } + /** + * 10 seconds. + */ + return 10 * 1000; + } +} \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java new file mode 100644 index 000000000000..5ddec8ee1fd3 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.remoting.etcd.EtcdClient; +import org.apache.dubbo.remoting.etcd.EtcdTransporter; + +public class JEtcdTransporter implements EtcdTransporter { + + @Override + public EtcdClient connect(URL url) { + return new JEtcdClient(url); + } + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java new file mode 100644 index 000000000000..cf8617c3ab26 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import io.grpc.Status; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.remoting.etcd.RetryPolicy; +import org.apache.dubbo.remoting.etcd.option.OptionUtil; + +import java.util.concurrent.Callable; + +public class RetryLoops { + + private final long startTimeMs = System.currentTimeMillis(); + private boolean isDone = false; + private int retriedCount = 0; + private Logger logger = LoggerFactory.getLogger(RetryLoops.class); + + public static R invokeWithRetry(Callable task, RetryPolicy retryPolicy) throws Exception { + R result = null; + RetryLoops retryLoop = new RetryLoops(); + while (retryLoop.shouldContinue()) { + try { + result = task.call(); + retryLoop.complete(); + } catch (Exception e) { + retryLoop.fireException(e, retryPolicy); + } + } + return result; + } + + public void fireException(Exception e, RetryPolicy retryPolicy) throws Exception { + + if (e instanceof InterruptedException) Thread.currentThread().interrupt(); + + boolean rethrow = true; + if (isRetryException(e) + && retryPolicy.shouldRetry(retriedCount++, System.currentTimeMillis() - startTimeMs, true)) { + rethrow = false; + } + + if (rethrow) { + throw e; + } + } + + private boolean isRetryException(Throwable e) { + Status status = Status.fromThrowable(e); + if (OptionUtil.isRecoverable(status)) return true; + + return false; + } + + public boolean shouldContinue() { + return !isDone; + } + + public void complete() { + isDone = true; + } + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java new file mode 100644 index 000000000000..745320830ca6 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import org.apache.dubbo.remoting.etcd.AbstractRetryPolicy; + +import java.util.concurrent.TimeUnit; + +public class RetryNTimes extends AbstractRetryPolicy { + + private final long sleepMilliseconds; + + public RetryNTimes(int maxRetried, int sleepTime, TimeUnit unit) { + super(maxRetried); + this.sleepMilliseconds = unit.convert(sleepTime, TimeUnit.MILLISECONDS); + } + + @Override + protected long getSleepTime(int retried, long elapsed) { + return sleepMilliseconds; + } +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java new file mode 100644 index 000000000000..c935808b095c --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/Constants.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.option; + +/** + * Etcd registry constants. + */ +public class Constants extends org.apache.dubbo.common.Constants { + + public static final String HTTP_SUBFIX_KEY = "://"; + + public static final String HTTP_KEY = "http://"; + + public static final int DEFAULT_KEEPALIVE_TIMEOUT = DEFAULT_SESSION_TIMEOUT / 2; + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java new file mode 100644 index 000000000000..609f289e91c8 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.option; + +import io.etcd.jetcd.ByteSequence; +import io.grpc.Status; +import io.netty.handler.codec.http2.Http2Exception; + +import java.util.Arrays; + +public class OptionUtil { + + public static final byte[] NO_PREFIX_END = {0}; + + public static final ByteSequence prefixEndOf(ByteSequence prefix) { + byte[] endKey = prefix.getBytes().clone(); + for (int i = endKey.length - 1; i >= 0; i--) { + if (endKey[i] < 0xff) { + endKey[i] = (byte) (endKey[i] + 1); + return ByteSequence.from(Arrays.copyOf(endKey, i + 1)); + } + } + + return ByteSequence.from(NO_PREFIX_END); + } + + public static boolean isRecoverable(Status status) { + return isHaltError(status) + || isNoLeaderError(status) + // ephemeral is expired + || status.getCode() == Status.Code.NOT_FOUND; + } + + public static boolean isHaltError(Status status) { + // Unavailable codes mean the system will be right back. + // (e.g., can't connect, lost leader) + // Treat Internal codes as if something failed, leaving the + // system in an inconsistent state, but retrying could make progress. + // (e.g., failed in middle of send, corrupted frame) + return status.getCode() != Status.Code.UNAVAILABLE && status.getCode() != Status.Code.INTERNAL; + } + + public static boolean isNoLeaderError(Status status) { + return status.getCode() == Status.Code.UNAVAILABLE + && "etcdserver: no leader".equals(status.getDescription()); + } + + public static boolean isProtocolError(Throwable e) { + if (e == null) return false; + Throwable cause = e.getCause(); + while (cause != null) { + if (cause instanceof Http2Exception) { + Http2Exception t = (Http2Exception) cause; + if ("PROTOCOL_ERROR".equals(t.error().name())) { + return true; + } + } + cause = cause.getCause(); + } + return false; + } +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java new file mode 100644 index 000000000000..31752bffe8f6 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.support; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.remoting.etcd.ChildListener; +import org.apache.dubbo.remoting.etcd.EtcdClient; +import org.apache.dubbo.remoting.etcd.StateListener; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public abstract class AbstractEtcdClient implements EtcdClient { + + protected static final Logger logger = LoggerFactory.getLogger(AbstractEtcdClient.class); + + private final URL url; + + private final Set stateListeners = new ConcurrentHashSet<>(); + + private final ConcurrentMap> childListeners = new ConcurrentHashMap>(); + private final List categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY + , Constants.CONSUMERS_CATEGORY + , Constants.ROUTERS_CATEGORY + , Constants.CONFIGURATORS_CATEGORY); + private volatile boolean closed = false; + + public AbstractEtcdClient(URL url) { + this.url = url; + } + + public URL getUrl() { + return url; + } + + public void create(String path) { + String fixedPath = fixNamespace(path); + createParentIfAbsent(fixedPath); + doCreatePersistent(fixedPath); + } + + public long createEphemeral(String path) { + String fixedPath = fixNamespace(path); + createParentIfAbsent(fixedPath); + return doCreateEphemeral(path); + } + + public void addStateListener(StateListener listener) { + stateListeners.add(listener); + } + + public void removeStateListener(StateListener listener) { + stateListeners.remove(listener); + } + + public Set getSessionListeners() { + return stateListeners; + } + + public List addChildListener(String path, final ChildListener listener) { + ConcurrentMap listeners = childListeners.get(path); + if (listeners == null) { + childListeners.putIfAbsent(path, new ConcurrentHashMap()); + listeners = childListeners.get(path); + } + WatcherListener targetListener = listeners.get(listener); + if (targetListener == null) { + listeners.putIfAbsent(listener, createChildWatcherListener(path, listener)); + targetListener = listeners.get(listener); + } + return addChildWatcherListener(path, targetListener); + } + + public WatcherListener getChildListener(String path, ChildListener listener) { + ConcurrentMap listeners = childListeners.get(path); + if (listeners == null) { + return null; + } + WatcherListener targetListener = listeners.get(listener); + if (targetListener == null) { + listeners.putIfAbsent(listener, createChildWatcherListener(path, listener)); + targetListener = listeners.get(listener); + } + return targetListener; + } + + public void removeChildListener(String path, ChildListener listener) { + ConcurrentMap listeners = childListeners.get(path); + if (listeners != null) { + WatcherListener targetListener = listeners.remove(listener); + if (targetListener != null) { + removeChildWatcherListener(path, targetListener); + } + } + } + + protected void stateChanged(int state) { + for (StateListener sessionListener : getSessionListeners()) { + sessionListener.stateChanged(state); + } + } + + protected String fixNamespace(String path) { + if (StringUtils.isEmpty(path)) { + throw new IllegalArgumentException("path is required, actual null or ''"); + } + return (path.charAt(0) != '/') ? (Constants.PATH_SEPARATOR + path) : path; + } + + protected void createParentIfAbsent(String fixedPath) { + int i = fixedPath.lastIndexOf('/'); + if (i > 0) { + String parentPath = fixedPath.substring(0, i); + if (categroies.stream().anyMatch(c -> fixedPath.endsWith(c))) { + if (!checkExists(parentPath)) { + this.doCreatePersistent(parentPath); + } + } else if (categroies.stream().anyMatch(c -> parentPath.endsWith(c))) { + String grandfather = parentPath.substring(0, parentPath.lastIndexOf('/')); + if (!checkExists(grandfather)) { + this.doCreatePersistent(grandfather); + } + } + } + } + + public void close() { + if (closed) { + return; + } + closed = true; + try { + doClose(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + + public abstract void doClose(); + + public abstract void doCreatePersistent(String path); + + public abstract long doCreateEphemeral(String path); + + public abstract void delete(String path); + + public abstract boolean checkExists(String path); + + public abstract WatcherListener createChildWatcherListener(String path, ChildListener listener); + + public abstract List addChildWatcherListener(String path, WatcherListener listener); + + public abstract void removeChildWatcherListener(String path, WatcherListener listener); + +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter b/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter new file mode 100644 index 000000000000..d10733a0dd50 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter @@ -0,0 +1 @@ +jetcd=org.apache.dubbo.remoting.etcd.jetcd.JEtcdTransporter \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java new file mode 100644 index 000000000000..19254abeac78 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import io.etcd.jetcd.common.exception.ClosedClientException; +import io.grpc.Status; +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.remoting.etcd.ChildListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@Disabled +public class JEtcdClientTest { + + JEtcdClient client; + + @Test + public void test_watch_when_create_path() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers"; + String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1"; + + final CountDownLatch notNotified = new CountDownLatch(1); + + ChildListener childListener = (parent, children) -> { + Assertions.assertEquals(1, children.size()); + Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0)); + notNotified.countDown(); + }; + + client.addChildListener(path, childListener); + + client.createEphemeral(child); + Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS)); + + client.removeChildListener(path, childListener); + client.delete(child); + } + + @Test + public void test_watch_when_create_wrong_path() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers"; + String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/routers/demoService1"; + + final CountDownLatch notNotified = new CountDownLatch(1); + + ChildListener childListener = (parent, children) -> { + Assertions.assertEquals(1, children.size()); + Assertions.assertEquals(child, children.get(0)); + notNotified.countDown(); + }; + + client.addChildListener(path, childListener); + + client.createEphemeral(child); + Assertions.assertFalse(notNotified.await(1, TimeUnit.SECONDS)); + + client.removeChildListener(path, childListener); + client.delete(child); + } + + @Test + public void test_watch_when_delete_path() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers"; + String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1"; + + final CountDownLatch notNotified = new CountDownLatch(1); + + ChildListener childListener = (parent, children) -> { + Assertions.assertEquals(0, children.size()); + notNotified.countDown(); + }; + + client.createEphemeral(child); + + client.addChildListener(path, childListener); + client.delete(child); + + Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS)); + client.removeChildListener(path, childListener); + } + + @Test + public void test_watch_then_unwatch() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers"; + String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService2"; + + final CountDownLatch notNotified = new CountDownLatch(1); + final CountDownLatch notTwiceNotified = new CountDownLatch(2); + + final Holder notified = new Holder(); + + ChildListener childListener = (parent, children) -> { + Assertions.assertEquals(1, children.size()); + Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0)); + notNotified.countDown(); + notTwiceNotified.countDown(); + notified.getAndIncrease(); + }; + + client.addChildListener(path, childListener); + + client.createEphemeral(child); + Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS)); + + client.removeChildListener(path, childListener); + client.delete(child); + + Assertions.assertFalse(notTwiceNotified.await(5, TimeUnit.SECONDS)); + Assertions.assertEquals(1, notified.value); + client.delete(child); + } + + @Test + public void test_watch_on_unrecoverable_connection() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers"; + JEtcdClient.EtcdWatcher watcher = null; + try { + ChildListener childListener = (parent, children) -> { + Assertions.assertEquals(path, parent); + }; + client.addChildListener(path, childListener); + watcher = client.getChildListener(path, childListener); + watcher.watchRequest.onError(Status.ABORTED.withDescription("connection error").asRuntimeException()); + + watcher.watchRequest.onNext(watcher.nextRequest()); + } catch (Exception e) { + Assertions.assertTrue(e.getMessage().contains("call was cancelled")); + } + } + + @Test + public void test_watch_on_recoverable_connection() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection"; + String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection/demoService1"; + + final CountDownLatch notNotified = new CountDownLatch(1); + final CountDownLatch notTwiceNotified = new CountDownLatch(2); + final Holder notified = new Holder(); + ChildListener childListener = (parent, children) -> { + notTwiceNotified.countDown(); + switch (notified.increaseAndGet()) { + case 1: { + notNotified.countDown(); + Assertions.assertTrue(children.size() == 1); + Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0)); + break; + } + case 2: { + Assertions.assertTrue(children.size() == 0); + Assertions.assertEquals(path, parent); + break; + } + default: + Assertions.fail("two many callback invoked."); + } + }; + + client.addChildListener(path, childListener); + client.createEphemeral(child); + + // make sure first time callback successfully + Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS)); + + // connection error causes client to release all resources including current watcher + JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, childListener); + watcher.onError(Status.UNAVAILABLE.withDescription("temporary connection issue").asRuntimeException()); + + // trigger delete after unavailable + client.delete(child); + Assertions.assertTrue(notTwiceNotified.await(15, TimeUnit.SECONDS)); + + client.removeChildListener(path, childListener); + } + + @Test + public void test_watch_after_client_closed() throws InterruptedException { + + String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers"; + client.close(); + + try { + client.addChildListener(path, (parent, children) -> { + Assertions.assertEquals(path, parent); + }); + } catch (ClosedClientException e) { + Assertions.assertEquals("watch client has been closed, path '" + path + "'", e.getMessage()); + } + } + + @BeforeEach + public void setUp() { + // timeout in 15 seconds. + URL url = URL.valueOf("etcd3://127.0.0.1:2379/com.alibaba.dubbo.registry.RegistryService") + .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000); + + client = new JEtcdClient(url); + } + + @AfterEach + public void tearDown() { + client.close(); + } + + static class Holder { + + volatile int value; + + synchronized int getAndIncrease() { + return value++; + } + + synchronized int increaseAndGet() { + return ++value; + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java new file mode 100644 index 000000000000..b7d2671e4357 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.etcd.jetcd; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +@Disabled +public class JEtcdClientWrapperTest { + + JEtcdClientWrapper clientWrapper; + + @Test + public void test_path_exists() { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + clientWrapper.createPersistent(path); + Assertions.assertTrue(clientWrapper.checkExists(path)); + Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits")); + clientWrapper.delete(path); + } + + @Test + public void test_create_emerphal_path() { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + clientWrapper.createEphemeral(path); + Assertions.assertTrue(clientWrapper.checkExists(path)); + clientWrapper.delete(path); + } + + @Test + public void test_grant_lease_then_revoke() { + long lease = clientWrapper.createLease(1); + clientWrapper.revokeLease(lease); + + long newLease = clientWrapper.createLease(1); + LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2)); + // test timeout of lease + clientWrapper.revokeLease(newLease); + } + + @Test + public void test_create_emerphal_path_then_timeout() { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + + URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService") + .addParameter(Constants.SESSION_TIMEOUT_KEY, 1000); + + JEtcdClientWrapper saved = clientWrapper; + + try { + clientWrapper = spy(new JEtcdClientWrapper(url)); + clientWrapper.start(); + + doAnswer(new Answer() { + int timeout; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2)); + if (timeout++ > 0) { + throw new TimeoutException(); + } + return null; + } + }).when(clientWrapper).keepAlive(anyLong()); + + try { + clientWrapper.createEphemeral(path); + } catch (IllegalStateException ex) { + Assertions.assertEquals("failed to create ephereral by path '" + path + "'", ex.getMessage()); + } + + } finally { + clientWrapper.doClose(); + clientWrapper = saved; + } + } + + @Test + public void test_get_emerphal_children_path() { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + String[] children = { + "/dubbo/org.apache.dubbo.demo.DemoService/providers/service1" + , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service2" + , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service3" + , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service4" + , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service5/exclude" + }; + + Arrays.stream(children).forEach((child) -> { + Assertions.assertFalse(clientWrapper.checkExists(child)); + clientWrapper.createEphemeral(child); + }); + + List extected = clientWrapper.getChildren(path); + + Assertions.assertEquals(4, extected.size()); + extected.stream().forEach((child) -> { + boolean found = false; + for (int i = 0; i < children.length; ++i) { + if (child.equals(children[i])) { + found = true; + break; + } + } + Assertions.assertTrue(found); + clientWrapper.delete(child); + }); + } + + @Test + public void test_connect_cluster() { + URL url = URL.valueOf("etcd3://127.0.0.1:22379/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:2379,127.0.0.1:32379"); + JEtcdClientWrapper clientWrapper = new JEtcdClientWrapper(url); + try { + clientWrapper.start(); + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + clientWrapper.createEphemeral(path); + Assertions.assertTrue(clientWrapper.checkExists(path)); + Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits")); + clientWrapper.delete(path); + } finally { + clientWrapper.doClose(); + } + } + + @BeforeEach + public void setUp() { + URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService"); + clientWrapper = new JEtcdClientWrapper(url); + clientWrapper.start(); + } + + @AfterEach + public void tearDown() { + clientWrapper.doClose(); + } +} diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml index d646c2235ebc..fd17dbf0cd64 100644 --- a/dubbo-remoting/pom.xml +++ b/dubbo-remoting/pom.xml @@ -38,5 +38,6 @@ dubbo-remoting-http dubbo-remoting-zookeeper dubbo-remoting-netty4 + dubbo-remoting-etcd3 \ No newline at end of file