Skip to content

Commit

Permalink
Merge pull request #140 from crossoverJie/refator-metastore
Browse files Browse the repository at this point in the history
Refactor Metastore
  • Loading branch information
crossoverJie authored Aug 29, 2024
2 parents 5cbd207 + 95d306e commit a108170
Show file tree
Hide file tree
Showing 20 changed files with 486 additions and 82 deletions.
19 changes: 18 additions & 1 deletion cim-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,28 @@
<artifactId>guava</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.github.sgroschupf</groupId>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>


<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.crossoverjie.cim.common.metastore;

import lombok.Builder;
import lombok.Data;

/**
* @author crossverJie
*/
@Data
@Builder
public class AbstractConfiguration<RETRY> {

private String metaServiceUri;
private int timeoutMs;
private RETRY retryPolicy;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.crossoverjie.cim.common.metastore;

import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* @author crossoverJie
*/
public interface MetaStore {

void initialize(AbstractConfiguration<?> configuration) throws Exception;

/**
* Get available server list
* @return available server list
* @throws Exception exception
*/
Set<String> getAvailableServerList() throws Exception;

/**
* Add server to meta store
* @throws Exception exception
*/
void addServer(String ip, int cimServerPort, int httpPort) throws Exception;

/**
* Subscribe server list
* @param childListener child listener
* @throws Exception exception
*/
void listenServerList(ChildListener childListener) throws Exception;


/**
* @throws Exception
*/
void rebuildCache() throws Exception;

interface ChildListener {
/**
* Child changed
* @param parentPath parent path(eg. for zookeeper: [/cim])
* @param currentChildren current children
* @throws Exception exception
*/
void childChanged(String parentPath, List<String> currentChildren) throws Exception;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.crossoverjie.cim.common.metastore;

import org.apache.curator.RetryPolicy;

/**
* @author crossoverJie
*/
public class ZkConfiguration extends AbstractConfiguration<RetryPolicy> {
ZkConfiguration(String metaServiceUri, int timeout, RetryPolicy retryPolicy) {
super(metaServiceUri, timeout, retryPolicy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.crossoverjie.cim.common.metastore;

import com.crossoverjie.cim.common.pojo.RouteInfo;
import com.crossoverjie.cim.common.util.RouteInfoParseUtil;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

/**
* @author crossovreJie
*/
@Slf4j
public class ZkMetaStoreImpl implements MetaStore {
public static final String ROOT = "/cim";

private ZkClient client;

LoadingCache<String, String> cache;

@Override
public void initialize(AbstractConfiguration<?> configuration) throws Exception {
// TODO: 2024/8/19 Change to set or caffeine?
cache = CacheBuilder.newBuilder()
.concurrencyLevel(3)
.build(new CacheLoader<>() {
@Override
public String load(String s) {
return null;
}
});
client = new ZkClient(configuration.getMetaServiceUri(), configuration.getTimeoutMs());
}

@Override
public Set<String> getAvailableServerList() throws Exception {
if (cache.size() > 0) {
return cache.asMap().keySet();
}
List<String> coll = client.getChildren(ROOT);
Map<String, String> voidMap = coll.stream().collect(Collectors.toMap(
Function.identity(),
Function.identity()
));
cache.putAll(voidMap);
return voidMap.keySet();
}

@Override
public void addServer(String ip, int cimServerPort, int httpPort) throws Exception {
boolean exists = client.exists(ROOT);
if (!exists) {
client.createPersistent(ROOT);
}
String zkParse = RouteInfoParseUtil.parse(RouteInfo.builder()
.ip(ip)
.cimServerPort(cimServerPort)
.httpPort(httpPort)
.build());
String serverPath = String.format("%s/%s", ROOT, zkParse);
client.createEphemeral(serverPath);
log.info("Add server to zk [{}]", serverPath);
}

@Override
public void listenServerList(ChildListener childListener) throws Exception {
client.subscribeChildChanges(ROOT, (parentPath, currentChildren) -> {
log.info("Clear and update local cache parentPath=[{}],currentChildren=[{}]", parentPath, currentChildren.toString());
childListener.childChanged(parentPath, currentChildren);

// TODO: 2024/8/19 maybe can reuse currentChildren.
// Because rebuildCache() will re-fetch the server list from zk.
rebuildCache();
});
}

@Override
public synchronized void rebuildCache() throws Exception {
cache.invalidateAll();

// Because of calling invalidateAll, this method will re-fetch the server list from zk.
this.getAvailableServerList();

}


private List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
/**
* Get children and set a watcher on the node. The watcher notification will come through the
* CuratorListener (see setDataAsync() above).
*/
return client.getChildren().watched().forPath(path);
}

private void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given EPHEMERAL ZNode with the given data
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}

private void create(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given ZNode with the given data
client.create().forPath(path, payload);
}

private void watchedGetChildren(CuratorFramework client, String path, Watcher watcher)
throws Exception {
// Get children and set the given watcher on the node.
client.getChildren().usingWatcher(watcher).forPath(path);
}
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,22 @@
package com.crossoverjie.cim.common.pojo;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 20:48
* @since JDK 1.8
*/
@Data
@AllArgsConstructor
@Builder
public final class RouteInfo {

private String ip ;
private Integer cimServerPort;
private Integer httpPort;

public RouteInfo(String ip, Integer cimServerPort, Integer httpPort) {
this.ip = ip;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public Integer getCimServerPort() {
return cimServerPort;
}

public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}

public Integer getHttpPort() {
return httpPort;
}

public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ public class RouteInfoParseUtil {
public static RouteInfo parse(String info){
try {
String[] serverInfo = info.split(":");
RouteInfo routeInfo = new RouteInfo(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2])) ;
return routeInfo ;
return new RouteInfo(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
}catch (Exception e){
throw new CIMException(VALIDATION_FAIL) ;
}
}

public static String parse(RouteInfo routeInfo){
return routeInfo.getIp() + ":" + routeInfo.getCimServerPort() + ":" + routeInfo.getHttpPort();
}

private RouteInfoParseUtil() {
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package com.crossoverjie.cim.common;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Test;

import java.time.LocalDate;
Expand Down Expand Up @@ -79,4 +85,19 @@ private void cycle() throws InterruptedException {
}
}

@Test
public void test3(){

List<String> coll = List.of("abc","def");
Set<String> set = new HashSet<>(coll);
System.out.println(set);
Map<String, Void> map = set.stream()
.collect(Collectors.toMap(
Function.identity(), // 将Set中的元素作为键
v -> null // 每个键对应的值为null
));
System.out.println(map);
}


}
Loading

0 comments on commit a108170

Please sign in to comment.