Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Metastore #140

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cim-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,28 @@
<artifactId>guava</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.github.sgroschupf</groupId>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>


<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.crossoverjie.cim.common.metastore;

import lombok.Builder;
import lombok.Data;

/**
* @author crossverJie
*/
@Data
@Builder
public class AbstractConfiguration<RETRY> {

private String metaServiceUri;
private int timeoutMs;
private RETRY retryPolicy;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.crossoverjie.cim.common.metastore;

import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* @author crossoverJie
*/
public interface MetaStore {

void initialize(AbstractConfiguration<?> configuration) throws Exception;

/**
* Get available server list
* @return available server list
* @throws Exception exception
*/
Set<String> getAvailableServerList() throws Exception;

/**
* Add server to meta store
* @throws Exception exception
*/
void addServer(String ip, int cimServerPort, int httpPort) throws Exception;

/**
* Subscribe server list
* @param childListener child listener
* @throws Exception exception
*/
void listenServerList(ChildListener childListener) throws Exception;


/**
* @throws Exception
*/
void rebuildCache() throws Exception;

interface ChildListener {
/**
* Child changed
* @param parentPath parent path(eg. for zookeeper: [/cim])
* @param currentChildren current children
* @throws Exception exception
*/
void childChanged(String parentPath, List<String> currentChildren) throws Exception;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.crossoverjie.cim.common.metastore;

import org.apache.curator.RetryPolicy;

/**
* @author crossoverJie
*/
public class ZkConfiguration extends AbstractConfiguration<RetryPolicy> {
ZkConfiguration(String metaServiceUri, int timeout, RetryPolicy retryPolicy) {
super(metaServiceUri, timeout, retryPolicy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.crossoverjie.cim.common.metastore;

import com.crossoverjie.cim.common.pojo.RouteInfo;
import com.crossoverjie.cim.common.util.RouteInfoParseUtil;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

/**
* @author crossovreJie
*/
@Slf4j
public class ZkMetaStoreImpl implements MetaStore {
public static final String ROOT = "/cim";

private ZkClient client;

LoadingCache<String, String> cache;

@Override
public void initialize(AbstractConfiguration<?> configuration) throws Exception {
// TODO: 2024/8/19 Change to set or caffeine?
cache = CacheBuilder.newBuilder()
.concurrencyLevel(3)
.build(new CacheLoader<>() {
@Override
public String load(String s) {
return null;
}
});
client = new ZkClient(configuration.getMetaServiceUri(), configuration.getTimeoutMs());
}

@Override
public Set<String> getAvailableServerList() throws Exception {
if (cache.size() > 0) {
return cache.asMap().keySet();
}
List<String> coll = client.getChildren(ROOT);
Map<String, String> voidMap = coll.stream().collect(Collectors.toMap(
Function.identity(),
Function.identity()
));
cache.putAll(voidMap);
return voidMap.keySet();
}

@Override
public void addServer(String ip, int cimServerPort, int httpPort) throws Exception {
boolean exists = client.exists(ROOT);
if (!exists) {
client.createPersistent(ROOT);
}
String zkParse = RouteInfoParseUtil.parse(RouteInfo.builder()
.ip(ip)
.cimServerPort(cimServerPort)
.httpPort(httpPort)
.build());
String serverPath = String.format("%s/%s", ROOT, zkParse);
client.createEphemeral(serverPath);
log.info("Add server to zk [{}]", serverPath);
}

@Override
public void listenServerList(ChildListener childListener) throws Exception {
client.subscribeChildChanges(ROOT, (parentPath, currentChildren) -> {
log.info("Clear and update local cache parentPath=[{}],currentChildren=[{}]", parentPath, currentChildren.toString());
childListener.childChanged(parentPath, currentChildren);

// TODO: 2024/8/19 maybe can reuse currentChildren.
// Because rebuildCache() will re-fetch the server list from zk.
rebuildCache();
});
}

@Override
public synchronized void rebuildCache() throws Exception {
cache.invalidateAll();

// Because of calling invalidateAll, this method will re-fetch the server list from zk.
this.getAvailableServerList();

}


private List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
/**
* Get children and set a watcher on the node. The watcher notification will come through the
* CuratorListener (see setDataAsync() above).
*/
return client.getChildren().watched().forPath(path);
}

private void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given EPHEMERAL ZNode with the given data
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}

private void create(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given ZNode with the given data
client.create().forPath(path, payload);
}

private void watchedGetChildren(CuratorFramework client, String path, Watcher watcher)
throws Exception {
// Get children and set the given watcher on the node.
client.getChildren().usingWatcher(watcher).forPath(path);
}
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,22 @@
package com.crossoverjie.cim.common.pojo;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 20:48
* @since JDK 1.8
*/
@Data
@AllArgsConstructor
@Builder
public final class RouteInfo {

private String ip ;
private Integer cimServerPort;
private Integer httpPort;

public RouteInfo(String ip, Integer cimServerPort, Integer httpPort) {
this.ip = ip;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public Integer getCimServerPort() {
return cimServerPort;
}

public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}

public Integer getHttpPort() {
return httpPort;
}

public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ public class RouteInfoParseUtil {
public static RouteInfo parse(String info){
try {
String[] serverInfo = info.split(":");
RouteInfo routeInfo = new RouteInfo(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2])) ;
return routeInfo ;
return new RouteInfo(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
}catch (Exception e){
throw new CIMException(VALIDATION_FAIL) ;
}
}

public static String parse(RouteInfo routeInfo){
return routeInfo.getIp() + ":" + routeInfo.getCimServerPort() + ":" + routeInfo.getHttpPort();
}

private RouteInfoParseUtil() {
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package com.crossoverjie.cim.common;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Test;

import java.time.LocalDate;
Expand Down Expand Up @@ -79,4 +85,19 @@ private void cycle() throws InterruptedException {
}
}

@Test
public void test3(){

List<String> coll = List.of("abc","def");
Set<String> set = new HashSet<>(coll);
System.out.println(set);
Map<String, Void> map = set.stream()
.collect(Collectors.toMap(
Function.identity(), // 将Set中的元素作为键
v -> null // 每个键对应的值为null
));
System.out.println(map);
}


}
Loading