Skip to content

Commit

Permalink
Reactive Messaging codestart
Browse files Browse the repository at this point in the history
 * fix quarkusio#20872
 * created `MyReactiveMessagingApplication` to demonstrates `@Emitter`, `@Incoming` and `@Outgoing` annotations.
 * created dynamic application.yml based on selected extension
 * added codestart integration test
  • Loading branch information
vgallet committed Nov 17, 2021
1 parent a815f0b commit c64c335
Show file tree
Hide file tree
Showing 17 changed files with 389 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{#include readme-header /}

{#each input.selected-extensions-ga}
{#switch it}
{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'}
[Related Apache Kafka guide section...](https://quarkus.io/guides/kafka-reactive-getting-started)

{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-amqp'}
[Related Apache AMQP 1.0 guide section...](https://quarkus.io/guides/amqp)

{/switch}
{/each}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{#each input.selected-extensions-ga}
{#switch it}
{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'}
mp:
messaging:
outgoing:
source-out:
connector: smallrye-kafka
topic: word
uppercase-out:
connector: smallrye-kafka
topic: uppercase-word
incoming:
source-in:
connector: smallrye-kafka
topic: word
uppercase-in:
connector: smallrye-kafka
topic: uppercase-word

{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-mqtt'}
mp:
messaging:
outgoing:
source-out:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: word
uppercase-out:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: uppercase-word
incoming:
source-in:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: word
uppercase-in:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: uppercase-word

{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-amqp'}
mp:
messaging:
outgoing:
source-out:
address: word
connector: smallrye-amqp
uppercase-out:
connector: smallrye-amqp
address: uppercase-word
incoming:
source-in:
connector: smallrye-amqp
address: word
uppercase-in:
address: uppercase-word
connector: smallrye-amqp

{/switch}
{/each}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: reactive-messaging-codestart
ref: reactive-messaging
tags: extension-codestart
type: code
metadata:
title: Reactive Messaging codestart
description: Use SmallRye Reactive Messaging
language:
base:
dependencies:
test-dependencies:
- io.smallrye.reactive:smallrye-reactive-messaging-in-memory
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.acme;

import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.stream.Stream;

@ApplicationScoped
public class MyReactiveMessagingApplication {

@Inject
@Channel("source-out")
Emitter<String> emitter;

/** Sends message to the source channel, can be used from a JAX-RS resource or any bean of your application **/
void onStart(@Observes StartupEvent ev) {
Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string));
}

/** Consume the message from the source channel, uppercase it and send it to the uppercase channel **/
@Incoming("source-in")
@Outgoing("uppercase-out")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}

/** Consume the uppercase channel and print the message **/
@Incoming("uppercase-in")
public void sink(String word) {
System.out.println(">> " + word);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.acme;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import io.smallrye.reactive.messaging.connectors.InMemorySink;
import io.smallrye.reactive.messaging.connectors.InMemorySource;
import org.junit.jupiter.api.Test;

import javax.enterprise.inject.Any;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@QuarkusTest
@QuarkusTestResource(MyReactiveMessagingApplicationTest.InMemoryChannelTestResource.class)
class MyReactiveMessagingApplicationTest {

@Inject
@Any
InMemoryConnector connector;

@Test
void test() {
InMemorySource<String> source = connector.source("source-in");
InMemorySink<String> uppercase = connector.sink("uppercase-out");

source.send("Hello");
source.send("In-memory");
source.send("Connectors");

assertEquals(3, uppercase.received().size());
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("HELLO")));
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("IN-MEMORY")));
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("CONNECTORS")));

}

public static class InMemoryChannelTestResource implements QuarkusTestResourceLifecycleManager {

@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory("source-in"));
env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase-out"));
return env;
}

@Override
public void stop() {
InMemoryConnector.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ metadata:
- "web"
- "reactive"
- "messaging"
status: "experimental"
status: "experimental"
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ metadata:
- "mp.messaging."
- "quarkus.reactive-messaging."
- "quarkus.amqp."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ metadata:
- "mp.messaging."
- "quarkus.reactive-messaging."
- "quarkus.kafka."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ metadata:
config:
- "mp.messaging."
- "quarkus.reactive-messaging."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -280,7 +281,8 @@ private AbstractPathAssert<?> checkGeneratedSource(String sourceDir, Language la

private String getTestId() {
String tool = buildTool != null ? buildTool.getKey() + "-" : "";
return tool + String.join("-", codestarts);
String suffix = codestarts.isEmpty() ? String.valueOf(Instant.now().toEpochMilli()) : String.join("-", codestarts);
return tool + suffix;
}

private void generateRealDataProjectIfNeeded(Path path, Language language) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class QuarkusCodestartBuildIT extends PlatformAwareTestBase {
private static final Path testDirPath = Paths.get("target/quarkus-codestart-build-test");

private static final Set<String> EXCLUDED = Sets.newHashSet("spring-web-codestart", "picocli-codestart",
"hibernate-orm-codestart");
"hibernate-orm-codestart", "reactive-messaging");

@BeforeAll
static void setUp() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.quarkus.devtools.codestarts.quarkus;

import static io.quarkus.devtools.codestarts.quarkus.QuarkusCodestartCatalog.Language.JAVA;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.devtools.testing.codestarts.QuarkusCodestartTest;
import io.quarkus.maven.ArtifactKey;

public class ReactiveMessagingCodestartIT {

@RegisterExtension
public static QuarkusCodestartTest kafkaCodestartTest = QuarkusCodestartTest.builder()
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-kafka"))
.languages(JAVA)
.build();

@RegisterExtension
public static QuarkusCodestartTest amqpCodestartTest = QuarkusCodestartTest.builder()
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-amqp"))
.languages(JAVA)
.build();

@RegisterExtension
public static QuarkusCodestartTest mqttCodestartTest = QuarkusCodestartTest.builder()
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-mqtt"))
.languages(JAVA)
.build();

@Test
void testKafkaContent() throws Throwable {
kafkaCodestartTest.checkGeneratedSource("org.acme.MyReactiveMessagingApplication");
kafkaCodestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
kafkaCodestartTest.checkGeneratedTestSource("org.acme.MyReactiveMessagingApplicationTest");
}

@Test
void testMQTTContent() throws Throwable {
mqttCodestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
}

@Test
void testAMQPContent() throws Throwable {
amqpCodestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
}

@Test
void buildAll() throws Throwable {
kafkaCodestartTest.buildAllProjects();
mqttCodestartTest.buildAllProjects();
amqpCodestartTest.buildAllProjects();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mp.messaging.outgoing.uppercase-out.connector=smallrye-amqp
mp.messaging.outgoing.uppercase-out.address=uppercase-word
mp.messaging.incoming.source-in.connector=smallrye-amqp
mp.messaging.incoming.uppercase-in.address=uppercase-word
mp.messaging.outgoing.source-out.address=word
mp.messaging.incoming.source-in.address=word
mp.messaging.outgoing.source-out.connector=smallrye-amqp
mp.messaging.incoming.uppercase-in.connector=smallrye-amqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ilove.quark.us;

import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.stream.Stream;

@ApplicationScoped
public class MyReactiveMessagingApplication {

@Inject
@Channel("source-out")
Emitter<String> emitter;

/** Sends message to the source channel, can be used from a JAX-RS resource or any bean of your application **/
void onStart(@Observes StartupEvent ev) {
Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string));
}

/** Consume the message from the source channel, uppercase it and send it to the uppercase channel **/
@Incoming("source-in")
@Outgoing("uppercase-out")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}

/** Consume the uppercase channel and print the message **/
@Incoming("uppercase-in")
public void sink(String word) {
System.out.println(">> " + word);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mp.messaging.outgoing.uppercase-out.connector=smallrye-kafka
mp.messaging.incoming.source-in.connector=smallrye-kafka
mp.messaging.outgoing.source-out.topic=word
mp.messaging.incoming.source-in.topic=word
mp.messaging.incoming.uppercase-in.topic=uppercase-word
mp.messaging.outgoing.source-out.connector=smallrye-kafka
mp.messaging.outgoing.uppercase-out.topic=uppercase-word
mp.messaging.incoming.uppercase-in.connector=smallrye-kafka
Loading

0 comments on commit c64c335

Please sign in to comment.