Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
wind57 committed Mar 19, 2024
2 parents 88021bb + cc43ce5 commit dcf4e62
Show file tree
Hide file tree
Showing 9 changed files with 670 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesServiceInstanceMapper;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesServicesListSupplier;
import org.springframework.cloud.kubernetes.fabric8.Fabric8Utils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
Expand Down Expand Up @@ -64,15 +65,30 @@ public Flux<List<ServiceInstance>> get() {
LOG.debug(() -> "discovering services in all namespaces");
List<Service> services = kubernetesClient.services().inAnyNamespace()
.withField("metadata.name", serviceName).list().getItems();
services.forEach(service -> result.add(mapper.map(service)));
services.forEach(service -> addMappedService(mapper, result, service));
}
else if (!discoveryProperties.namespaces().isEmpty()) {
List<String> selectiveNamespaces = discoveryProperties.namespaces().stream().sorted().toList();
LOG.debug(() -> "discovering services in selective namespaces : " + selectiveNamespaces);
selectiveNamespaces.forEach(selectiveNamespace -> {
Service service = kubernetesClient.services().inNamespace(selectiveNamespace).withName(serviceName)
.get();
if (service != null) {
addMappedService(mapper, result, service);
}
else {
LOG.debug(() -> "did not find service with name : " + serviceName + " in namespace : "
+ selectiveNamespace);
}
});
}
else {
String namespace = Fabric8Utils.getApplicationNamespace(kubernetesClient, null, "loadbalancer-service",
namespaceProvider);
LOG.debug(() -> "discovering services in namespace : " + namespace);
Service service = kubernetesClient.services().inNamespace(namespace).withName(serviceName).get();
if (service != null) {
result.add(mapper.map(service));
addMappedService(mapper, result, service);
}
else {
LOG.debug(() -> "did not find service with name : " + serviceName + " in namespace : " + namespace);
Expand All @@ -83,4 +99,9 @@ public Flux<List<ServiceInstance>> get() {
return Flux.defer(() -> Flux.just(result));
}

private void addMappedService(KubernetesServiceInstanceMapper<Service> mapper, List<ServiceInstance> services,
Service service) {
services.add(mapper.map(service));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ void testAllNamespaces(CapturedOutput output) {

List<List<ServiceInstance>> serviceInstances = supplier.get().collectList().block();
Assertions.assertEquals(serviceInstances.size(), 1);
List<ServiceInstance> inner = serviceInstances.get(0);

List<ServiceInstance> serviceInstancesSorted = serviceInstances.get(0).stream()
.sorted(Comparator.comparing(ServiceInstance::getServiceId)).toList();
Assertions.assertEquals(serviceInstancesSorted.size(), 2);
Assertions.assertEquals(inner.get(0).getServiceId(), "service-a");
Assertions.assertEquals(inner.get(0).getHost(), "service-a.a.svc.cluster.local");
Assertions.assertEquals(inner.get(0).getPort(), 8887);

Assertions.assertEquals(inner.get(1).getServiceId(), "service-a");
Assertions.assertEquals(inner.get(1).getHost(), "service-a.c.svc.cluster.local");
Assertions.assertEquals(inner.get(1).getPort(), 8889);
Assertions.assertEquals(serviceInstancesSorted.get(0).getServiceId(), "service-a");
Assertions.assertEquals(serviceInstancesSorted.get(0).getHost(), "service-a.a.svc.cluster.local");
Assertions.assertEquals(serviceInstancesSorted.get(0).getPort(), 8887);

Assertions.assertEquals(serviceInstancesSorted.get(1).getServiceId(), "service-a");
Assertions.assertEquals(serviceInstancesSorted.get(1).getHost(), "service-a.c.svc.cluster.local");
Assertions.assertEquals(serviceInstancesSorted.get(1).getPort(), 8889);

Assertions.assertTrue(output.getOut().contains("discovering services in all namespaces"));
}
Expand Down Expand Up @@ -138,6 +138,42 @@ void testOneNamespace(CapturedOutput output) {
Assertions.assertTrue(output.getOut().contains("discovering services in namespace : c"));
}

@Test
void testSelectiveNamespaces(CapturedOutput output) {

createService("a", "my-service", 8887);
createService("b", "my-service", 8888);
createService("c", "my-service", 8889);

Environment environment = new MockEnvironment().withProperty("loadbalancer.client.name", "my-service");
boolean allNamespaces = false;
Set<String> selectiveNamespaces = Set.of("a", "b");

KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
KubernetesDiscoveryProperties discoveryProperties = new KubernetesDiscoveryProperties(true, allNamespaces,
selectiveNamespaces, true, 60, false, null, Set.of(), Map.of(), null,
KubernetesDiscoveryProperties.Metadata.DEFAULT, 0, false, false, null);

Fabric8ServicesListSupplier supplier = new Fabric8ServicesListSupplier(environment, mockClient,
new Fabric8ServiceInstanceMapper(loadBalancerProperties, discoveryProperties), discoveryProperties);

List<List<ServiceInstance>> serviceInstances = supplier.get().collectList().block();
Assertions.assertEquals(serviceInstances.size(), 1);

List<ServiceInstance> serviceInstancesSorted = serviceInstances.get(0).stream()
.sorted(Comparator.comparing(ServiceInstance::getPort)).toList();
Assertions.assertEquals(serviceInstancesSorted.size(), 2);
Assertions.assertEquals(serviceInstancesSorted.get(0).getServiceId(), "my-service");
Assertions.assertEquals(serviceInstancesSorted.get(0).getHost(), "my-service.a.svc.cluster.local");
Assertions.assertEquals(serviceInstancesSorted.get(0).getPort(), 8887);

Assertions.assertEquals(serviceInstancesSorted.get(1).getServiceId(), "my-service");
Assertions.assertEquals(serviceInstancesSorted.get(1).getHost(), "my-service.b.svc.cluster.local");
Assertions.assertEquals(serviceInstancesSorted.get(1).getPort(), 8888);

Assertions.assertTrue(output.getOut().contains("discovering services in selective namespaces : [a, b]"));
}

private void createService(String namespace, String name, int port) {
Service service = new ServiceBuilder().withNewMetadata().withNamespace(namespace).withName(name).endMetadata()
.withSpec(new ServiceSpecBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,65 @@

package org.springframework.cloud.kubernetes.fabric8.loadbalancer.it;

import io.fabric8.kubernetes.api.model.EndpointAddressBuilder;
import io.fabric8.kubernetes.api.model.EndpointPortBuilder;
import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.EndpointsBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.client.WebClient;

/**
* @author wind57
*/
final class Util {
public final class Util {

private Util() {

}

static Service createService(String namespace, String name, int port) {
public static Service service(String namespace, String name, int port) {
return new ServiceBuilder().withNewMetadata().withNamespace(namespace).withName(name).endMetadata()
.withSpec(new ServiceSpecBuilder()
.withPorts(new ServicePortBuilder().withName("http").withPort(port).build()).build())
.build();
}

public static Endpoints endpoints(int port, String host, String namespace) {
return new EndpointsBuilder()
.withSubsets(new EndpointSubsetBuilder().withPorts(new EndpointPortBuilder().withPort(port).build())
.withAddresses(new EndpointAddressBuilder().withIp(host).build()).build())
.withMetadata(new ObjectMetaBuilder().withName("random-name").withNamespace(namespace).build()).build();
}

@TestConfiguration
public static class LoadBalancerConfiguration {

@Bean
@LoadBalanced
WebClient.Builder client() {
return WebClient.builder();
}

}

@SpringBootApplication
public static class Configuration {

public static void main(String[] args) {
SpringApplication.run(Configuration.class);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,12 @@
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.loadbalancer.it;
package org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode.pod;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import io.fabric8.kubernetes.api.model.EndpointAddressBuilder;
import io.fabric8.kubernetes.api.model.EndpointPortBuilder;
import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.EndpointsBuilder;
import io.fabric8.kubernetes.api.model.EndpointsListBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.utils.Serialization;
Expand All @@ -37,23 +32,19 @@

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesServiceInstanceMapper;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util;
import org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.DiscoveryClientServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.client.WebClient;

import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.PodModeAllNamespacesTest.Configuration;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.PodModeAllNamespacesTest.LoadBalancerConfiguration;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util.Configuration;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util.LoadBalancerConfiguration;

/**
* @author wind57
Expand All @@ -62,7 +53,7 @@
properties = { "spring.cloud.kubernetes.loadbalancer.mode=POD", "spring.main.cloud-platform=KUBERNETES",
"spring.cloud.kubernetes.discovery.all-namespaces=true" },
classes = { LoadBalancerConfiguration.class, Configuration.class })
class PodModeAllNamespacesTest {
class AllNamespacesTest {

private static final String SERVICE_A_URL = "http://service-a";

Expand Down Expand Up @@ -137,22 +128,11 @@ static void afterAll() {
@Test
void test() {

Service serviceA = Util.createService("a", "service-a", SERVICE_A_PORT);
Service serviceB = Util.createService("b", "service-b", SERVICE_B_PORT);
Service serviceA = Util.service("a", "service-a", SERVICE_A_PORT);
Service serviceB = Util.service("b", "service-b", SERVICE_B_PORT);

Endpoints endpointsA = new EndpointsBuilder()
.withSubsets(new EndpointSubsetBuilder()
.withPorts(new EndpointPortBuilder().withPort(SERVICE_A_PORT).build())
.withAddresses(new EndpointAddressBuilder().withIp("127.0.0.1").build()).build())
.withMetadata(new ObjectMetaBuilder().withName("no-port-name-service").withNamespace("a").build())
.build();

Endpoints endpointsB = new EndpointsBuilder()
.withSubsets(new EndpointSubsetBuilder()
.withPorts(new EndpointPortBuilder().withPort(SERVICE_B_PORT).build())
.withAddresses(new EndpointAddressBuilder().withIp("127.0.0.1").build()).build())
.withMetadata(new ObjectMetaBuilder().withName("no-port-name-service").withNamespace("b").build())
.build();
Endpoints endpointsA = Util.endpoints(SERVICE_A_PORT, "127.0.0.1", "a");
Endpoints endpointsB = Util.endpoints(SERVICE_B_PORT, "127.0.0.1", "b");

String endpointsAListAsString = Serialization.asJson(new EndpointsListBuilder().withItems(endpointsA).build());
String endpointsBListAsString = Serialization.asJson(new EndpointsListBuilder().withItems(endpointsB).build());
Expand Down Expand Up @@ -192,26 +172,18 @@ void test() {
.getIfAvailable().getProvider("service-a", ServiceInstanceListSupplier.class).getIfAvailable();
Assertions.assertThat(supplier.getDelegate().getClass())
.isSameAs(DiscoveryClientServiceInstanceListSupplier.class);
}

@TestConfiguration
static class LoadBalancerConfiguration {

@Bean
@LoadBalanced
WebClient.Builder client() {
return WebClient.builder();
}

}
wireMockServer.verify(WireMock.exactly(1), WireMock
.getRequestedFor(WireMock.urlEqualTo("/api/v1/endpoints?fieldSelector=metadata.name%3Dservice-a")));

@SpringBootApplication
static class Configuration {
wireMockServer.verify(WireMock.exactly(1), WireMock
.getRequestedFor(WireMock.urlEqualTo("/api/v1/endpoints?fieldSelector=metadata.name%3Dservice-b")));

public static void main(String[] args) {
SpringApplication.run(ServiceModeAllNamespacesTest.Configuration.class);
}
wireMockServer.verify(WireMock.exactly(1),
WireMock.getRequestedFor(WireMock.urlEqualTo("/api/v1/namespaces/a/services/service-a")));

wireMockServer.verify(WireMock.exactly(1),
WireMock.getRequestedFor(WireMock.urlEqualTo("/api/v1/namespaces/b/services/service-b")));
}

}
Loading

0 comments on commit dcf4e62

Please sign in to comment.