Skip to content

Commit

Permalink
Added integration tests for SQS Reactive Messaging (#1483)
Browse files Browse the repository at this point in the history
  • Loading branch information
adampoplawski and [email protected] authored Nov 16, 2024
1 parent 2bb2738 commit 4bcf46e
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 1 deletion.
6 changes: 5 additions & 1 deletion integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@
<groupId>io.quarkiverse.amazonservices</groupId>
<artifactId>quarkus-amazon-s3-transfer-manager</artifactId>
</dependency>

<dependency>
<groupId>io.quarkiverse.amazonservices</groupId>
<artifactId>quarkus-messaging-amazon-sqs</artifactId>
</dependency>

<!-- Needed for InMemorySpanExporter to verify captured traces -->
<dependency>
<groupId>io.opentelemetry</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkiverse.it.amazon.sqsmessagingconnector;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

@ApplicationScoped
public class SqsMessagingConnectorManager {
@Inject
SqsClient sqsClient;
List<String> messages = new CopyOnWriteArrayList<>();

@Incoming("messages")
public void process(String incomingMessage) {
messages.add(incomingMessage);
}

public void sendMessage(String message, String queueName) {
sqsClient.sendMessage(SendMessageRequest.builder().queueUrl(getQueueUrl(queueName)).messageBody(message).build());
}

public List<String> getMessages() {
return messages;
}

private String getQueueUrl(String queueName) {
return sqsClient.listQueues(r -> r.queueNamePrefix(queueName)).queueUrls().get(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkiverse.it.amazon.sqsmessagingconnector;

import java.util.List;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.jboss.resteasy.reactive.RestQuery;

@Path("/sqs-messaging-connector")
public class SqsMessagingConnectorResource {
@Inject
SqsMessagingConnectorManager connectorManager;

@Path("messages/{queueName}")
@POST
public void sendSyncMessage(String queueName, @RestQuery String message) {
connectorManager.sendMessage(message, queueName);
}

@Path("messages/{queueName}")
@GET
public List<String> getMessages() {
return connectorManager.getMessages();
}
}
3 changes: 3 additions & 0 deletions integration-tests/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,6 @@ quarkus.s3.telemetry.enabled=true
quarkus.iam.custom.aws.credentials.type=static
quarkus.iam.custom.aws.credentials.static-provider.access-key-id=112233445566
quarkus.iam.custom.aws.credentials.static-provider.secret-access-key=test

quarkus.sqs.devservices.queues=quarkus-messaging-test-queue
mp.messaging.incoming.messages.queue=quarkus-messaging-test-queue
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkiverse.it.amazon;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class AmazonSqsMessagingConnectorITCase extends AmazonSqsMessagingConnectorTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkiverse.it.amazon;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.is;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import jakarta.ws.rs.core.Response;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class AmazonSqsMessagingConnectorTest {

private final static String QUEUE_NAME = "quarkus-messaging-test-queue";
private final static List<String> MESSAGES = new ArrayList<>();

static {
MESSAGES.add("First Message");
MESSAGES.add("Second Message");
MESSAGES.add("Third Message");
}

@Test
public void testReceiveMessages() {
//Publish messages
MESSAGES.forEach(msg -> {
given()
.pathParam("queueName", QUEUE_NAME)
.queryParam("message", msg)
.when().post("/test/sqs-messaging-connector/messages/{queueName}")
.then().body(any(String.class));
});

await()
.atMost(Duration.ofSeconds(10L))
.pollInterval(Duration.ofSeconds(1L))
.untilAsserted(() -> assertThat(given()
.pathParam("queueName", QUEUE_NAME)
.get("/test/sqs-messaging-connector/messages/{queueName}")
.then()
.assertThat()
.statusCode(is(Response.Status.OK.getStatusCode()))
.extract()
.as(String[].class), arrayWithSize(3)));
}
}

0 comments on commit 4bcf46e

Please sign in to comment.