Skip to content

Commit

Permalink
GH-869 - Support for externalizing events into a Spring Messaging Mes…
Browse files Browse the repository at this point in the history
…sageChannel.
  • Loading branch information
joshlong authored and odrotbohm committed Oct 13, 2024
1 parent 413e2f5 commit 41aad24
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 0 deletions.
1 change: 1 addition & 0 deletions spring-modulith-events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<module>spring-modulith-events-kafka</module>
<module>spring-modulith-events-mongodb</module>
<module>spring-modulith-events-neo4j</module>
<module>spring-modulith-events-messaging</module>
</modules>

<profiles>
Expand Down
80 changes: 80 additions & 0 deletions spring-modulith-events/spring-modulith-events-messaging/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>

<name>Spring Modulith - Events - Spring Messaging support</name>
<artifactId>spring-modulith-events-messaging</artifactId>

<properties>
<module.name>org.springframework.modulith.events.messaging</module.name>
</properties>

<dependencies>

<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<optional>true</optional>
</dependency>

<!-- Test dependencies -->

<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-starter-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.messaging;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration;
import org.springframework.modulith.events.support.BrokerRouting;
import org.springframework.modulith.events.support.DelegatingEventExternalizer;

/**
* Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize events to a Spring Messaging
* {@link MessageChannel message channel}.
*
* @author Josh Long
*/
@AutoConfiguration
@AutoConfigureAfter(EventExternalizationAutoConfiguration.class)
@ConditionalOnClass(MessageChannel.class)
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled",
havingValue = "true",
matchIfMissing = true)
class SpringMessagingEventExternalizerConfiguration {

private static final Logger logger = LoggerFactory.getLogger(SpringMessagingEventExternalizerConfiguration.class);

public static final String MODULITH_ROUTING_HEADER = "modulithRouting";

@Bean
DelegatingEventExternalizer springMessagingEventExternalizer(
EventExternalizationConfiguration configuration,
BeanFactory factory) {

logger.debug("Registering domain event externalization for Spring Messaging…");

var context = new StandardEvaluationContext();
context.setBeanResolver(new BeanFactoryResolver(factory));

return new DelegatingEventExternalizer(configuration, (target, payload) -> {
var routing = BrokerRouting.of(target, context);
var message = MessageBuilder
.withPayload(payload)
.setHeader(MODULITH_ROUTING_HEADER, routing)
.build();
if (logger.isDebugEnabled()) {
logger.info("trying to find a {} with name {}", MessageChannel.class.getName(), routing.getTarget());
}
var bean = factory.getBean(routing.getTarget(), MessageChannel.class);
bean.send(message);
return CompletableFuture.completedFuture(null);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Messaging event externalization support.
*/
@org.springframework.lang.NonNullApi
package org.springframework.modulith.events.messaging;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.modulith.events.messaging.SpringMessagingEventExternalizerConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.messaging;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

import lombok.RequiredArgsConstructor;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.GenericHandler;
import org.springframework.integration.dsl.DirectChannelSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import org.springframework.modulith.events.CompletedEventPublications;
import org.springframework.modulith.events.Externalized;
import org.springframework.transaction.annotation.Transactional;

/**
* Integration tests for Spring Messaging-based event publication.
*
* @author Josh Long
*/
@SpringBootTest
class SpringMessagingEventPublicationIntegrationTests {

private static final String CHANNEL_NAME = "target";

private static final AtomicInteger COUNTER = new AtomicInteger();

@Autowired TestPublisher publisher;
@Autowired CompletedEventPublications completed;

@SpringBootApplication
static class TestConfiguration {

@Bean
TestPublisher testPublisher(ApplicationEventPublisher publisher) {
return new TestPublisher(publisher);
}

@Bean
IntegrationFlow inboundIntegrationFlow(
@Qualifier(CHANNEL_NAME) MessageChannel inbound) {

return IntegrationFlow
.from(inbound)
.handle((GenericHandler<TestEvent>) (payload, headers) -> {
COUNTER.incrementAndGet();
return null;
})
.get();
}

@Bean(value = CHANNEL_NAME)
DirectChannelSpec target() {
return MessageChannels.direct();
}

}

@Test
void publishesEventToSpringMessaging() throws Exception {
var publishes = 2;
for (var i = 0; i < publishes; i++) {
publisher.publishEvent();
}
Thread.sleep(200);
assertThat(COUNTER.get()).isEqualTo(publishes);
assertThat(completed.findAll()).hasSize(publishes);
}

@Externalized(CHANNEL_NAME)
static class TestEvent {}

@RequiredArgsConstructor
static class TestPublisher {

private final ApplicationEventPublisher events;

@Transactional
void publishEvent() {
events.publishEvent(new TestEvent());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
spring.modulith.events.jdbc.schema-initialization.enabled=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<property name="CONSOLE_LOG_PATTERN" value="%d{HH:mm:ss.SSS} %1.-1level - %8.8t : %m%n%wEx" />

<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<include resource="org/springframework/boot/logging/logback/console-appender.xml" />

<root level="INFO">
<appender-ref ref="CONSOLE" />
</root>

<logger name="org.springframework.modulith" level="INFO" />
<logger name="org.springframework.messaging" level="INFO" />

</configuration>
6 changes: 6 additions & 0 deletions src/docs/antora/modules/ROOT/pages/events.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ When routing key is set, requires SQS queue to be configured as a FIFO queue.
|Uses Spring Cloud AWS SNS support.
The logical routing key will be used as SNS message group id.
When routing key is set, requires SNS to be configured as a FIFO topic with content based deduplication enabled.
|Spring Messaging
|`spring-modulith-events-messaging`
|Uses Spring's core `Message` and `MessageChannel` support.
Resolves the target `MessageChannel` by its bean name given the `target` in the `Externalized` annotation. Forwards routing information as a header - called `modulithRouting` - to be processed in whatever way by downstream components, typically in a Spring Integration `IntegrationFlow`.
|===
[[externalization.fundamentals]]
Expand Down

0 comments on commit 41aad24

Please sign in to comment.