Skip to content

Commit

Permalink
update runtime instance for adminservice
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 28, 2024
1 parent 400c57d commit ef4504a
Showing 1 changed file with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.apache.eventmesh.runtime.boot;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;

import org.apache.eventmesh.registry.QueryInstances;
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryFactory;
Expand All @@ -26,7 +28,7 @@ public class RuntimeInstance {

private Map<String, RegisterServerInfo> adminServerInfoMap = new HashMap<>();

// private final RegistryService registryService;
private final RegistryService registryService;

private Runtime runtime;

Expand All @@ -38,20 +40,20 @@ public class RuntimeInstance {

public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
// this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
}

public void init() throws Exception {
// registryService.init();
registryService.init();
QueryInstances queryInstances = new QueryInstances();
queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
queryInstances.setHealth(true);
// List<RegisterServerInfo> adminServerRegisterInfoList = registryService.selectInstances(queryInstances);
// if (!adminServerRegisterInfoList.isEmpty()) {
// adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList);
// } else {
// throw new RuntimeException("admin server address is empty, please check");
// }
List<RegisterServerInfo> adminServerRegisterInfoList = registryService.selectInstances(queryInstances);
if (!adminServerRegisterInfoList.isEmpty()) {
adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList);
} else {
throw new RuntimeException("admin server address is empty, please check");
}
runtimeInstanceConfig.setAdminServerAddr(adminServerAddr);
runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
runtime = runtimeFactory.createRuntime(runtimeInstanceConfig);
Expand All @@ -61,19 +63,19 @@ public void init() throws Exception {
public void start() throws Exception {
if (!StringUtils.isBlank(adminServerAddr)) {

// registryService.subscribe((event) -> {
// log.info("runtime receive registry event: {}", event);
// List<RegisterServerInfo> registerServerInfoList = event.getInstances();
// Map<String, RegisterServerInfo> registerServerInfoMap = new HashMap<>();
// for (RegisterServerInfo registerServerInfo : registerServerInfoList) {
// registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
// }
// if (!registerServerInfoMap.isEmpty()) {
// adminServerInfoMap = registerServerInfoMap;
// updateAdminServerAddr();
// }
//
// }, runtimeInstanceConfig.getAdminServiceName());
registryService.subscribe((event) -> {
log.info("runtime receive registry event: {}", event);
List<RegisterServerInfo> registerServerInfoList = event.getInstances();
Map<String, RegisterServerInfo> registerServerInfoMap = new HashMap<>();
for (RegisterServerInfo registerServerInfo : registerServerInfoList) {
registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
}
if (!registerServerInfoMap.isEmpty()) {
adminServerInfoMap = registerServerInfoMap;
updateAdminServerAddr();
}

}, runtimeInstanceConfig.getAdminServiceName());
runtime.start();
isStarted = true;
} else {
Expand Down

0 comments on commit ef4504a

Please sign in to comment.