Skip to content

Commit

Permalink
[ISSUE #4931]Add Registry Module for Discovery AdminServer
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN authored May 29, 2024
1 parent 859ad8d commit 78942c4
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 126 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.eventmesh.registry;

import lombok.Getter;

import java.util.List;

public class NotifyEvent {

public NotifyEvent(){

}

public NotifyEvent(List<RegisterServerInfo> instances) {
this(instances, false);
}

public NotifyEvent(List<RegisterServerInfo> instances, boolean isIncrement) {
this.isIncrement = isIncrement;
this.instances = instances;
}



// means whether it is an increment data
@Getter
private boolean isIncrement;

@Getter
private List<RegisterServerInfo> instances;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,17 @@ public void setMetadata(Map<String, String> metadata) {
public void addMetadata(String key, String value) {
this.metadata.put(key, value);
}

public void setExtFields(Map<String, Object> extFields) {
if (extFields == null) {
this.extFields.clear();
return;
}

this.extFields = extFields;
}

public void addExtFields(String key, Object value) {
this.extFields.put(key, value);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.eventmesh.registry;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class RegistryFactory {
private static final Map<String, RegistryService> META_CACHE = new HashMap<>(16);

public static RegistryService getInstance(String registryPluginType) {
return META_CACHE.computeIfAbsent(registryPluginType, RegistryFactory::registryBuilder);
}

private static RegistryService registryBuilder(String registryPluginType) {
RegistryService registryServiceExt = EventMeshExtensionFactory.getExtension(RegistryService.class, registryPluginType);
if (registryServiceExt == null) {
String errorMsg = "can't load the registry plugin, please check.";
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}
log.info("build registry plugin [{}] by type [{}] success", registryServiceExt.getClass().getSimpleName(),
registryPluginType);
return registryServiceExt;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package org.apache.eventmesh.registry;

public interface RegistryListener {
void onChange(Object data);
void onChange(NotifyEvent event) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
public interface RegistryService {
String ConfigurationKey = "registry";
void init() throws RegistryException;

void shutdown() throws RegistryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.registry.NotifyEvent;
import org.apache.eventmesh.registry.QueryInstances;
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryListener;
Expand All @@ -28,35 +30,43 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

@Slf4j
public class NacosDiscoveryService implements RegistryService {
private final AtomicBoolean initFlag = new AtomicBoolean(false);

private CommonConfiguration configuration;
private final AtomicBoolean initFlag = new AtomicBoolean(false);

private NacosRegistryConfiguration nacosConf;

private NamingService namingService;

private final Map<String, Map<RegistryListener, EventListener>> listeners = new HashMap<>();

private static final Executor notifyExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20), r -> {
Thread t = new Thread(r);
t.setName("org.apache.eventmesh.registry.nacos.executor");
t.setDaemon(true);
return t;
}, new ThreadPoolExecutor.DiscardOldestPolicy()
);

private final Lock lock = new ReentrantLock();
private static final String GROUP_NAME = "admin";


@Override
public void init() throws RegistryException {
if (!initFlag.compareAndSet(false, true)) {
return;
}
configuration = ConfigurationContextUtil.get(RegistryService.ConfigurationKey);
if (configuration == null ) {
throw new RegistryException("registry config instance is null");
}
nacosConf = ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class);
if (nacosConf == null) {
log.info("nacos registry configuration is null");
Expand All @@ -73,12 +83,13 @@ public void init() throws RegistryException {

private Properties buildProperties() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, configuration.getRegistryAddr());
properties.setProperty(PropertyKeyConst.USERNAME, configuration.getEventMeshRegistryPluginUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, configuration.getEventMeshRegistryPluginPassword());
if (nacosConf == null) {
return properties;
}
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosConf.getRegistryAddr());
properties.setProperty(PropertyKeyConst.USERNAME, nacosConf.getEventMeshRegistryPluginUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, nacosConf.getEventMeshRegistryPluginPassword());

String endpoint = nacosConf.getEndpoint();
if (Objects.nonNull(endpoint) && endpoint.contains(":")) {
int index = endpoint.indexOf(":");
Expand All @@ -87,7 +98,8 @@ private Properties buildProperties() {
} else {
Optional.ofNullable(endpoint).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT, endpoint));
String endpointPort = nacosConf.getEndpointPort();
Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT, endpointPort));
Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT,
endpointPort));
}
String accessKey = nacosConf.getAccessKey();
Optional.ofNullable(accessKey).ifPresent(value -> properties.put(PropertyKeyConst.ACCESS_KEY, accessKey));
Expand All @@ -96,7 +108,8 @@ private Properties buildProperties() {
String clusterName = nacosConf.getClusterName();
Optional.ofNullable(clusterName).ifPresent(value -> properties.put(PropertyKeyConst.CLUSTER_NAME, clusterName));
String logFileName = nacosConf.getLogFileName();
Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME, logFileName));
Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME,
logFileName));
String logLevel = nacosConf.getLogLevel();
Optional.ofNullable(logLevel).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_LEVEL, logLevel));
Integer pollingThreadCount = nacosConf.getPollingThreadCount();
Expand All @@ -122,19 +135,54 @@ public void subscribe(RegistryListener listener, String serviceName) {
lock.lock();
try {
ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName);
Map<RegistryListener, EventListener> eventListenerMap = listeners.computeIfAbsent(serviceName, k -> new HashMap<>());
Map<RegistryListener, EventListener> eventListenerMap = listeners.computeIfAbsent(serviceName,
k -> new HashMap<>());
if (eventListenerMap.containsKey(listener)) {
log.warn("already use same listener subscribe service name {}" ,serviceName);
log.warn("already use same listener subscribe service name {}", serviceName);
return;
}
EventListener eventListener = listener::onChange;
List<String> clusters ;
EventListener eventListener = new AbstractEventListener() {
@Override
public Executor getExecutor() {
return notifyExecutor;
}

@Override
public void onEvent(Event event) {
if (!(event instanceof NamingEvent)) {
log.warn("received notify event type isn't not as expected");
return;
}
try {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
List<RegisterServerInfo> list = new ArrayList<>();
if (instances != null) {
for (Instance instance : instances) {
RegisterServerInfo info = new RegisterServerInfo();
info.setAddress(instance.getIp() + ":" + instance.getPort());
info.setMetadata(instance.getMetadata());
info.setHealth(instance.isHealthy());
info.setServiceName(
ServiceInfo.getKey(NamingUtils.getGroupedName(namingEvent.getServiceName(),
namingEvent.getGroupName()),
namingEvent.getClusters()));
list.add(info);
}
}
listener.onChange(new NotifyEvent(list));
} catch (Exception e) {
log.warn("");
}
}
};
List<String> clusters;
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
clusters = new ArrayList<>();
} else {
clusters = Arrays.stream(serviceInfo.getClusters().split(",")).collect(Collectors.toList());
}
namingService.subscribe(serviceInfo.getName(),serviceInfo.getGroupName(), clusters, eventListener);
namingService.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, eventListener);
eventListenerMap.put(listener, eventListener);
} catch (Exception e) {
log.error("subscribe service name {} fail", serviceName, e);
Expand All @@ -152,7 +200,7 @@ public void unsubscribe(RegistryListener registryListener, String serviceName) {
if (map == null) {
return;
}
List<String> clusters ;
List<String> clusters;
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
clusters = new ArrayList<>();
} else {
Expand All @@ -177,14 +225,18 @@ public List<RegisterServerInfo> selectInstances(QueryInstances queryInstances) {
if (StringUtils.isNotBlank(serviceInfo.getClusters())) {
clusters.addAll(Arrays.asList(serviceInfo.getClusters().split(",")));
}
List<Instance> instances = namingService.selectInstances(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, queryInstances.isHealth());
List<Instance> instances = namingService.selectInstances(serviceInfo.getName(),
serviceInfo.getGroupName(), clusters,
queryInstances.isHealth());
if (instances != null) {
instances.forEach(x -> {
RegisterServerInfo instanceInfo = new RegisterServerInfo();
instanceInfo.setMetadata(x.getMetadata());
instanceInfo.setHealth(x.isHealthy());
instanceInfo.setAddress(x.getIp() + ":" + x.getPort());
instanceInfo.setServiceName(ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(), serviceInfo.getGroupName()), x.getClusterName()));
instanceInfo.setServiceName(
ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(),
serviceInfo.getGroupName()), x.getClusterName()));
list.add(instanceInfo);
});
}
Expand Down Expand Up @@ -228,7 +280,9 @@ public boolean unRegister(RegisterServerInfo eventMeshRegisterInfo) throws Regis
return false;
}
ServiceInfo serviceInfo = ServiceInfo.fromKey(eventMeshRegisterInfo.getServiceName());
namingService.deregisterInstance(serviceInfo.getName(), serviceInfo.getGroupName(), ipPort[0], Integer.parseInt(ipPort[1]), serviceInfo.getClusters());
namingService.deregisterInstance(serviceInfo.getName(), serviceInfo.getGroupName(), ipPort[0],
Integer.parseInt(ipPort[1]),
serviceInfo.getClusters());
return true;
} catch (Exception e) {
log.error("unregister instance service {} fail", eventMeshRegisterInfo, e);
Expand Down
Loading

0 comments on commit 78942c4

Please sign in to comment.