Skip to content

Commit

Permalink
feat: reduce conflicts when update configmap in k8s apolloconfig#89
Browse files Browse the repository at this point in the history
  • Loading branch information
24kpure committed Dec 9, 2024
1 parent e64c35b commit 45d2cce
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@
*/
package com.ctrip.framework.apollo.kubernetes;

import com.ctrip.framework.apollo.build.ApolloInjector;
import com.ctrip.framework.apollo.core.utils.StringUtils;
import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
import com.ctrip.framework.apollo.util.ConfigUtil;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Node;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -39,11 +45,14 @@ public class KubernetesManager {

private ApiClient client;
private CoreV1Api coreV1Api;
private int propertyKubernetesMaxWritePods;

public KubernetesManager() {
try {
client = Config.defaultClient();
coreV1Api = new CoreV1Api(client);
ConfigUtil configUtil = ApolloInjector.getInstance(ConfigUtil.class);
propertyKubernetesMaxWritePods = configUtil.getPropertyKubernetesMaxWritePods();
} catch (Exception e) {
String errorMessage = "Failed to initialize Kubernetes client: " + e.getMessage();
logger.error(errorMessage, e);
Expand Down Expand Up @@ -132,6 +141,10 @@ public boolean updateConfigMap(String k8sNamespace, String name, Map<String, Str
return false;
}

if (!isWritePod(k8sNamespace)) {
return true;
}

int maxRetries = 5;
int retryCount = 0;
long waitTime = 100;
Expand Down Expand Up @@ -205,4 +218,41 @@ public boolean checkConfigMapExist(String k8sNamespace, String configMapName) {
return false;
}
}

/**
* check pod whether pod can write configmap
*
* @param k8sNamespace config map namespace
* @return true if this pod can write configmap, false otherwise
*/
private boolean isWritePod(String k8sNamespace) {
try {
String localPodName = System.getenv("HOSTNAME");
V1Node localNode = coreV1Api.readNode(localPodName, null);
V1ObjectMeta localMetadata = localNode.getMetadata();
if (localMetadata == null || localMetadata.getLabels() == null) {
return true;
}
String appName = localMetadata.getLabels().get("app");
String labelSelector = "app=" + appName;

V1PodList v1PodList = coreV1Api.listNamespacedPod(k8sNamespace, null, null,
null, null, labelSelector,
null, null, null
, null, null);

return v1PodList.getItems().stream()
.map(V1Pod::getMetadata)
.filter(Objects::nonNull)
//Make each node selects the same write nodes by sorting
.filter(metadata -> metadata.getCreationTimestamp() != null)
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
.map(V1ObjectMeta::getName)
.limit(propertyKubernetesMaxWritePods)
.anyMatch(localPodName::equals);
} catch (Exception e) {
logger.info("Select write nodes error:{}", e.getMessage(), e);
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ public class ConfigUtil {
private boolean propertyFileCacheEnabled = true;
private boolean overrideSystemProperties = true;
private boolean propertyKubernetesCacheEnabled = false;
private int propertyKubernetesMaxWritePods = 3;
private boolean clientMonitorEnabled = false;
private boolean clientMonitorJmxEnabled = false;
private String monitorExternalType = "";
private long monitorExternalExportPeriod = 10;
private int monitorExceptionQueueSize = 25;


public ConfigUtil() {
warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
initRefreshInterval();
Expand All @@ -93,6 +95,7 @@ public ConfigUtil() {
initPropertyFileCacheEnabled();
initOverrideSystemProperties();
initPropertyKubernetesCacheEnabled();
initPropertyKubernetesMaxWritePods();
initClientMonitorEnabled();
initClientMonitorJmxEnabled();
initClientMonitorExternalType();
Expand Down Expand Up @@ -390,31 +393,44 @@ private String getDeprecatedCustomizedCacheRoot() {
}

public String getK8sNamespace() {
String k8sNamespace = getCacheKubernetesNamespace();
return getK8sConfigProperties(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE,
ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE_ENVIRONMENT_VARIABLES,
ConfigConsts.KUBERNETES_CACHE_CONFIG_MAP_NAMESPACE_DEFAULT);
}

if (!Strings.isNullOrEmpty(k8sNamespace)) {
return k8sNamespace;
private void initPropertyKubernetesMaxWritePods() {
String propertyKubernetesMaxWritePodsStr = getK8sConfigProperties(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS,
ApolloClientSystemConsts.APOLLO_CACHE_MAX_WRITE_PODS_ENVIRONMENT_VARIABLES,
String.valueOf(propertyKubernetesMaxWritePods));
if (!Strings.isNullOrEmpty(propertyKubernetesMaxWritePodsStr)) {
try {
propertyKubernetesMaxWritePods = Integer.parseInt(propertyKubernetesMaxWritePodsStr);
} catch (Throwable ex) {
logger.error("Config for {} is invalid: {}",
ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE, propertyKubernetesMaxWritePodsStr);
}
}

return ConfigConsts.KUBERNETES_CACHE_CONFIG_MAP_NAMESPACE_DEFAULT;
}

private String getCacheKubernetesNamespace() {
private String getK8sConfigProperties(String key, String environmentKey, String defaultValue) {
// 1. Get from System Property
String k8sNamespace = System.getProperty(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE);
String k8sNamespace = System.getProperty(key);
if (Strings.isNullOrEmpty(k8sNamespace)) {
// 2. Get from OS environment variable
k8sNamespace = System.getenv(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE_ENVIRONMENT_VARIABLES);
k8sNamespace = System.getenv(environmentKey);
}
if (Strings.isNullOrEmpty(k8sNamespace)) {
// 3. Get from server.properties
k8sNamespace = Foundation.server().getProperty(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE, null);
k8sNamespace = Foundation.server().getProperty(key, null);
}
if (Strings.isNullOrEmpty(k8sNamespace)) {
// 4. Get from app.properties
k8sNamespace = Foundation.app().getProperty(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE, null);
k8sNamespace = Foundation.app().getProperty(key, null);
}
if (!Strings.isNullOrEmpty(k8sNamespace)) {
return k8sNamespace;
}
return k8sNamespace;
return defaultValue;
}

public boolean isInLocalMode() {
Expand Down Expand Up @@ -524,6 +540,10 @@ public boolean isPropertyKubernetesCacheEnabled() {
return propertyKubernetesCacheEnabled;
}

public int getPropertyKubernetesMaxWritePods() {
return propertyKubernetesMaxWritePods;
}

public boolean isOverrideSystemProperties() {
return overrideSystemProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,32 @@
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Node;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class KubernetesManagerTest {

Expand Down Expand Up @@ -135,20 +152,37 @@ public void testUpdateConfigMapSuccess() throws Exception {
// arrange
String namespace = "default";
String name = "testConfigMap";

V1Node node = new V1Node()
.metadata(
new V1ObjectMeta()
.creationTimestamp(OffsetDateTime.now())
.labels(Collections.singletonMap("app", "app")));
V1PodList v1PodList = new V1PodList().addItemsItem(new V1Pod().metadata(node.getMetadata()));

Map<String, String> data = new HashMap<>();
data.put("key", "value");
V1ConfigMap configMap = new V1ConfigMap();
configMap.metadata(new V1ObjectMeta().name(name).namespace(namespace));
configMap.data(data);

when(coreV1Api.readNode(null, null)).thenReturn(node);
when(coreV1Api.listNamespacedPod(namespace, null, null,
null, null, "app=app",
null, null, null
, null, null)).thenReturn(v1PodList);
when(coreV1Api.readNamespacedConfigMap(name, namespace, null)).thenReturn(configMap);
when(coreV1Api.replaceNamespacedConfigMap(name, namespace, configMap, null, null, null, null)).thenReturn(configMap);

// act
Boolean success = kubernetesManager.updateConfigMap(namespace, name, data);
boolean success = kubernetesManager.updateConfigMap(namespace, name, data);

// assert
assertTrue(success);
Mockito.verify(coreV1Api, Mockito.times(1)).listNamespacedPod(namespace, null, null,
null, null, "app=app",
null, null, null
, null, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public class ApolloClientSystemConsts {
*/
public static final String APOLLO_CACHE_KUBERNETES_NAMESPACE_ENVIRONMENT_VARIABLES = "APOLLO_CACHE_KUBERNETES_NAMESPACE";

/**
* max number of pods that can write the configmap cache in Kubernetes
*/
public static final String APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS = "apollo.cache.kubernetes.max-write-pods";

/**
* max number of pods that can write the configmap cache in Kubernetes environment variables
*/
public static final String APOLLO_CACHE_MAX_WRITE_PODS_ENVIRONMENT_VARIABLES = "APOLLO_CACHE_MAX_WRITE_PODS_NAMESPACE";

/**
* apollo client access key
*/
Expand Down

0 comments on commit 45d2cce

Please sign in to comment.