Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TestContainers Unit Tests with CaptureProxy #508

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.opensearch.migrations.testutils;


import java.io.IOException;
import java.net.ServerSocket;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
Expand All @@ -17,7 +19,6 @@ public class PortFinder {
private PortFinder() {}

private static final int MAX_PORT_TRIES = 100;
private static final Random random = new Random();

public static class ExceededMaxPortAssigmentAttemptException extends Exception {
public ExceededMaxPortAssigmentAttemptException(Throwable cause) {
Expand All @@ -30,11 +31,11 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
int numTries = 0;
while (true) {
try {
int port = random.nextInt((2 << 15) - 1025) + 1025;
r.accept(Integer.valueOf(port));
int port = findOpenPort();
r.accept(port);
return port;
} catch (Exception e) {
if (++numTries <= MAX_PORT_TRIES) {
if (++numTries >= MAX_PORT_TRIES) {
log.atError().setCause(e).setMessage(()->"Exceeded max tries {} giving up")
.addArgument(MAX_PORT_TRIES).log();
throw new ExceededMaxPortAssigmentAttemptException(e);
Expand All @@ -44,4 +45,14 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
}
}

public static int findOpenPort() {
try (ServerSocket serverSocket = new ServerSocket(0)) {
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
int port = serverSocket.getLocalPort();
log.info("Open port found: " + port);
return port;
} catch (IOException e) {
log.error("Failed to find an open port: " + e.getMessage());
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.AllArgsConstructor;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
Expand All @@ -19,6 +21,8 @@
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;

import java.nio.charset.Charset;

import java.io.IOException;
Expand All @@ -38,6 +42,9 @@
*/
public class SimpleHttpClientForTesting implements AutoCloseable {

private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5);
private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5);

private final CloseableHttpClient httpClient;

private static BasicHttpClientConnectionManager getInsecureTlsConnectionManager()
Expand Down Expand Up @@ -65,7 +72,15 @@ public SimpleHttpClientForTesting(boolean useTlsAndInsecurelyInsteadOfClearText)
}

private SimpleHttpClientForTesting(BasicHttpClientConnectionManager connectionManager) {
httpClient = HttpClients.custom().setConnectionManager(connectionManager).build();
var requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(DEFAULT_CONNECTION_TIMEOUT)
.setResponseTimeout(DEFAULT_RESPONSE_TIMEOUT)
.build();

httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.build();
}

@AllArgsConstructor
Expand Down
5 changes: 5 additions & 0 deletions TrafficCapture/trafficCaptureProxyServer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ dependencies {
testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation testFixtures(project(path: ':captureOffloader'))
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: '2.1.7'
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5'
testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5'
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5'
testImplementation group: 'org.testcontainers', name: 'toxiproxy', version: '1.19.5'
}

tasks.withType(Tar){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package org.opensearch.migrations.trafficcapture.proxyserver;

import static org.junit.jupiter.api.Assertions.assertEquals;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.opensearch.migrations.testutils.SimpleHttpClientForTesting;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.CaptureProxyContainer;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.HttpdContainerTestBase;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.KafkaContainerTestBase;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.ToxiproxyContainerTestBase;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.HttpdContainerTest;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.KafkaContainerTest;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.ToxiproxyContainerTest;

@Slf4j
@KafkaContainerTest
@HttpdContainerTest
@ToxiproxyContainerTest
public class KafkaConfigurationCaptureProxyTest {

private static final KafkaContainerTestBase kafkaTestBase = new KafkaContainerTestBase();
private static final HttpdContainerTestBase httpdTestBase = new HttpdContainerTestBase();
private static final ToxiproxyContainerTestBase toxiproxyTestBase = new ToxiproxyContainerTestBase();
private static final String HTTPD_GET_EXPECTED_RESPONSE = "<html><body><h1>It works!</h1></body></html>\n";
private static final int DEFAULT_NUMBER_OF_CALLS = 3;
private static final long PROXY_EXPECTED_MAX_LATENCY_MS = Duration.ofSeconds(1).toMillis();
private Proxy kafkaProxy;
private Proxy destinationProxy;

@BeforeAll
public static void setUp() {
kafkaTestBase.start();
httpdTestBase.start();
toxiproxyTestBase.start();
}

@AfterAll
public static void tearDown() {
kafkaTestBase.stop();
httpdTestBase.stop();
toxiproxyTestBase.stop();
}

private static void assertLessThan(long ceiling, long actual) {
Assertions.assertTrue(actual < ceiling,
() -> "Expected actual value to be less than " + ceiling + " but was " + actual + ".");
}

@BeforeEach
public void setUpTest() {
kafkaProxy = toxiproxyTestBase.getProxy(kafkaTestBase.getContainer());
destinationProxy = toxiproxyTestBase.getProxy(httpdTestBase.getContainer());
}

@AfterEach
public void tearDownTest() {
toxiproxyTestBase.deleteProxy(kafkaProxy);
toxiproxyTestBase.deleteProxy(destinationProxy);
}

@ParameterizedTest
@EnumSource(FailureMode.class)
@Disabled
// TODO: Fix proxy bug and enable test
public void testCaptureProxyWithKafkaImpairedBeforeStart(FailureMode failureMode) {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
failureMode.apply(kafkaProxy);

captureProxy.start();

var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS);

assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis());
}
}

@ParameterizedTest
@EnumSource(FailureMode.class)
public void testCaptureProxyWithKafkaImpairedAfterStart(FailureMode failureMode) {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
captureProxy.start();

failureMode.apply(kafkaProxy);

var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS);

assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis());
}
}

@ParameterizedTest
@EnumSource(FailureMode.class)
public void testCaptureProxyWithKafkaImpairedDoesNotAffectRequest_proxysRequest(FailureMode failureMode) {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
captureProxy.start();
final int numberOfTests = 20;

// Performance is different for first few calls so throw them away
assertBasicCalls(captureProxy, 3);

var averageBaselineDuration = assertBasicCalls(captureProxy, numberOfTests);

failureMode.apply(kafkaProxy);

// Calculate average duration of impaired calls
var averageImpairedDuration = assertBasicCalls(captureProxy, numberOfTests);

long acceptableDifference = Duration.ofMillis(25).toMillis();

log.info("Baseline Duration: {}ms, Impaired Duration: {}ms", averageBaselineDuration.toMillis(),
averageImpairedDuration.toMillis());

assertEquals(averageBaselineDuration.toMillis(), averageImpairedDuration.toMillis(), acceptableDifference,
"The average durations are not close enough");
}
}

