Skip to content

Commit

Permalink
Implement TestContainers Unit Tests with CaptureProxy with the KafkaC…
Browse files Browse the repository at this point in the history
…onfigurationProxyTest

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Feb 12, 2024
1 parent ffbb46e commit a9d5d4e
Show file tree
Hide file tree
Showing 6 changed files with 431 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.opensearch.migrations.testutils;


import java.io.IOException;
import java.net.ServerSocket;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
Expand All @@ -17,7 +19,6 @@ public class PortFinder {
private PortFinder() {}

private static final int MAX_PORT_TRIES = 100;
private static final Random random = new Random();

public static class ExceededMaxPortAssigmentAttemptException extends Exception {
public ExceededMaxPortAssigmentAttemptException(Throwable cause) {
Expand All @@ -30,11 +31,11 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
int numTries = 0;
while (true) {
try {
int port = random.nextInt((2 << 15) - 1025) + 1025;
r.accept(Integer.valueOf(port));
int port = findOpenPort();
r.accept(port);
return port;
} catch (Exception e) {
if (++numTries <= MAX_PORT_TRIES) {
if (++numTries >= MAX_PORT_TRIES) {
log.atError().setCause(e).setMessage(()->"Exceeded max tries {} giving up")
.addArgument(MAX_PORT_TRIES).log();
throw new ExceededMaxPortAssigmentAttemptException(e);
Expand All @@ -44,4 +45,14 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
}
}

public static int findOpenPort() {
try (ServerSocket serverSocket = new ServerSocket(0)) {
int port = serverSocket.getLocalPort();
log.info("Open port found: " + port);
return port;
} catch (IOException e) {
log.error("Failed to find an open port: " + e.getMessage());
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.AllArgsConstructor;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
Expand All @@ -19,6 +21,8 @@
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;

import java.nio.charset.Charset;

import java.io.IOException;
Expand All @@ -38,6 +42,9 @@
*/
public class SimpleHttpClientForTesting implements AutoCloseable {

private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5);
private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5);

private final CloseableHttpClient httpClient;

private static BasicHttpClientConnectionManager getInsecureTlsConnectionManager()
Expand Down Expand Up @@ -65,7 +72,15 @@ public SimpleHttpClientForTesting(boolean useTlsAndInsecurelyInsteadOfClearText)
}

private SimpleHttpClientForTesting(BasicHttpClientConnectionManager connectionManager) {
httpClient = HttpClients.custom().setConnectionManager(connectionManager).build();
var requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(DEFAULT_CONNECTION_TIMEOUT)
.setResponseTimeout(DEFAULT_RESPONSE_TIMEOUT)
.build();

httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.build();
}

@AllArgsConstructor
Expand Down
5 changes: 5 additions & 0 deletions TrafficCapture/trafficCaptureProxyServer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ dependencies {
testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation testFixtures(project(path: ':captureOffloader'))
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: '2.1.7'
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.0'
testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.0'
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.0'
testImplementation group: 'org.testcontainers', name: 'toxiproxy', version: '1.19.0'
}

tasks.withType(Tar){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package org.opensearch.migrations.trafficcapture.proxyserver;

import com.github.dockerjava.api.command.InspectContainerResponse;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.testutils.PortFinder;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.lifecycle.Startable;

@Slf4j
public class CaptureProxyContainer
extends GenericContainer implements AutoCloseable, WaitStrategyTarget, Startable {

private static final Duration TIMEOUT_DURATION = Duration.ofSeconds(30);
private final Supplier<String> destinationUriSupplier;
private final Supplier<String> kafkaUriSupplier;
private Integer listeningPort;
private Thread serverThread;

public CaptureProxyContainer(final Supplier<String> destinationUriSupplier,
final Supplier<String> kafkaUriSupplier) {
this.destinationUriSupplier = destinationUriSupplier;
this.kafkaUriSupplier = kafkaUriSupplier;
}

public CaptureProxyContainer(final String destinationUri, final String kafkaUri) {
this.destinationUriSupplier = () -> destinationUri;
this.kafkaUriSupplier = () -> kafkaUri;
}

public CaptureProxyContainer(final Container<?> destination, final KafkaContainer kafka) {
this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka));
}

public static String getUriFromContainer(final Container<?> container) {
return "http://" + container.getHost() + ":" + container.getFirstMappedPort();
}

@Override
public void start() {
this.listeningPort = PortFinder.findOpenPort();
serverThread = new Thread(() -> {
try {
String[] args = {
"--kafkaConnection", kafkaUriSupplier.get(),
"--destinationUri", destinationUriSupplier.get(),
"--listenPort", String.valueOf(listeningPort),
"--insecureDestination"
};

CaptureProxy.main(args);
} catch (Exception e) {
throw new AssertionError("Should not have exception", e);
}
});

serverThread.start();
new HttpWaitStrategy().forPort(listeningPort)
.withStartupTimeout(TIMEOUT_DURATION)
.waitUntilReady(this);
}

@Override
public boolean isRunning() {
return serverThread != null;
}

@Override
public void stop() {
if (serverThread != null) {
serverThread.interrupt();
this.serverThread = null;
}
this.listeningPort = null;
close();
}

@Override
public void close() {
}

@Override
public Set<Integer> getLivenessCheckPortNumbers() {
return getExposedPorts()
.stream()
.map(this::getMappedPort)
.collect(Collectors.toSet());
}

@Override
public String getHost() {
return "localhost";
}

@Override
public Integer getMappedPort(int originalPort) {
if (getExposedPorts().contains(originalPort)) {
return listeningPort;
}
return null;
}

@Override
public List<Integer> getExposedPorts() {
// Internal and External ports are the same
return List.of(listeningPort);
}

@Override
public InspectContainerResponse getContainerInfo() {
return new InspectNonContainerResponse("captureProxy");
}

@AllArgsConstructor
static class InspectNonContainerResponse extends InspectContainerResponse {

private String name;

@Override
public String getName() {
return name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.opensearch.migrations.trafficcapture.proxyserver;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import java.io.IOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

@Testcontainers(disabledWithoutDocker = true)
public class ContainerTestBase {

private final ToxiproxyContainer toxiproxy = new ToxiproxyContainer(
"ghcr.io/shopify/toxiproxy:latest").withAccessToHost(true);
private final int DESTINATION_PROXY_PORT = 8666;
private final int KAFKA_PROXY_PORT = 8667;
public Proxy kafkaProxy = null;
public Proxy destinationProxy = null;
public String kafkaProxyUrl;
public String destinationProxyUrl;

@BeforeEach
public void setUp() throws IOException {

var kafkaHostPort = ReusableContainerTestSetup.kafka.getFirstMappedPort();
var destinationHostPort = ReusableContainerTestSetup.destination.getFirstMappedPort();

toxiproxy.start();

org.testcontainers.Testcontainers.exposeHostPorts(kafkaHostPort);
org.testcontainers.Testcontainers.exposeHostPorts(destinationHostPort);


final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(),
toxiproxy.getControlPort());

kafkaProxy = toxiproxyClient.createProxy("kafka", "0.0.0.0:" + KAFKA_PROXY_PORT,
"host.testcontainers.internal" + ":" + kafkaHostPort);
destinationProxy = toxiproxyClient.createProxy("destination",
"0.0.0.0:" + DESTINATION_PROXY_PORT,
"host.testcontainers.internal" + ":" + destinationHostPort);

kafkaProxyUrl =
"http://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(KAFKA_PROXY_PORT);
destinationProxyUrl =
"http://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(DESTINATION_PROXY_PORT);

kafkaProxy.enable();
destinationProxy.enable();
}

@AfterEach
public void tearDown() {
if (kafkaProxy != null) {
try {
kafkaProxy.delete();
} catch (IOException ignored) {}
kafkaProxy = null;
}
if (destinationProxy != null) {
try {
destinationProxy.delete();
} catch (IOException ignored) {}
destinationProxy = null;
}
}

public static class ReusableContainerTestSetup {

static private final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest"));
static private final GenericContainer<?> destination = new GenericContainer(
"httpd:alpine").withExposedPorts(80); // Container Port

static {
Startables.deepStart(kafka, destination).join();
}
}
}
Loading

0 comments on commit a9d5d4e

Please sign in to comment.