Skip to content

Commit

Permalink
support sofa registry kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
呈铭 committed Mar 21, 2024
1 parent 3feb714 commit cc38025
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.json.JSON;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand Down Expand Up @@ -65,9 +64,6 @@ public class KubernetesRegistry extends Registry {

private KubernetesRegistryProviderWatcher kubernetesRegistryProviderWatcher;

// {dataId:ProviderConfig}
private final ConcurrentHashMap<String, ProviderConfig> providerInstances = new ConcurrentHashMap<>(64);

private final ConcurrentMap<ConsumerConfig, SharedIndexInformer<Pod>> consumerListeners = new ConcurrentHashMap<>(64);

/**
Expand All @@ -80,7 +76,7 @@ public KubernetesRegistry(RegistryConfig registryConfig) {
}

@Override
public void init() {
public synchronized void init() {
// init kubernetes config
Config config = KubernetesConfigUtils.buildKubernetesConfig(registryConfig);
// init kubernetes client
Expand Down Expand Up @@ -120,12 +116,13 @@ public void register(ProviderConfig config) {
if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol());
providerInstances.put(dataId, config);
// 对外提供服务的URL
String url = KubernetesRegistryHelper.convertToUrl(podResource.get(), serverConfig, config);

podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
// 将ProviderConfig存在Annotations上
.addToAnnotations(dataId, JSON.toJSONString(config, true))
// 为了过滤而不是订阅全部pod
.addToAnnotations(dataId, url)
// 为了过滤pod、其实value是用不到的
.addToLabels(dataId, "")
.endMetadata().build());
}
Expand All @@ -152,7 +149,6 @@ public void unRegister(ProviderConfig config) {
if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol());
providerInstances.remove(dataId);

podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(dataId)
Expand Down Expand Up @@ -255,16 +251,11 @@ public void batchUnSubscribe(List<ConsumerConfig> configs) {

@Override
public void destroy() {
// unRegister provider
providerInstances.forEach((k, v) -> unRegister(v));

// unRegister consumer
consumerListeners.forEach((k, v) -> unSubscribe(k));

// close kubernetes client
if (CommonUtils.isEmpty(providerInstances)) {
kubernetesClient.close();
}
kubernetesClient.close();
}

private List<Pod> getPods() {
Expand All @@ -289,12 +280,4 @@ public void setCurrentHostname(String currentHostname) {
public ConcurrentMap<ConsumerConfig, SharedIndexInformer<Pod>> getConsumerListeners() {
return consumerListeners;
}

/**
* UT used only
*/
@VisibleForTesting
public ConcurrentMap<String, ProviderConfig> getProviderInstances() {
return providerInstances;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.json.JSON;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
Expand Down Expand Up @@ -47,22 +46,17 @@ public static List<ProviderInfo> convertPodsToProviders(List<Pod> pods, Consumer
}

for (Pod pod : pods) {
ProviderConfig providerConfig = getProviderConfig(pod, config);
if (null == providerConfig || null == providerConfig.getServer()) {
ProviderInfo providerInfo = getProviderInfo(pod, config);
if (null == providerInfo) {
continue;
}

providerConfig.getServer().forEach(serverConfig -> {
String url = convertInstanceToUrl(pod, (ServerConfig) serverConfig, providerConfig);
ProviderInfo providerInfo = ProviderHelper.toProviderInfo(url);
providerInfos.add(providerInfo);
});
providerInfos.add(providerInfo);
}

return providerInfos;
}

private static String convertInstanceToUrl(Pod pod, ServerConfig serverConfig, ProviderConfig providerConfig) {
public static String convertToUrl(Pod pod, ServerConfig serverConfig, ProviderConfig providerConfig) {
String uri = "";
String protocol = serverConfig.getProtocol();
if (StringUtils.isNotEmpty(protocol)) {
Expand All @@ -82,21 +76,15 @@ private static String convertInstanceToUrl(Pod pod, ServerConfig serverConfig, P
return uri;
}

private static ProviderConfig getProviderConfig(Pod pod, ConsumerConfig config) {
private static ProviderInfo getProviderInfo(Pod pod, ConsumerConfig config) {
try {
String dataId = buildDataId(config, config.getProtocol());
String providerConfigStr = pod.getMetadata().getAnnotations().get(dataId);
if (StringUtils.isBlank(providerConfigStr)) {
return null;
}
String providerUrlString = pod.getMetadata().getAnnotations().get(dataId);

ProviderConfig providerConfig = JSON.parseObject(providerConfigStr, ProviderConfig.class);

if (null == providerConfig) {
if (StringUtils.isBlank(providerUrlString)) {
return null;
}

return providerConfig;
return ProviderHelper.toProviderInfo(providerUrlString);
} catch (Exception e) {
LOGGER.info("get provider config error with pod");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.json.JSON;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand Down Expand Up @@ -199,8 +198,6 @@ public void testAll() throws InterruptedException {
// 注册第二个providerConfig2
kubernetesRegistry.register(providerConfig2);

Assert.assertEquals(2, kubernetesRegistry.getProviderInstances().size());

List<Pod> items = mockClient.pods().inNamespace(NAMESPACE).list().getItems();

Assert.assertEquals(1, items.size());
Expand Down Expand Up @@ -248,7 +245,7 @@ public void testAll() throws InterruptedException {
latch.await(5000 * 2, TimeUnit.MILLISECONDS);
Assert.assertTrue(ps.size() > 0);
Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP));
Assert.assertEquals(2, ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size());
Assert.assertEquals(1, ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size());

// 反订阅
kubernetesRegistry.unSubscribe(consumer);
Expand All @@ -260,11 +257,7 @@ public void testAll() throws InterruptedException {
kubernetesRegistry.unRegister(providerConfig2);

List<Pod> unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems();
Assert.assertEquals(0, kubernetesRegistry.getProviderInstances().size());
String unRegisterAnnotationsBolt = unRegisterItems.get(0).getMetadata().getAnnotations().get(buildDataId(providerConfig1, "bolt"));
Assert.assertNull(unRegisterAnnotationsBolt);
String unRegisterAnnotationsH2c = unRegisterItems.get(0).getMetadata().getAnnotations().get(buildDataId(providerConfig2, "h2c"));
Assert.assertNull(unRegisterAnnotationsH2c);
Assert.assertEquals(0, unRegisterItems.get(0).getMetadata().getAnnotations().size());
}

private static class MockProviderInfoListener implements ProviderInfoListener {
Expand Down

0 comments on commit cc38025

Please sign in to comment.