@Test
public void testCaptureProxyLatencyAddition() {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
captureProxy.start();
final int numberOfTests = 25;

// Performance is different for first few calls so throw them away
assertBasicCalls(captureProxy, 3);

var averageRequestDurationWithProxy = assertBasicCalls(captureProxy, numberOfTests);

var averageNoProxyDuration = assertBasicCalls(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
numberOfTests);

var acceptableProxyLatencyAdd = Duration.ofMillis(25);

assertLessThan(averageNoProxyDuration.plus(acceptableProxyLatencyAdd).toMillis(),
averageRequestDurationWithProxy.toMillis());
}
}

private Duration assertBasicCalls(CaptureProxyContainer proxy, int numberOfCalls) {
return assertBasicCalls(CaptureProxyContainer.getUriFromContainer(proxy), numberOfCalls);
}

private Duration assertBasicCalls(String endpoint, int numberOfCalls) {
return IntStream.range(0, numberOfCalls).mapToObj(i -> assertBasicCall(endpoint))
.reduce(Duration.ZERO, Duration::plus).dividedBy(numberOfCalls);
}


private Duration assertBasicCall(String endpoint) {
try (var client = new SimpleHttpClientForTesting()) {
long startTimeNanos = System.nanoTime();
var response = client.makeGetRequest(URI.create(endpoint), Stream.empty());
long endTimeNanos = System.nanoTime();

var responseBody = new String(response.payloadBytes);
assertEquals(HTTPD_GET_EXPECTED_RESPONSE, responseBody);
return Duration.ofNanos(endTimeNanos - startTimeNanos);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public enum FailureMode {
LATENCY(
(proxy) -> proxy.toxics().latency("latency", ToxicDirection.UPSTREAM, 5000)),
BANDWIDTH(
(proxy) -> proxy.toxics().bandwidth("bandwidth", ToxicDirection.DOWNSTREAM, 1)),
TIMEOUT(
(proxy) -> proxy.toxics().timeout("timeout", ToxicDirection.UPSTREAM, 5000)),
SLICER(
(proxy) -> {
proxy.toxics().slicer("slicer_down", ToxicDirection.DOWNSTREAM, 1, 1000);
proxy.toxics().slicer("slicer_up", ToxicDirection.UPSTREAM, 1, 1000);
}),
SLOW_CLOSE(
(proxy) -> proxy.toxics().slowClose("slow_close", ToxicDirection.UPSTREAM, 5000)),
RESET_PEER(
(proxy) -> proxy.toxics().resetPeer("reset_peer", ToxicDirection.UPSTREAM, 5000)),
LIMIT_DATA(
(proxy) -> proxy.toxics().limitData("limit_data", ToxicDirection.UPSTREAM, 10)),
DISCONNECT(Proxy::disable);
private final ThrowingConsumer<Proxy> failureModeApplier;

FailureMode(ThrowingConsumer<Proxy> applier) {
this.failureModeApplier = applier;
}

public void apply(Proxy proxy) {
try {
this.failureModeApplier.accept(proxy);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
}
}
Loading
Loading