diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java
index 3a5d5112b..7ef0bae24 100644
--- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java
+++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java
@@ -1,6 +1,8 @@
package org.opensearch.migrations.testutils;
+import java.io.IOException;
+import java.net.ServerSocket;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
@@ -17,7 +19,6 @@ public class PortFinder {
private PortFinder() {}
private static final int MAX_PORT_TRIES = 100;
- private static final Random random = new Random();
public static class ExceededMaxPortAssigmentAttemptException extends Exception {
public ExceededMaxPortAssigmentAttemptException(Throwable cause) {
@@ -30,11 +31,11 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
int numTries = 0;
while (true) {
try {
- int port = random.nextInt((2 << 15) - 1025) + 1025;
- r.accept(Integer.valueOf(port));
+ int port = findOpenPort();
+ r.accept(port);
return port;
} catch (Exception e) {
- if (++numTries <= MAX_PORT_TRIES) {
+ if (++numTries >= MAX_PORT_TRIES) {
log.atError().setCause(e).setMessage(()->"Exceeded max tries {} giving up")
.addArgument(MAX_PORT_TRIES).log();
throw new ExceededMaxPortAssigmentAttemptException(e);
@@ -44,4 +45,14 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
}
}
+ public static int findOpenPort() {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ int port = serverSocket.getLocalPort();
+ log.info("Open port found: " + port);
+ return port;
+ } catch (IOException e) {
+ log.error("Failed to find an open port: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java
index a2bba88f2..13799499b 100644
--- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java
+++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java
@@ -5,6 +5,8 @@
import lombok.AllArgsConstructor;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
@@ -19,6 +21,8 @@
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.ssl.SSLContexts;
+import org.apache.hc.core5.util.Timeout;
+
import java.nio.charset.Charset;
import java.io.IOException;
@@ -38,6 +42,9 @@
*/
public class SimpleHttpClientForTesting implements AutoCloseable {
+ private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5);
+ private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5);
+
private final CloseableHttpClient httpClient;
private static BasicHttpClientConnectionManager getInsecureTlsConnectionManager()
@@ -65,7 +72,15 @@ public SimpleHttpClientForTesting(boolean useTlsAndInsecurelyInsteadOfClearText)
}
private SimpleHttpClientForTesting(BasicHttpClientConnectionManager connectionManager) {
- httpClient = HttpClients.custom().setConnectionManager(connectionManager).build();
+ var requestConfig = RequestConfig.custom()
+ .setConnectionRequestTimeout(DEFAULT_CONNECTION_TIMEOUT)
+ .setResponseTimeout(DEFAULT_RESPONSE_TIMEOUT)
+ .build();
+
+ httpClient = HttpClients.custom()
+ .setConnectionManager(connectionManager)
+ .setDefaultRequestConfig(requestConfig)
+ .build();
}
@AllArgsConstructor
diff --git a/TrafficCapture/trafficCaptureProxyServer/build.gradle b/TrafficCapture/trafficCaptureProxyServer/build.gradle
index 74fd3dfe8..fdec3d131 100644
--- a/TrafficCapture/trafficCaptureProxyServer/build.gradle
+++ b/TrafficCapture/trafficCaptureProxyServer/build.gradle
@@ -41,6 +41,11 @@ dependencies {
testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation testFixtures(project(path: ':captureOffloader'))
testImplementation testFixtures(project(path: ':coreUtilities'))
+ testImplementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: '2.1.7'
+ testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5'
+ testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5'
+ testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5'
+ testImplementation group: 'org.testcontainers', name: 'toxiproxy', version: '1.19.5'
}
tasks.withType(Tar){
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java
new file mode 100644
index 000000000..cfae95c80
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java
@@ -0,0 +1,217 @@
+package org.opensearch.migrations.trafficcapture.proxyserver;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.ThrowingConsumer;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.opensearch.migrations.testutils.SimpleHttpClientForTesting;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.CaptureProxyContainer;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.HttpdContainerTestBase;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.KafkaContainerTestBase;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.ToxiproxyContainerTestBase;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.HttpdContainerTest;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.KafkaContainerTest;
+import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.ToxiproxyContainerTest;
+
+@Slf4j
+@KafkaContainerTest
+@HttpdContainerTest
+@ToxiproxyContainerTest
+public class KafkaConfigurationCaptureProxyTest {
+
+ private static final KafkaContainerTestBase kafkaTestBase = new KafkaContainerTestBase();
+ private static final HttpdContainerTestBase httpdTestBase = new HttpdContainerTestBase();
+ private static final ToxiproxyContainerTestBase toxiproxyTestBase = new ToxiproxyContainerTestBase();
+ private static final String HTTPD_GET_EXPECTED_RESPONSE = "
It works!
\n";
+ private static final int DEFAULT_NUMBER_OF_CALLS = 3;
+ private static final long PROXY_EXPECTED_MAX_LATENCY_MS = Duration.ofSeconds(1).toMillis();
+ private Proxy kafkaProxy;
+ private Proxy destinationProxy;
+
+ @BeforeAll
+ public static void setUp() {
+ kafkaTestBase.start();
+ httpdTestBase.start();
+ toxiproxyTestBase.start();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ kafkaTestBase.stop();
+ httpdTestBase.stop();
+ toxiproxyTestBase.stop();
+ }
+
+ private static void assertLessThan(long ceiling, long actual) {
+ Assertions.assertTrue(actual < ceiling,
+ () -> "Expected actual value to be less than " + ceiling + " but was " + actual + ".");
+ }
+
+ @BeforeEach
+ public void setUpTest() {
+ kafkaProxy = toxiproxyTestBase.getProxy(kafkaTestBase.getContainer());
+ destinationProxy = toxiproxyTestBase.getProxy(httpdTestBase.getContainer());
+ }
+
+ @AfterEach
+ public void tearDownTest() {
+ toxiproxyTestBase.deleteProxy(kafkaProxy);
+ toxiproxyTestBase.deleteProxy(destinationProxy);
+ }
+
+ @ParameterizedTest
+ @EnumSource(FailureMode.class)
+ @Disabled
+ // TODO: Fix proxy bug and enable test
+ public void testCaptureProxyWithKafkaImpairedBeforeStart(FailureMode failureMode) {
+ try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
+ toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
+ failureMode.apply(kafkaProxy);
+
+ captureProxy.start();
+
+ var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS);
+
+ assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(FailureMode.class)
+ public void testCaptureProxyWithKafkaImpairedAfterStart(FailureMode failureMode) {
+ try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
+ toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
+ captureProxy.start();
+
+ failureMode.apply(kafkaProxy);
+
+ var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS);
+
+ assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(FailureMode.class)
+ public void testCaptureProxyWithKafkaImpairedDoesNotAffectRequest_proxysRequest(FailureMode failureMode) {
+ try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
+ toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
+ captureProxy.start();
+ final int numberOfTests = 20;
+
+ // Performance is different for first few calls so throw them away
+ assertBasicCalls(captureProxy, 3);
+
+ var averageBaselineDuration = assertBasicCalls(captureProxy, numberOfTests);
+
+ failureMode.apply(kafkaProxy);
+
+ // Calculate average duration of impaired calls
+ var averageImpairedDuration = assertBasicCalls(captureProxy, numberOfTests);
+
+ long acceptableDifference = Duration.ofMillis(25).toMillis();
+
+ log.info("Baseline Duration: {}ms, Impaired Duration: {}ms", averageBaselineDuration.toMillis(),
+ averageImpairedDuration.toMillis());
+
+ assertEquals(averageBaselineDuration.toMillis(), averageImpairedDuration.toMillis(), acceptableDifference,
+ "The average durations are not close enough");
+ }
+ }
+
+ @Test
+ public void testCaptureProxyLatencyAddition() {
+ try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
+ toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
+ captureProxy.start();
+ final int numberOfTests = 25;
+
+ // Performance is different for first few calls so throw them away
+ assertBasicCalls(captureProxy, 3);
+
+ var averageRequestDurationWithProxy = assertBasicCalls(captureProxy, numberOfTests);
+
+ var averageNoProxyDuration = assertBasicCalls(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
+ numberOfTests);
+
+ var acceptableProxyLatencyAdd = Duration.ofMillis(25);
+
+ assertLessThan(averageNoProxyDuration.plus(acceptableProxyLatencyAdd).toMillis(),
+ averageRequestDurationWithProxy.toMillis());
+ }
+ }
+
+ private Duration assertBasicCalls(CaptureProxyContainer proxy, int numberOfCalls) {
+ return assertBasicCalls(CaptureProxyContainer.getUriFromContainer(proxy), numberOfCalls);
+ }
+
+ private Duration assertBasicCalls(String endpoint, int numberOfCalls) {
+ return IntStream.range(0, numberOfCalls).mapToObj(i -> assertBasicCall(endpoint))
+ .reduce(Duration.ZERO, Duration::plus).dividedBy(numberOfCalls);
+ }
+
+
+ private Duration assertBasicCall(String endpoint) {
+ try (var client = new SimpleHttpClientForTesting()) {
+ long startTimeNanos = System.nanoTime();
+ var response = client.makeGetRequest(URI.create(endpoint), Stream.empty());
+ long endTimeNanos = System.nanoTime();
+
+ var responseBody = new String(response.payloadBytes);
+ assertEquals(HTTPD_GET_EXPECTED_RESPONSE, responseBody);
+ return Duration.ofNanos(endTimeNanos - startTimeNanos);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public enum FailureMode {
+ LATENCY(
+ (proxy) -> proxy.toxics().latency("latency", ToxicDirection.UPSTREAM, 5000)),
+ BANDWIDTH(
+ (proxy) -> proxy.toxics().bandwidth("bandwidth", ToxicDirection.DOWNSTREAM, 1)),
+ TIMEOUT(
+ (proxy) -> proxy.toxics().timeout("timeout", ToxicDirection.UPSTREAM, 5000)),
+ SLICER(
+ (proxy) -> {
+ proxy.toxics().slicer("slicer_down", ToxicDirection.DOWNSTREAM, 1, 1000);
+ proxy.toxics().slicer("slicer_up", ToxicDirection.UPSTREAM, 1, 1000);
+ }),
+ SLOW_CLOSE(
+ (proxy) -> proxy.toxics().slowClose("slow_close", ToxicDirection.UPSTREAM, 5000)),
+ RESET_PEER(
+ (proxy) -> proxy.toxics().resetPeer("reset_peer", ToxicDirection.UPSTREAM, 5000)),
+ LIMIT_DATA(
+ (proxy) -> proxy.toxics().limitData("limit_data", ToxicDirection.UPSTREAM, 10)),
+ DISCONNECT(Proxy::disable);
+ private final ThrowingConsumer failureModeApplier;
+
+ FailureMode(ThrowingConsumer applier) {
+ this.failureModeApplier = applier;
+ }
+
+ public void apply(Proxy proxy) {
+ try {
+ this.failureModeApplier.accept(proxy);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ }
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java
new file mode 100644
index 000000000..ed9f44d67
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java
@@ -0,0 +1,134 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.opensearch.migrations.testutils.PortFinder;
+import org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
+import org.testcontainers.lifecycle.Startable;
+
+@Slf4j
+public class CaptureProxyContainer
+ extends GenericContainer implements AutoCloseable, WaitStrategyTarget, Startable {
+
+ private static final Duration TIMEOUT_DURATION = Duration.ofSeconds(30);
+ private final Supplier destinationUriSupplier;
+ private final Supplier kafkaUriSupplier;
+ private Integer listeningPort;
+ private Thread serverThread;
+
+ public CaptureProxyContainer(final Supplier destinationUriSupplier,
+ final Supplier kafkaUriSupplier) {
+ this.destinationUriSupplier = destinationUriSupplier;
+ this.kafkaUriSupplier = kafkaUriSupplier;
+ }
+
+ public CaptureProxyContainer(final String destinationUri, final String kafkaUri) {
+ this.destinationUriSupplier = () -> destinationUri;
+ this.kafkaUriSupplier = () -> kafkaUri;
+ }
+
+ public CaptureProxyContainer(final Container> destination, final KafkaContainer kafka) {
+ this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka));
+ }
+
+ public static String getUriFromContainer(final Container> container) {
+ return "http://" + container.getHost() + ":" + container.getFirstMappedPort();
+ }
+
+ @Override
+ public void start() {
+ this.listeningPort = PortFinder.findOpenPort();
+ serverThread = new Thread(() -> {
+ try {
+ String[] args = {
+ "--kafkaConnection", kafkaUriSupplier.get(),
+ "--destinationUri", destinationUriSupplier.get(),
+ "--listenPort", String.valueOf(listeningPort),
+ "--insecureDestination"
+ };
+
+ CaptureProxy.main(args);
+ } catch (Exception e) {
+ throw new AssertionError("Should not have exception", e);
+ }
+ });
+
+ serverThread.start();
+ new HttpWaitStrategy().forPort(listeningPort)
+ .withStartupTimeout(TIMEOUT_DURATION)
+ .waitUntilReady(this);
+ }
+
+ @Override
+ public boolean isRunning() {
+ return serverThread != null;
+ }
+
+ @Override
+ public void stop() {
+ if (serverThread != null) {
+ serverThread.interrupt();
+ this.serverThread = null;
+ }
+ this.listeningPort = null;
+ close();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Set getLivenessCheckPortNumbers() {
+ return getExposedPorts()
+ .stream()
+ .map(this::getMappedPort)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public String getHost() {
+ return "localhost";
+ }
+
+ @Override
+ public Integer getMappedPort(int originalPort) {
+ if (getExposedPorts().contains(originalPort)) {
+ return listeningPort;
+ }
+ return null;
+ }
+
+ @Override
+ public List getExposedPorts() {
+ // Internal and External ports are the same
+ return List.of(listeningPort);
+ }
+
+ @Override
+ public InspectContainerResponse getContainerInfo() {
+ return new InspectNonContainerResponse("captureProxy");
+ }
+
+ @AllArgsConstructor
+ static class InspectNonContainerResponse extends InspectContainerResponse {
+
+ private String name;
+
+ @Override
+ public String getName() {
+ return name;
+ }
+ }
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java
new file mode 100644
index 000000000..f16993eb7
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java
@@ -0,0 +1,13 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;
+
+import org.testcontainers.containers.GenericContainer;
+
+public class HttpdContainerTestBase extends TestContainerTestBase> {
+
+ private static final GenericContainer> httpd = new GenericContainer("httpd:alpine")
+ .withExposedPorts(80); // Container Port
+
+ public GenericContainer> getContainer() {
+ return httpd;
+ }
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java
new file mode 100644
index 000000000..be90048b6
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java
@@ -0,0 +1,14 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;
+
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class KafkaContainerTestBase extends TestContainerTestBase {
+
+ private static final KafkaContainer kafka = new KafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka:latest"));
+
+ public KafkaContainer getContainer() {
+ return kafka;
+ }
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java
new file mode 100644
index 000000000..0d1e23f50
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java
@@ -0,0 +1,16 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;
+
+import org.testcontainers.containers.GenericContainer;
+
+abstract class TestContainerTestBase> {
+
+ public void start() {
+ getContainer().start();
+ }
+
+ public void stop() {
+ getContainer().start();
+ }
+
+ abstract T getContainer();
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java
new file mode 100644
index 000000000..de773a9b9
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java
@@ -0,0 +1,76 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;
+
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.ToxiproxyContainer;
+
+public class ToxiproxyContainerTestBase extends TestContainerTestBase {
+
+ private static final ToxiproxyContainer toxiproxy = new ToxiproxyContainer(
+ "ghcr.io/shopify/toxiproxy:latest")
+ .withAccessToHost(true);
+
+ final ConcurrentSkipListSet toxiproxyUnusedExposedPorts = new ConcurrentSkipListSet<>();
+
+ static int getListeningPort(Proxy proxy) {
+ return Integer.parseInt(proxy.getListen().replaceAll(".*:", ""));
+ }
+
+ public ToxiproxyContainer getContainer() {
+ return toxiproxy;
+ }
+
+ @Override
+ public void start() {
+ final int TOXIPROXY_CONTROL_PORT = 8474;
+ getContainer().start();
+ var concurrentPortSet = new HashSet<>(getContainer().getExposedPorts());
+ concurrentPortSet.remove(TOXIPROXY_CONTROL_PORT);
+ toxiproxyUnusedExposedPorts.addAll(concurrentPortSet);
+ }
+
+ @Override
+ public void stop() {
+ toxiproxyUnusedExposedPorts.clear();
+ getContainer().stop();
+ }
+
+ public void deleteProxy(Proxy proxy) {
+ var proxyPort = getListeningPort(proxy);
+ try {
+ proxy.delete();
+ toxiproxyUnusedExposedPorts.add(proxyPort);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Proxy getProxy(GenericContainer> container) {
+ var containerPort = container.getFirstMappedPort();
+ final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(),
+ getContainer().getControlPort());
+ org.testcontainers.Testcontainers.exposeHostPorts(containerPort);
+ try {
+ var containerName = (container.getDockerImageName() + "_" + container.getContainerName() + "_"
+ + Thread.currentThread().getId()).replaceAll("[^a-zA-Z0-9_]+", "_");
+ synchronized (toxiproxyUnusedExposedPorts) {
+ var proxyPort = toxiproxyUnusedExposedPorts.first();
+ var proxy = toxiproxyClient.createProxy(containerName, "0.0.0.0:" + proxyPort,
+ "host.testcontainers.internal" + ":" + containerPort);
+ toxiproxyUnusedExposedPorts.remove(proxyPort);
+ proxy.enable();
+ return proxy;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getProxyUrlHttp(Proxy proxy) {
+ return "http://" + getContainer().getHost() + ":" + getContainer().getMappedPort(getListeningPort(proxy));
+ }
+}
\ No newline at end of file
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java
new file mode 100644
index 000000000..719069bae
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java
@@ -0,0 +1,11 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations;
+
+import java.lang.annotation.Inherited;
+import org.junit.jupiter.api.parallel.ResourceLock;
+
+@Inherited
+@ResourceLock("HttpdContainer")
+@TestContainerTest
+public @interface HttpdContainerTest {
+
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java
new file mode 100644
index 000000000..30c5a2cd2
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java
@@ -0,0 +1,11 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations;
+
+import java.lang.annotation.Inherited;
+import org.junit.jupiter.api.parallel.ResourceLock;
+
+@Inherited
+@ResourceLock("KafkaContainer")
+@TestContainerTest
+public @interface KafkaContainerTest {
+
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java
new file mode 100644
index 000000000..1163dff3e
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java
@@ -0,0 +1,11 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations;
+
+import java.lang.annotation.Inherited;
+import org.junit.jupiter.api.Tag;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Inherited
+@Tag("longTest")
+@Testcontainers(disabledWithoutDocker = true, parallel = true)
+public @interface TestContainerTest {
+}
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java
new file mode 100644
index 000000000..85146b080
--- /dev/null
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java
@@ -0,0 +1,10 @@
+package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations;
+
+import java.lang.annotation.Inherited;
+import org.junit.jupiter.api.parallel.ResourceLock;
+
+@Inherited
+@ResourceLock("ToxiproxyContainerTestBase")
+@TestContainerTest
+public @interface ToxiproxyContainerTest {
+}
diff --git a/TrafficCapture/trafficReplayer/build.gradle b/TrafficCapture/trafficReplayer/build.gradle
index 368eb1901..4e0a52e03 100644
--- a/TrafficCapture/trafficReplayer/build.gradle
+++ b/TrafficCapture/trafficReplayer/build.gradle
@@ -82,9 +82,9 @@ dependencies {
testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api', version:'5.x.x'
- testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.0'
- testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.0'
- testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.0'
+ testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5'
+ testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5'
+ testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5'
testImplementation group: 'org.mockito', name:'mockito-core', version:'4.6.1'
testImplementation group: 'org.mockito', name:'mockito-junit-jupiter', version:'4.6.1'
testRuntimeOnly group:'org.junit.jupiter', name:'junit-jupiter-engine', version:'5.x.x'