diff --git a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java new file mode 100644 index 00000000000..a97eb8f8757 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java @@ -0,0 +1,332 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.grpc.xds; + +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TrafficDirection; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.envoyproxy.envoy.config.listener.v3.Filter; +import io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.NonForwardingAction; +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannel; +import io.grpc.NameResolverRegistry; +import io.grpc.Server; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleResponse; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Xds integration tests using a local control plane, implemented in + * {@link XdsTestControlPlaneService}. + * Test cases can inject xds configs to the control plane for testing. + */ +@RunWith(JUnit4.class) +public class FakeControlPlaneXdsIntegrationTest { + private static final Logger logger = + Logger.getLogger(FakeControlPlaneXdsIntegrationTest.class.getName()); + + protected int testServerPort = 0; + protected int controlPlaneServicePort; + private Server server; + private Server controlPlane; + private XdsTestControlPlaneService controlPlaneService; + protected SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub; + private XdsNameResolverProvider nameResolverProvider; + private static final String scheme = "test-xds"; + private static final String SERVER_LISTENER_TEMPLATE_NO_REPLACEMENT = + "grpc/server?udpa.resource.listening_address="; + private static final String rdsName = "route-config.googleapis.com"; + private static final String clusterName = "cluster0"; + private static final String edsName = "eds-service-0"; + private static final String HTTP_CONNECTION_MANAGER_TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" + + ".HttpConnectionManager"; + + /** + * For test purpose, use boostrapOverride to programmatically provide bootstrap info. + */ + private Map defaultBootstrapOverride() { + return ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString(), + "cluster", "cluster0"), + "xds_servers", Collections.singletonList( + + ImmutableMap.of( + "server_uri", "localhost:" + controlPlaneServicePort, + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ), + "server_listener_resource_name_template", SERVER_LISTENER_TEMPLATE_NO_REPLACEMENT + ); + } + + /** + * 1. Start control plane server and get control plane port. + * 2. Start xdsServer using no replacement server template, because we do not know the server + * port yet. Then get the server port. + * 3. Update control plane config using the port in 2 for necessary rds and eds resources to set + * up client and server communication for test cases. + * */ + @Before + public void setUp() throws Exception { + startControlPlane(); + nameResolverProvider = XdsNameResolverProvider.createForTest(scheme, + defaultBootstrapOverride()); + NameResolverRegistry.getDefaultRegistry().register(nameResolverProvider); + } + + @After + public void tearDown() throws Exception { + if (server != null) { + server.shutdownNow(); + if (!server.awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } + if (controlPlane != null) { + controlPlane.shutdownNow(); + if (!controlPlane.awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } + NameResolverRegistry.getDefaultRegistry().deregister(nameResolverProvider); + } + + @Test + public void pingPong() throws Exception { + String tcpListenerName = SERVER_LISTENER_TEMPLATE_NO_REPLACEMENT; + String serverHostName = "test-server"; + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of( + tcpListenerName, serverListener(tcpListenerName), + serverHostName, clientListener(serverHostName) + )); + startServer(defaultBootstrapOverride()); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_RDS, + ImmutableMap.of(rdsName, rds(serverHostName))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, + ImmutableMap.of(clusterName, cds())); + InetSocketAddress edsInetSocketAddress = (InetSocketAddress) server.getListenSockets().get(0); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, + ImmutableMap.of(edsName, eds(edsInetSocketAddress.getHostName(), + edsInetSocketAddress.getPort()))); + ManagedChannel channel = Grpc.newChannelBuilder(scheme + ":///" + serverHostName, + InsecureChannelCredentials.create()).build(); + blockingStub = SimpleServiceGrpc.newBlockingStub(channel); + SimpleRequest request = SimpleRequest.newBuilder() + .build(); + SimpleResponse goldenResponse = SimpleResponse.newBuilder() + .setResponseMessage("Hi, xDS!") + .build(); + assertEquals(goldenResponse, blockingStub.unaryRpc(request)); + } + + private void startServer(Map bootstrapOverride) throws Exception { + SimpleServiceGrpc.SimpleServiceImplBase simpleServiceImpl = + new SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc( + SimpleRequest request, StreamObserver responseObserver) { + SimpleResponse response = + SimpleResponse.newBuilder().setResponseMessage("Hi, xDS!").build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }; + + XdsServerBuilder serverBuilder = XdsServerBuilder.forPort( + 0, InsecureServerCredentials.create()) + .addService(simpleServiceImpl) + .overrideBootstrapForTest(bootstrapOverride); + server = serverBuilder.build().start(); + testServerPort = server.getPort(); + logger.log(Level.FINE, "server started"); + } + + private void startControlPlane() throws Exception { + controlPlaneService = new XdsTestControlPlaneService(); + NettyServerBuilder controlPlaneServerBuilder = + NettyServerBuilder.forPort(0) + .addService(controlPlaneService); + controlPlane = controlPlaneServerBuilder.build().start(); + controlPlaneServicePort = controlPlane.getPort(); + } + + private static Listener clientListener(String name) { + HttpFilter httpFilter = HttpFilter.newBuilder() + .setName("terminal-filter") + .setTypedConfig(Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + ApiListener apiListener = ApiListener.newBuilder().setApiListener(Any.pack( + io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3 + .HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .addAllHttpFilters(Collections.singletonList(httpFilter)) + .build(), + HTTP_CONNECTION_MANAGER_TYPE_URL)).build(); + Listener listener = Listener.newBuilder() + .setName(name) + .setApiListener(apiListener).build(); + return listener; + } + + private static Listener serverListener(String name) { + HttpFilter routerFilter = HttpFilter.newBuilder() + .setName("terminal-filter") + .setTypedConfig( + Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + VirtualHost virtualHost = io.envoyproxy.envoy.config.route.v3.VirtualHost.newBuilder() + .setName("virtual-host-0") + .addDomains("*") + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build()) + .setNonForwardingAction(NonForwardingAction.newBuilder().build()) + .build()).build(); + RouteConfiguration routeConfig = RouteConfiguration.newBuilder() + .addVirtualHosts(virtualHost) + .build(); + Filter filter = Filter.newBuilder() + .setName("network-filter-0") + .setTypedConfig( + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(routeConfig) + .addAllHttpFilters(Collections.singletonList(routerFilter)) + .build())).build(); + FilterChainMatch filterChainMatch = FilterChainMatch.newBuilder() + .setSourceType(FilterChainMatch.ConnectionSourceType.ANY) + .build(); + FilterChain filterChain = FilterChain.newBuilder() + .setName("filter-chain-0") + .setFilterChainMatch(filterChainMatch) + .addFilters(filter) + .build(); + return Listener.newBuilder() + .setName(name) + .setTrafficDirection(TrafficDirection.INBOUND) + .addFilterChains(filterChain) + .build(); + } + + private static RouteConfiguration rds(String authority) { + VirtualHost virtualHost = VirtualHost.newBuilder() + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build()) + .setRoute( + RouteAction.newBuilder().setCluster(clusterName).build()).build()).build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + } + + private static Cluster cds() { + return Cluster.newBuilder() + .setName(clusterName) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig( + Cluster.EdsClusterConfig.newBuilder() + .setServiceName(edsName) + .setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder().build()) + .build()) + .build()) + .setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN) + .build(); + } + + private static ClusterLoadAssignment eds(String hostName, int port) { + Address address = Address.newBuilder() + .setSocketAddress( + SocketAddress.newBuilder().setAddress(hostName).setPortValue(port).build()).build(); + LocalityLbEndpoints endpoints = LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(10)) + .setPriority(0) + .addLbEndpoints( + LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress(address).build()) + .setHealthStatus(HealthStatus.HEALTHY) + .build()).build(); + return ClusterLoadAssignment.newBuilder() + .setClusterName(edsName) + .addEndpoints(endpoints) + .build(); + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java new file mode 100644 index 00000000000..90c6b87cfd0 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java @@ -0,0 +1,180 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.grpc.xds; + +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.SynchronizationContext; +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +final class XdsTestControlPlaneService extends + AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase { + private static final Logger logger = Logger.getLogger(XdsTestControlPlaneService.class.getName()); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + + static final String ADS_TYPE_URL_LDS = + "type.googleapis.com/envoy.config.listener.v3.Listener"; + static final String ADS_TYPE_URL_RDS = + "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; + static final String ADS_TYPE_URL_CDS = + "type.googleapis.com/envoy.config.cluster.v3.Cluster"; + static final String ADS_TYPE_URL_EDS = + "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; + + private final Map> xdsResources = new HashMap<>(); + private ImmutableMap, Set>> subscribers + = ImmutableMap.of( + ADS_TYPE_URL_LDS, new HashMap, Set>(), + ADS_TYPE_URL_RDS, new HashMap, Set>(), + ADS_TYPE_URL_CDS, new HashMap, Set>(), + ADS_TYPE_URL_EDS, new HashMap, Set>() + ); + private final ImmutableMap xdsVersions = ImmutableMap.of( + ADS_TYPE_URL_LDS, new AtomicInteger(1), + ADS_TYPE_URL_RDS, new AtomicInteger(1), + ADS_TYPE_URL_CDS, new AtomicInteger(1), + ADS_TYPE_URL_EDS, new AtomicInteger(1) + ); + private final ImmutableMap, AtomicInteger>> + xdsNonces = ImmutableMap.of( + ADS_TYPE_URL_LDS, new HashMap, AtomicInteger>(), + ADS_TYPE_URL_RDS, new HashMap, AtomicInteger>(), + ADS_TYPE_URL_CDS, new HashMap, AtomicInteger>(), + ADS_TYPE_URL_EDS, new HashMap, AtomicInteger>() + ); + + + // treat all the resource types as state-of-the-world, send back all resources of a particular + // type when any of them change. + public void setXdsConfig(final String type, final Map resources) { + logger.log(Level.FINE, "setting config {0} {1}", new Object[]{type, resources}); + syncContext.execute(new Runnable() { + @SuppressWarnings("unchecked") + @Override + public void run() { + HashMap copyResources = new HashMap<>(resources); + xdsResources.put(type, copyResources); + String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndDecrement()); + + for (Map.Entry, Set> entry : + subscribers.get(type).entrySet()) { + DiscoveryResponse response = generateResponse(type, newVersionInfo, + String.valueOf(xdsNonces.get(type).get(entry.getKey()).incrementAndGet()), + entry.getValue()); + entry.getKey().onNext(response); + } + } + }); + } + + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + final StreamObserver requestObserver = + new StreamObserver() { + @Override + public void onNext(final DiscoveryRequest value) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(Level.FINEST, "control plane received request {0}", value); + if (value.hasErrorDetail()) { + logger.log(Level.FINE, "control plane received nack resource {0}, error {1}", + new Object[]{value.getResourceNamesList(), value.getErrorDetail()}); + return; + } + String resourceType = value.getTypeUrl(); + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(xdsNonces.get(resourceType)).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "Resource nonce does not match, ignore."); + return; + } + Set requestedResourceNames = new HashSet<>(value.getResourceNamesList()); + if (subscribers.get(resourceType).containsKey(responseObserver) + && subscribers.get(resourceType).get(responseObserver) + .equals(requestedResourceNames)) { + logger.log(Level.FINEST, "control plane received ack for resource: {0}", + value.getResourceNamesList()); + return; + } + if (!xdsNonces.get(resourceType).containsKey(responseObserver)) { + xdsNonces.get(resourceType).put(responseObserver, new AtomicInteger(0)); + } + DiscoveryResponse response = generateResponse(resourceType, + String.valueOf(xdsVersions.get(resourceType)), + String.valueOf(xdsNonces.get(resourceType).get(responseObserver)), + requestedResourceNames); + responseObserver.onNext(response); + subscribers.get(resourceType).put(responseObserver, requestedResourceNames); + } + }); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.FINE, "Control plane error: {0} ", t); + onCompleted(); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + for (String type : subscribers.keySet()) { + subscribers.get(type).remove(responseObserver); + xdsNonces.get(type).remove(responseObserver); + } + } + }; + return requestObserver; + } + + //must run in syncContext + private DiscoveryResponse generateResponse(String resourceType, String version, String nonce, + Set resourceNames) { + DiscoveryResponse.Builder responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(resourceType) + .setVersionInfo(version) + .setNonce(nonce); + for (String resourceName: resourceNames) { + if (xdsResources.containsKey(resourceType) + && xdsResources.get(resourceType).containsKey(resourceName)) { + responseBuilder.addResources(Any.pack(xdsResources.get(resourceType).get(resourceName), + resourceType)); + } + } + return responseBuilder.build(); + } +}