From 4bcf46e847e9d2eb1cfdd293d46f7141fd21d857 Mon Sep 17 00:00:00 2001 From: adampoplawski Date: Sat, 16 Nov 2024 21:47:52 +0100 Subject: [PATCH] Added integration tests for SQS Reactive Messaging (#1483) Co-authored-by: adam.poplawski@tui.co.uk --- integration-tests/pom.xml | 6 +- .../SqsMessagingConnectorManager.java | 36 ++++++++++++ .../SqsMessagingConnectorResource.java | 28 ++++++++++ .../src/main/resources/application.properties | 3 + .../AmazonSqsMessagingConnectorITCase.java | 7 +++ .../AmazonSqsMessagingConnectorTest.java | 55 +++++++++++++++++++ 6 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorManager.java create mode 100644 integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorResource.java create mode 100644 integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorITCase.java create mode 100644 integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorTest.java diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index e3fb34a4..3613489e 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -117,7 +117,11 @@ io.quarkiverse.amazonservices quarkus-amazon-s3-transfer-manager - + + io.quarkiverse.amazonservices + quarkus-messaging-amazon-sqs + + io.opentelemetry diff --git a/integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorManager.java b/integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorManager.java new file mode 100644 index 00000000..5701ee7b --- /dev/null +++ b/integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorManager.java @@ -0,0 +1,36 @@ +package io.quarkiverse.it.amazon.sqsmessagingconnector; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +@ApplicationScoped +public class SqsMessagingConnectorManager { + @Inject + SqsClient sqsClient; + List messages = new CopyOnWriteArrayList<>(); + + @Incoming("messages") + public void process(String incomingMessage) { + messages.add(incomingMessage); + } + + public void sendMessage(String message, String queueName) { + sqsClient.sendMessage(SendMessageRequest.builder().queueUrl(getQueueUrl(queueName)).messageBody(message).build()); + } + + public List getMessages() { + return messages; + } + + private String getQueueUrl(String queueName) { + return sqsClient.listQueues(r -> r.queueNamePrefix(queueName)).queueUrls().get(0); + } +} diff --git a/integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorResource.java b/integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorResource.java new file mode 100644 index 00000000..30134ef8 --- /dev/null +++ b/integration-tests/src/main/java/io/quarkiverse/it/amazon/sqsmessagingconnector/SqsMessagingConnectorResource.java @@ -0,0 +1,28 @@ +package io.quarkiverse.it.amazon.sqsmessagingconnector; + +import java.util.List; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.jboss.resteasy.reactive.RestQuery; + +@Path("/sqs-messaging-connector") +public class SqsMessagingConnectorResource { + @Inject + SqsMessagingConnectorManager connectorManager; + + @Path("messages/{queueName}") + @POST + public void sendSyncMessage(String queueName, @RestQuery String message) { + connectorManager.sendMessage(message, queueName); + } + + @Path("messages/{queueName}") + @GET + public List getMessages() { + return connectorManager.getMessages(); + } +} diff --git a/integration-tests/src/main/resources/application.properties b/integration-tests/src/main/resources/application.properties index 753bf048..e0e34f6f 100644 --- a/integration-tests/src/main/resources/application.properties +++ b/integration-tests/src/main/resources/application.properties @@ -52,3 +52,6 @@ quarkus.s3.telemetry.enabled=true quarkus.iam.custom.aws.credentials.type=static quarkus.iam.custom.aws.credentials.static-provider.access-key-id=112233445566 quarkus.iam.custom.aws.credentials.static-provider.secret-access-key=test + +quarkus.sqs.devservices.queues=quarkus-messaging-test-queue +mp.messaging.incoming.messages.queue=quarkus-messaging-test-queue diff --git a/integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorITCase.java b/integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorITCase.java new file mode 100644 index 00000000..528cad61 --- /dev/null +++ b/integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorITCase.java @@ -0,0 +1,7 @@ +package io.quarkiverse.it.amazon; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class AmazonSqsMessagingConnectorITCase extends AmazonSqsMessagingConnectorTest { +} diff --git a/integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorTest.java b/integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorTest.java new file mode 100644 index 00000000..11c710ee --- /dev/null +++ b/integration-tests/src/test/java/io/quarkiverse/it/amazon/AmazonSqsMessagingConnectorTest.java @@ -0,0 +1,55 @@ +package io.quarkiverse.it.amazon; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.is; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import jakarta.ws.rs.core.Response; + +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class AmazonSqsMessagingConnectorTest { + + private final static String QUEUE_NAME = "quarkus-messaging-test-queue"; + private final static List MESSAGES = new ArrayList<>(); + + static { + MESSAGES.add("First Message"); + MESSAGES.add("Second Message"); + MESSAGES.add("Third Message"); + } + + @Test + public void testReceiveMessages() { + //Publish messages + MESSAGES.forEach(msg -> { + given() + .pathParam("queueName", QUEUE_NAME) + .queryParam("message", msg) + .when().post("/test/sqs-messaging-connector/messages/{queueName}") + .then().body(any(String.class)); + }); + + await() + .atMost(Duration.ofSeconds(10L)) + .pollInterval(Duration.ofSeconds(1L)) + .untilAsserted(() -> assertThat(given() + .pathParam("queueName", QUEUE_NAME) + .get("/test/sqs-messaging-connector/messages/{queueName}") + .then() + .assertThat() + .statusCode(is(Response.Status.OK.getStatusCode())) + .extract() + .as(String[].class), arrayWithSize(3))); + } +}