Skip to content

Commit

Permalink
adjust PublishingTimeoutTest to new framework
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Nov 30, 2023
1 parent 23753a6 commit 27eaea3
Show file tree
Hide file tree
Showing 14 changed files with 779 additions and 488 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package pl.allegro.tech.hermes.integrationtests.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FrontendSlowClient {

public static final String MSG_BODY = "{\"field\": \"value\"}";

public String msgHeadWithContentLenght(String topicName) {
return "POST /topics/" + topicName + " HTTP/1.1\n"
+ "Host: " + host + ":" + port + "\n"
+ "Content-Type: application/json\n"
+ "Content-Length: " + MSG_BODY.length() + "\r\n\r\n";
}

private String msgHeadWithChunkedEncoding(String topicName) {
return "POST /topics/" + topicName + " HTTP/1.1\n"
+ "Host: " + host + ":" + port + "\n"
+ "Content-Type: application/json\n"
+ "Transfer-Encoding: chunked \r\n\r\n";
}

private static final Logger LOGGER = LoggerFactory.getLogger(FrontendSlowClient.class);

private final String host;
private final int port;

public FrontendSlowClient(String host, int port) {
this.host = host;
this.port = port;
}

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public String slowEvent(int clientTimeout, int pauseTimeBetweenChunks, int delayBeforeSendingFirstData, String topicName)
throws IOException, InterruptedException {
return slowEvent(clientTimeout, pauseTimeBetweenChunks, delayBeforeSendingFirstData, topicName, false);
}

public String slowEvent(int clientTimeout, int pauseTimeBetweenChunks, int delayBeforeSendingFirstData,
String topicName, boolean chunkedEncoding)
throws IOException, InterruptedException {

Socket clientSocket = new Socket(host, port);

Thread.sleep(delayBeforeSendingFirstData);

DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));

clientSocket.setSoTimeout(clientTimeout);

if (chunkedEncoding) {
outToServer.writeBytes(msgHeadWithChunkedEncoding(topicName));
outToServer.flush();
} else {
outToServer.writeBytes(msgHeadWithContentLenght(topicName));
outToServer.flush();
}
sendBodyInChunks(outToServer, MSG_BODY, pauseTimeBetweenChunks, chunkedEncoding);

String response;
try {
response = readResponse(inFromServer);
LOGGER.info("Response: {}", response);
} catch (SocketTimeoutException e) {
LOGGER.warn("client timeout");
clientSocket.close();
throw e;
}

clientSocket.close();

return response;
}

private void sendBodyInChunks(DataOutputStream outToServer, String msgBody, int pauseTime, boolean encoded) {
executor.execute(() -> {
try {
for (int index = 0; index < msgBody.length(); index++) {
outToServer.writeBytes(prepareChunk(msgBody, index, encoded));
outToServer.flush();
LOGGER.info("Sent chunk");
if (pauseTime > 0) {
Thread.sleep(pauseTime);
}
}
if (encoded) {
outToServer.writeBytes("0\r\n\r\n");
outToServer.flush();
LOGGER.info("Finished chunked encoding");
}
} catch (SocketException e) {
LOGGER.warn("Socket closed");
} catch (InterruptedException | IOException e) {
LOGGER.error("Something went wrong while sending data", e);
}
});
}

private String prepareChunk(String msg, int index, boolean encoded) {
return encoded ? String.format("1\n%c\r\n", msg.charAt(index)) : String.valueOf(msg.charAt(index));
}

private String readResponse(BufferedReader bufferedReader) throws IOException {
String line;
StringBuilder response = new StringBuilder();

while (!(line = bufferedReader.readLine()).equals("")) {
response.append(line);
}

return response.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package pl.allegro.tech.hermes.integrationtests.client;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import org.glassfish.jersey.client.ClientConfig;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static jakarta.ws.rs.client.ClientBuilder.newClient;
import static org.awaitility.Awaitility.waitAtMost;
import static org.glassfish.jersey.client.ClientProperties.REQUEST_ENTITY_PROCESSING;
import static org.glassfish.jersey.client.RequestEntityProcessing.CHUNKED;

public class FrontendTestClient {

Expand All @@ -16,14 +25,18 @@ public class FrontendTestClient {
private static final String STATUS_READY_PATH = "/status/ready";

private final WebTestClient webTestClient;
private final FrontendSlowClient slowTestClient;
private final String frontendContainerUrl;
private final Client chunkedClient;

public FrontendTestClient(int frontendPort) {
this.frontendContainerUrl = "http://localhost:" + frontendPort;
this.webTestClient = WebTestClient
.bindToServer()
.baseUrl(frontendContainerUrl)
.build();
this.slowTestClient = new FrontendSlowClient("localhost", frontendPort);
this.chunkedClient = newClient(new ClientConfig().property(REQUEST_ENTITY_PROCESSING, CHUNKED));
}

public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, String body) {
Expand All @@ -33,13 +46,36 @@ public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName,
return response.get();
}

public Response publishChunked(String topicQualifiedName, String body) {
return chunkedClient.target(UriBuilder
.fromUri(frontendContainerUrl)
.path(TOPIC_PATH)
.build(topicQualifiedName))
.request().post(Entity.text(body));

}

public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, String body, Map<String, String> headers) {
AtomicReference<WebTestClient.ResponseSpec> response = new AtomicReference<>();
waitAtMost(Duration.ofSeconds(10))
.untilAsserted(() -> response.set(publish(topicQualifiedName, body, headers).expectStatus().isCreated()));
return response.get();
}

public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, byte[] body) {
AtomicReference<WebTestClient.ResponseSpec> response = new AtomicReference<>();
waitAtMost(Duration.ofSeconds(10))
.untilAsserted(() -> response.set(publish(topicQualifiedName, body).expectStatus().isCreated()));
return response.get();
}

