Skip to content

Commit

Permalink
Reactive Messaging codestart
Browse files Browse the repository at this point in the history
 * fix quarkusio#20872
 * created `MyReactiveMessagingApplication` to demonstrates `@Emitter`, `@Incoming` and `@Outgoing` annotations.
 * created dynamic application.yml based on selected extension
  • Loading branch information
vgallet committed Nov 15, 2021
1 parent fa595c7 commit e97bdb8
Show file tree
Hide file tree
Showing 14 changed files with 386 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{#include readme-header /}

{#each input.selected-extensions-ga}
{#if it.equals("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")}
// TODO : which one to choose ?
[Related Apache Kafka guide section...](https://quarkus.io/guides/kafka)
[Related Apache Kafka guide section...](https://quarkus.io/guides/kafka-reactive-getting-started)
{/if}
{#if it.equals("io.quarkus:quarkus-smallrye-reactive-messaging-amqp")}
[Related Apache AMQP 1.0 guide section...](https://quarkus.io/guides/amqp)
{/if}
{/each}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
{#each input.selected-extensions-ga}
{#if it.equals("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")}
{#if count == 1 }

mp:
messaging:
outgoing:
source-out:
connector: smallrye-kafka
topic: word
uppercase-out:
connector: smallrye-kafka
topic: uppercase-word
incoming:
source-in:
connector: smallrye-kafka
topic: word
uppercase-in:
connector: smallrye-kafka
topic: uppercase-word
{#else}

#mp:
# messaging:
# outgoing:
# source-out:
# connector: smallrye-kafka
# topic: word
# uppercase-out:
# connector: smallrye-kafka
# topic: uppercase-word
# incoming:
# source-in:
# connector: smallrye-kafka
# topic: word
# uppercase-in:
# connector: smallrye-kafka
# topic: uppercase-word
{/if}
{/if}
{#if it.equals("io.quarkus:quarkus-smallrye-reactive-messaging-mqtt")}
{#if count == 1 }

mp:
messaging:
outgoing:
source-out:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: word
uppercase-out:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: uppercase-word
incoming:
source-in:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: word
uppercase-in:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: uppercase-word
{#else}

#mp:
# messaging:
# outgoing:
# source-out:
# connector: smallrye-mqtt
# host: localhost
# port: '1883'
# topic: word
# uppercase-out:
# connector: smallrye-mqtt
# host: localhost
# port: '1883'
# topic: uppercase-word
# incoming:
# source-in:
# connector: smallrye-mqtt
# host: localhost
# port: '1883'
# topic: word
# uppercase-in:
# connector: smallrye-mqtt
# host: localhost
# port: '1883'
# topic: uppercase-word
{/if}
{/if}
{#if it.equals("io.quarkus:quarkus-smallrye-reactive-messaging-amqp")}
{#if count == 1 }

mp:
messaging:
outgoing:
source-out:
address: word
connector: smallrye-amqp
uppercase-out:
connector: smallrye-amqp
address: uppercase-word
incoming:
source-in:
connector: smallrye-amqp
address: word
uppercase-in:
address: uppercase-word
connector: smallrye-amqp
{#else}

#mp:
# messaging:
# outgoing:
# source-out:
# address: word
# connector: smallrye-amqp
# uppercase-out:
# connector: smallrye-amqp
# address: uppercase-word
# incoming:
# source-in:
# connector: smallrye-amqp
# address: word
# uppercase-in:
# address: uppercase-word
# connector: smallrye-amqp
{/if}
{/if}
{/each}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: reactive-messaging-codestart
ref: reactive-messaging
tags: extension-codestart
type: code
metadata:
title: Reactive Messaging codestart
description: Use SmallRye Reactive Messaging
language:
base:
dependencies:
test-dependencies:
- io.smallrye.reactive:smallrye-reactive-messaging-in-memory
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.acme;

import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.stream.Stream;

@ApplicationScoped
public class MyReactiveMessagingApplication {

@Inject
@Channel("source-out")
Emitter<String> emitter;

/** Sends message to the source channel, can be used from a JAX-RS resource or any bean of your application **/
void onStart(@Observes StartupEvent ev) {
Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string));
}

/** Consume the message from the source channel, uppercase it and send it to the uppercase channel **/
@Incoming("source-in")
@Outgoing("uppercase-out")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}

/** Consume the uppercase channel and print the message **/
@Incoming("uppercase-in")
public void sink(String word) {
System.out.println(">> " + word);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.acme;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import io.smallrye.reactive.messaging.connectors.InMemorySink;
import io.smallrye.reactive.messaging.connectors.InMemorySource;
import org.junit.jupiter.api.Test;

import javax.enterprise.inject.Any;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@QuarkusTest
@QuarkusTestResource(MyReactiveMessagingApplicationTest.InMemoryChannelTestResource.class)
class MyReactiveMessagingApplicationTest {

@Inject
@Any
InMemoryConnector connector;

@Test
void test() {
InMemorySource<String> source = connector.source("source-in");
InMemorySink<String> uppercase = connector.sink("uppercase-out");

source.send("Hello");
source.send("In-memory");
source.send("Connectors");

assertEquals(3, uppercase.received().size());
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("HELLO")));
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("IN-MEMORY")));
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("CONNECTORS")));

}

public static class InMemoryChannelTestResource implements QuarkusTestResourceLifecycleManager {

@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory("source-in"));
env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase-out"));
return env;
}

@Override
public void stop() {
InMemoryConnector.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ metadata:
- "web"
- "reactive"
- "messaging"
status: "experimental"
status: "experimental"
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ metadata:
- "mp.messaging."
- "quarkus.reactive-messaging."
- "quarkus.amqp."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ metadata:
- "mp.messaging."
- "quarkus.reactive-messaging."
- "quarkus.kafka."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ metadata:
config:
- "mp.messaging."
- "quarkus.reactive-messaging."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class QuarkusCodestartBuildIT extends PlatformAwareTestBase {

private static final Path testDirPath = Paths.get("target/quarkus-codestart-build-test");

private static final Set<String> EXCLUDED = Sets.newHashSet("spring-web-codestart", "picocli-codestart");
private static final Set<String> EXCLUDED = Sets.newHashSet("spring-web-codestart", "picocli-codestart",
"reactive-messaging");

@BeforeAll
static void setUp() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.devtools.codestarts.quarkus;

import static io.quarkus.devtools.codestarts.quarkus.QuarkusCodestartCatalog.Language.JAVA;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.devtools.testing.codestarts.QuarkusCodestartTest;
import io.quarkus.maven.ArtifactKey;

public class ReactiveMessagingCodestartIT {

@RegisterExtension
public static QuarkusCodestartTest codestartTest = QuarkusCodestartTest.builder()
.codestarts("reactive-messaging")
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-kafka"))
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-amqp"))
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-mqtt"))
.languages(JAVA)
.build();

@Test
void testContent() throws Throwable {
codestartTest.checkGeneratedSource("org.acme.MyReactiveMessagingApplication");
// TODO : the generated application.properties is empty
codestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
// TODO : the test class doesn't work using the inner class InMemoryChannelTestResource
codestartTest.checkGeneratedTestSource("org.acme.MyReactiveMessagingApplicationTest");
}

@Test
void buildAll() throws Throwable {
codestartTest.buildAllProjects();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ilove.quark.us;

import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.stream.Stream;

@ApplicationScoped
public class MyReactiveMessagingApplication {

@Inject
@Channel("source-out")
Emitter<String> emitter;

/** Sends message to the source channel, can be used from a JAX-RS resource or any bean of your application **/
void onStart(@Observes StartupEvent ev) {
Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string));
}

/** Consume the message from the source channel, uppercase it and send it to the uppercase channel **/
@Incoming("source-in")
@Outgoing("uppercase-out")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}

/** Consume the uppercase channel and print the message **/
@Incoming("uppercase-in")
public void sink(String word) {
System.out.println(">> " + word);
}
}
Loading

0 comments on commit e97bdb8

Please sign in to comment.