Skip to content

Commit

Permalink
feat(amqp): differentiate between exchange and queue (#1014)
Browse files Browse the repository at this point in the history
* feat(amqp): differentiate between exchange and queue

* feat(jms): handle routing key for exchange

* feat(amqp): update deps

* test(amqp): update e2e channel name
  • Loading branch information
timonback authored Oct 18, 2024
1 parent baf8f70 commit 6810a81
Show file tree
Hide file tree
Showing 28 changed files with 704 additions and 409 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
import io.github.springwolf.asyncapi.v3.bindings.ChannelBinding;
import io.github.springwolf.asyncapi.v3.bindings.MessageBinding;
import io.github.springwolf.asyncapi.v3.bindings.OperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;

import java.util.Map;

public interface BindingFactory<T> {
default String getChannelId(T annotation) {
return ReferenceUtil.toValidId(getChannelName(annotation));
}

String getChannelName(T annotation);

Map<String, ChannelBinding> buildChannelBinding(T annotation);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.common;

import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,14 +27,13 @@ public static Map<String, MessageReference> toMessagesMap(Set<MessageObject> mes
return toMessageReferences(messages, aggregator);
}

public static Map<String, MessageReference> toOperationsMessagesMap(
String channelName, Set<MessageObject> messages) {
if (channelName == null || channelName.isBlank()) {
throw new IllegalArgumentException("channelName must not be empty");
public static Map<String, MessageReference> toOperationsMessagesMap(String channelId, Set<MessageObject> messages) {
if (channelId == null || channelId.isBlank()) {
throw new IllegalArgumentException("channelId must not be empty");
}

Function<MessageObject, MessageReference> aggregator = (message) ->
MessageReference.toChannelMessage(ReferenceUtil.toValidId(channelName), message.getMessageId());
Function<MessageObject, MessageReference> aggregator =
(message) -> MessageReference.toChannelMessage(channelId, message.getMessageId());
return toMessageReferences(messages, aggregator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public Map<String, MessageReference> buildMessages(
.collect(toSet());

if (messageType == MessageType.OPERATION) {
String channelName = bindingFactory.getChannelName(classAnnotation);
return toOperationsMessagesMap(channelName, messages);
String channelId = bindingFactory.getChannelName(classAnnotation);
return toOperationsMessagesMap(channelId, messages);
}
return toMessagesMap(messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package io.github.springwolf.core.asyncapi.scanners.common.operation;

import io.github.springwolf.asyncapi.v3.bindings.OperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageReference;
Expand Down Expand Up @@ -30,7 +29,7 @@ public Operation buildOperation(
MessageObject message = springAnnotationMessageService.buildMessage(annotation, payloadType, headerSchema);
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(annotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelId = ReferenceUtil.toValidId(bindingFactory.getChannelName(annotation));
String channelId = bindingFactory.getChannelId(annotation);

return Operation.builder()
.action(OperationAction.RECEIVE)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.operations.annotations;

import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
Expand Down Expand Up @@ -44,8 +43,7 @@ private Stream<Map.Entry<String, Operation>> mapClassToOperation(
Class<?> component, Set<MethodAndAnnotation<MethodAnnotation>> annotatedMethods) {
ClassAnnotation classAnnotation = AnnotationUtil.findFirstAnnotationOrThrow(classAnnotationClass, component);

String channelName = bindingFactory.getChannelName(classAnnotation);
String channelId = ReferenceUtil.toValidId(channelName);
String channelId = bindingFactory.getChannelId(classAnnotation);
String operationId =
StringUtils.joinWith("_", channelId, OperationAction.RECEIVE.type, component.getSimpleName());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.operations.annotations;

import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
Expand Down Expand Up @@ -44,8 +43,7 @@ public Stream<Map.Entry<String, Operation>> scan(Class<?> clazz) {
private Map.Entry<String, Operation> mapMethodToOperation(MethodAndAnnotation<MethodAnnotation> method) {
MethodAnnotation annotation = AnnotationUtil.findFirstAnnotationOrThrow(methodAnnotationClass, method.method());

String channelName = bindingFactory.getChannelName(annotation);
String channelId = ReferenceUtil.toValidId(channelName);
String channelId = bindingFactory.getChannelId(annotation);
String operationId = StringUtils.joinWith(
"_", channelId, OperationAction.RECEIVE.type, method.method().getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SpringAnnotationOperationServiceTest {
@BeforeEach
void setUp() {
// when
when(bindingFactory.getChannelName(any())).thenReturn(CHANNEL_ID);
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_ID);
doReturn(defaultOperationBinding).when(bindingFactory).buildOperationBinding(any());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ class SpringAnnotationClassLevelOperationsScannerTest {
springAnnotationOperationsService,
List.of(operationCustomizer));

private static final String CHANNEL_NAME = "test-channel";
private static final String CHANNEL_NAME_ID = "test-channel";

private static final Map<String, MessageBinding> defaultMessageBinding =
Map.of("protocol", new AMQPMessageBinding());

@BeforeEach
void setUp() {
when(bindingFactory.getChannelName(any())).thenReturn(CHANNEL_NAME);
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_NAME_ID);
}

@Test
Expand All @@ -56,7 +56,7 @@ void scan() {
scanner.scan(ClassWithTestListenerAnnotation.class).toList();

// then
String operationName = CHANNEL_NAME + "_receive_ClassWithTestListenerAnnotation";
String operationName = CHANNEL_NAME_ID + "_receive_ClassWithTestListenerAnnotation";
assertThat(operations).containsExactly(Map.entry(operationName, operation));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class SpringAnnotationMethodLevelOperationsScannerTest {
springAnnotationOperationService,
List.of(operationCustomizer));

private static final String CHANNEL_NAME = "test-channel";
private static final String CHANNEL_ID = "test-channel";

@BeforeEach
void setUp() {
when(bindingFactory.getChannelName(any())).thenReturn(CHANNEL_NAME);
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_ID);
}

@Test
Expand All @@ -56,7 +56,7 @@ void scan_componentHasTestListenerMethods() {
scanner.scan(ClassWithTestListenerAnnotation.class).toList();

// then
String operationName = CHANNEL_NAME + "_receive_methodWithAnnotation";
String operationName = CHANNEL_ID + "_receive_methodWithAnnotation";
assertThat(operations).containsExactly(Map.entry(operationName, operation));
}

Expand Down
4 changes: 2 additions & 2 deletions springwolf-examples/e2e/tests/publishing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ function testPublishingEveryChannelItem() {
messageTitle === "Message" || // Unable to instantiate ExamplePayloadProtobufDto$Message class
messageTitle === "VehicleBase" || // Unable to publish abstract class for discriminator demo
messageTitle.startsWith("GenericPayload") || // Unable to publish generic payload (amqp)
channelName === "#" || // Publishing through amqp exchange is not supported, see GH-366
channelName === "example-topic-routing-key" // Publishing through amqp exchange is not supported, see GH-366
channelName === "CRUD-topic-exchange-2" || // Publishing through amqp exchange is not supported, see GH-366
channelName === "example-topic-exchange_example-topic-routing-key" // Publishing through amqp exchange is not supported, see GH-366
) {
return; // skip
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.github.springwolf.examples.amqp.configuration;

import io.github.springwolf.examples.amqp.AmqpConstants;
import io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
Expand Down Expand Up @@ -32,11 +33,6 @@ public Queue anotherQueue() {
return new Queue(AmqpConstants.QUEUE_ANOTHER_QUEUE, false);
}

@Bean
public Queue exampleBindingsQueue() {
return new Queue(AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, false, false, true);
}

@Bean
public Queue queueRead() {
return new Queue(AmqpConstants.QUEUE_READ, false);
Expand All @@ -52,6 +48,17 @@ public Queue multiPayloadQueue() {
return new Queue(AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE);
}

/**
* Defined by @RabbitListener annotation in {@link io.github.springwolf.examples.amqp.consumers.ExampleConsumer#bindingsExample(AnotherPayloadDto)}
*/
@Bean
public Queue exampleBindingsQueue() {
return new Queue(AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, false, false, true);
}

/**
* Defined by @RabbitListener annotation in {@link io.github.springwolf.examples.amqp.consumers.ExampleConsumer#bindingsExample(AnotherPayloadDto)}
*/
@Bean
public Binding exampleTopicBinding(Queue exampleBindingsQueue, Exchange exampleTopicExchange) {
return BindingBuilder.bind(exampleBindingsQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void receiveAnotherPayload(AnotherPayloadDto payload) {
autoDelete = "true"),
key = AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY)
})
public void bindingsExample(AnotherPayloadDto payload) {
public void bindingsExample(ExamplePayloadDto payload) {
log.info(
"Received new message in {}" + " through exchange {}" + " using routing key {}: {}",
AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE,
Expand Down Expand Up @@ -112,7 +112,7 @@ public void bindingsUpdate(Message message, @Payload GenericPayloadDto<ExamplePa
log.info(
"Received new message {} in {} (GenericPayloadDto<ExamplePayloadDto>): {}",
message,
AmqpConstants.QUEUE_UPDATE,
AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_1,
payload.toString());
}

Expand All @@ -130,7 +130,7 @@ public void bindingsRead(Message message, @Payload ExamplePayloadDto payload) {
log.info(
"Received new message {} in {} (ExamplePayloadDto): {}",
message,
AmqpConstants.QUEUE_UPDATE,
AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_2,
payload.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ void testContextWithApplicationProperties() {
void testAllChannelsAreFound() {
assertThat(asyncApiService.getAsyncAPI().getChannels().keySet())
.containsExactlyInAnyOrder(
"#",
"CRUD-topic-exchange-1",
"CRUD-topic-exchange-2",
"another-queue",
"example-bindings-queue",
"example-queue",
"example-topic-exchange",
"example-topic-routing-key",
"example-topic-exchange_example-topic-routing-key",
"multi-payload-queue",
"queue-create",
"queue-delete",
Expand Down
Loading

0 comments on commit 6810a81

Please sign in to comment.