public WebTestClient.ResponseSpec publishUntilStatus(String topicQualifiedName, String body, int statusCode) {
AtomicReference<WebTestClient.ResponseSpec> response = new AtomicReference<>();
waitAtMost(Duration.ofSeconds(10))
.untilAsserted(() -> response.set(publish(topicQualifiedName, body).expectStatus().isEqualTo(statusCode)));
return response.get();
}

WebTestClient.ResponseSpec publish(String topicQualifiedName, String body) {
return webTestClient.post().uri(UriBuilder
.fromUri(frontendContainerUrl)
Expand All @@ -49,6 +85,16 @@ WebTestClient.ResponseSpec publish(String topicQualifiedName, String body) {
.exchange();
}

WebTestClient.ResponseSpec publish(String topicQualifiedName, String body, Map<String, String> headers) {
return webTestClient.post().uri(UriBuilder
.fromUri(frontendContainerUrl)
.path(TOPIC_PATH)
.build(topicQualifiedName))
.body(Mono.just(body), String.class)
.headers(requestHeaders -> headers.forEach(requestHeaders::add))
.exchange();
}

WebTestClient.ResponseSpec publish(String topicQualifiedName, byte[] body) {
return webTestClient.post().uri(UriBuilder
.fromUri(frontendContainerUrl)
Expand All @@ -58,6 +104,13 @@ WebTestClient.ResponseSpec publish(String topicQualifiedName, byte[] body) {
.exchange();
}

String publishSlowly(int clientTimeout, int pauseTimeBetweenChunks, int delayBeforeSendingFirstData,
String topicName, boolean chunkedEncoding) throws IOException, InterruptedException{
return slowTestClient.slowEvent(clientTimeout, pauseTimeBetweenChunks, delayBeforeSendingFirstData, topicName, chunkedEncoding);
}



public WebTestClient.ResponseSpec getStatusHealth() {
return webTestClient.get().uri(UriBuilder
.fromUri(frontendContainerUrl)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.integrationtests.client;

import com.jayway.awaitility.Duration;
import jakarta.ws.rs.core.Response;
import org.springframework.test.web.reactive.server.WebTestClient;
import pl.allegro.tech.hermes.api.BlacklistStatus;
import pl.allegro.tech.hermes.api.Group;
Expand All @@ -11,7 +12,9 @@
import pl.allegro.tech.hermes.api.TopicWithSchema;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningSubscriptionStatus;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static com.jayway.awaitility.Awaitility.waitAtMost;

Expand Down Expand Up @@ -61,25 +64,77 @@ public WebTestClient.ResponseSpec getSubscriptionResponse(String topicQualifiedN
return managementTestClient.getSubscription(topicQualifiedName, subscriptionName);
}

public WebTestClient.ResponseSpec getSubscriptionMetrics(String topicQualifiedName, String subscriptionName) {
return managementTestClient.getSubscriptionMetrics(topicQualifiedName, subscriptionName);
}

public void suspendSubscription(Topic topic, String subscription) {
managementTestClient.updateSubscriptionState(topic, subscription, Subscription.State.SUSPENDED)
.expectStatus()
.is2xxSuccessful();
}

public void waitUntilSubscriptionActivated(String topicQualifiedName, String subscriptionName) {
waitAtMost(Duration.TEN_SECONDS)
.until(() -> managementTestClient.getSubscription(topicQualifiedName, subscriptionName)
.expectStatus()
.is2xxSuccessful()
.expectBody(Subscription.class)
.returnResult().getResponseBody().getState().equals(Subscription.State.ACTIVE)
);
}

public void waitUntilSubscriptionSuspended(String topicQualifiedName, String subscriptionName) {
waitAtMost(Duration.TEN_SECONDS)
.until(() -> managementTestClient.getSubscription(topicQualifiedName, subscriptionName)
.expectStatus()
.is2xxSuccessful()
.expectBody(Subscription.class)
.returnResult().getResponseBody().getState().equals(Subscription.State.SUSPENDED)
);
}

// PUBLISH
public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, String body) {
return frontendTestClient.publishUntilSuccess(topicQualifiedName, body);
}

public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, String body, Map<String, String> headers) {
return frontendTestClient.publishUntilSuccess(topicQualifiedName, body, headers);
}

public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, byte[] body) {
return frontendTestClient.publishUntilSuccess(topicQualifiedName, body);
}

public WebTestClient.ResponseSpec publishUntilStatus(String topicQualifiedName, String body, int statusCode) {
return frontendTestClient.publishUntilStatus(topicQualifiedName, body, statusCode);
}

public void updateSubscription(Topic topic, String subscription, PatchData patch) {
managementTestClient.updateSubscription(topic, subscription, patch)
.expectStatus()
.is2xxSuccessful();
.expectStatus()
.is2xxSuccessful();
}

public WebTestClient.ResponseSpec publish(String topicQualifiedName, String body) {
return frontendTestClient.publish(topicQualifiedName, body);
}

public Response publishChunked(String topicQualifiedName, String body) {
return frontendTestClient.publishChunked(topicQualifiedName, body);
}

public String publishSlowly(int clientTimeout, int pauseTimeBetweenChunks, int delayBeforeSendingFirstData,
String topicName, boolean chunkedEncoding) throws IOException, InterruptedException {
return frontendTestClient.publishSlowly(clientTimeout, pauseTimeBetweenChunks, delayBeforeSendingFirstData, topicName, chunkedEncoding);
}

public String publishSlowly(int clientTimeout, int pauseTimeBetweenChunks, int delayBeforeSendingFirstData, String topicName)
throws IOException, InterruptedException {
return publishSlowly(clientTimeout, pauseTimeBetweenChunks, delayBeforeSendingFirstData, topicName, false);
}

private void waitUntilSubscriptionCreated(String topicQualifiedName, String subscriptionName) {
waitAtMost(Duration.TEN_SECONDS)
.until(() -> managementTestClient.getSubscription(topicQualifiedName, subscriptionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class ManagementTestClient {

private static final String SUBSCRIPTION_PATH = "/topics/{topicName}/subscriptions/{subscriptionName}";

private static final String SUBSCRIPTION_STATE_PATH = "/topics/{topicName}/subscriptions/{subscriptionName}/state";

private static final String SUBSCRIPTION_METRICS_PATH = "/topics/{topicName}/subscriptions/{subscriptionName}/metrics";

private static final String GROUPS_PATH = "/groups";

private static final String RETRANSMISSION_PATH = "/topics/{topicName}/subscriptions/{subscriptionName}/retransmission";
Expand Down Expand Up @@ -86,10 +90,27 @@ public WebTestClient.ResponseSpec updateSubscription(Topic topic, String subscri
.exchange();
}

public WebTestClient.ResponseSpec updateSubscriptionState(Topic topic, String subscription, Subscription.State state) {
return webTestClient.put().uri(UriBuilder
.fromUri(managementContainerUrl)
.path(SUBSCRIPTION_STATE_PATH)
.build(topic.getQualifiedName(), subscription))
.body(Mono.just(state), Subscription.State.class)
.exchange();
}

public WebTestClient.ResponseSpec getSubscription(String topicQualifiedName, String subscriptionName) {
return getSingleSubscription(topicQualifiedName, subscriptionName);
}

public WebTestClient.ResponseSpec getSubscriptionMetrics(String topicQualifiedName, String subscriptionName) {
return webTestClient.get().uri(UriBuilder
.fromUri(managementContainerUrl)
.path(SUBSCRIPTION_METRICS_PATH)
.build(topicQualifiedName, subscriptionName))
.exchange();
}

private WebTestClient.ResponseSpec getSingleTopic(String topicQualifiedName) {
return webTestClient.get().uri(
UriBuilder.fromUri(managementContainerUrl)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pl.allegro.tech.hermes.integrationtests.helpers;

import pl.allegro.tech.hermes.integrationtests.metadata.TraceContext;

import java.util.Map;

public class TraceHeaders {
public static Map<String, String> fromTraceContext(TraceContext traceContext) {
return Map.of(
"Trace-Id", traceContext.getTraceId(),
"Span-Id", traceContext.getSpanId(),
"Parent-Span-Id", traceContext.getParentSpanId(),
"Trace-Sampled", traceContext.getTraceSampled(),
"Trace-Reported", traceContext.getTraceReported()
);
}
}
Loading

0 comments on commit 27eaea3

Please sign in to comment.