Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(amqp): differentiate between exchange and queue #1014

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
import io.github.springwolf.asyncapi.v3.bindings.ChannelBinding;
import io.github.springwolf.asyncapi.v3.bindings.MessageBinding;
import io.github.springwolf.asyncapi.v3.bindings.OperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;

import java.util.Map;

public interface BindingFactory<T> {
default String getChannelId(T annotation) {
return ReferenceUtil.toValidId(getChannelName(annotation));
}

String getChannelName(T annotation);

Map<String, ChannelBinding> buildChannelBinding(T annotation);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.common;

import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,14 +27,13 @@ public static Map<String, MessageReference> toMessagesMap(Set<MessageObject> mes
return toMessageReferences(messages, aggregator);
}

public static Map<String, MessageReference> toOperationsMessagesMap(
String channelName, Set<MessageObject> messages) {
if (channelName == null || channelName.isBlank()) {
throw new IllegalArgumentException("channelName must not be empty");
public static Map<String, MessageReference> toOperationsMessagesMap(String channelId, Set<MessageObject> messages) {
if (channelId == null || channelId.isBlank()) {
throw new IllegalArgumentException("channelId must not be empty");
}

Function<MessageObject, MessageReference> aggregator = (message) ->
MessageReference.toChannelMessage(ReferenceUtil.toValidId(channelName), message.getMessageId());
Function<MessageObject, MessageReference> aggregator =
(message) -> MessageReference.toChannelMessage(channelId, message.getMessageId());
return toMessageReferences(messages, aggregator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public Map<String, MessageReference> buildMessages(
.collect(toSet());

if (messageType == MessageType.OPERATION) {
String channelName = bindingFactory.getChannelName(classAnnotation);
return toOperationsMessagesMap(channelName, messages);
String channelId = bindingFactory.getChannelName(classAnnotation);
return toOperationsMessagesMap(channelId, messages);
}
return toMessagesMap(messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package io.github.springwolf.core.asyncapi.scanners.common.operation;

import io.github.springwolf.asyncapi.v3.bindings.OperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageReference;
Expand Down Expand Up @@ -30,7 +29,7 @@ public Operation buildOperation(
MessageObject message = springAnnotationMessageService.buildMessage(annotation, payloadType, headerSchema);
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(annotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelId = ReferenceUtil.toValidId(bindingFactory.getChannelName(annotation));
String channelId = bindingFactory.getChannelId(annotation);

return Operation.builder()
.action(OperationAction.RECEIVE)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.operations.annotations;

import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
Expand Down Expand Up @@ -44,8 +43,7 @@ private Stream<Map.Entry<String, Operation>> mapClassToOperation(
Class<?> component, Set<MethodAndAnnotation<MethodAnnotation>> annotatedMethods) {
ClassAnnotation classAnnotation = AnnotationUtil.findFirstAnnotationOrThrow(classAnnotationClass, component);

String channelName = bindingFactory.getChannelName(classAnnotation);
String channelId = ReferenceUtil.toValidId(channelName);
String channelId = bindingFactory.getChannelId(classAnnotation);
String operationId =
StringUtils.joinWith("_", channelId, OperationAction.RECEIVE.type, component.getSimpleName());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.operations.annotations;

import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
Expand Down Expand Up @@ -44,8 +43,7 @@ public Stream<Map.Entry<String, Operation>> scan(Class<?> clazz) {
private Map.Entry<String, Operation> mapMethodToOperation(MethodAndAnnotation<MethodAnnotation> method) {
MethodAnnotation annotation = AnnotationUtil.findFirstAnnotationOrThrow(methodAnnotationClass, method.method());

String channelName = bindingFactory.getChannelName(annotation);
String channelId = ReferenceUtil.toValidId(channelName);
String channelId = bindingFactory.getChannelId(annotation);
String operationId = StringUtils.joinWith(
"_", channelId, OperationAction.RECEIVE.type, method.method().getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SpringAnnotationOperationServiceTest {
@BeforeEach
void setUp() {
// when
when(bindingFactory.getChannelName(any())).thenReturn(CHANNEL_ID);
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_ID);
doReturn(defaultOperationBinding).when(bindingFactory).buildOperationBinding(any());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ class SpringAnnotationClassLevelOperationsScannerTest {
springAnnotationOperationsService,
List.of(operationCustomizer));

private static final String CHANNEL_NAME = "test-channel";
private static final String CHANNEL_NAME_ID = "test-channel";

private static final Map<String, MessageBinding> defaultMessageBinding =
Map.of("protocol", new AMQPMessageBinding());

@BeforeEach
void setUp() {
when(bindingFactory.getChannelName(any())).thenReturn(CHANNEL_NAME);
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_NAME_ID);
}

@Test
Expand All @@ -56,7 +56,7 @@ void scan() {
scanner.scan(ClassWithTestListenerAnnotation.class).toList();

// then
String operationName = CHANNEL_NAME + "_receive_ClassWithTestListenerAnnotation";
String operationName = CHANNEL_NAME_ID + "_receive_ClassWithTestListenerAnnotation";
assertThat(operations).containsExactly(Map.entry(operationName, operation));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class SpringAnnotationMethodLevelOperationsScannerTest {
springAnnotationOperationService,
List.of(operationCustomizer));

private static final String CHANNEL_NAME = "test-channel";
private static final String CHANNEL_ID = "test-channel";

@BeforeEach
void setUp() {
when(bindingFactory.getChannelName(any())).thenReturn(CHANNEL_NAME);
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_ID);
}

@Test
Expand All @@ -56,7 +56,7 @@ void scan_componentHasTestListenerMethods() {
scanner.scan(ClassWithTestListenerAnnotation.class).toList();

// then
String operationName = CHANNEL_NAME + "_receive_methodWithAnnotation";
String operationName = CHANNEL_ID + "_receive_methodWithAnnotation";
assertThat(operations).containsExactly(Map.entry(operationName, operation));
}

Expand Down
4 changes: 2 additions & 2 deletions springwolf-examples/e2e/tests/publishing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ function testPublishingEveryChannelItem() {
messageTitle === "Message" || // Unable to instantiate ExamplePayloadProtobufDto$Message class
messageTitle === "VehicleBase" || // Unable to publish abstract class for discriminator demo
messageTitle.startsWith("GenericPayload") || // Unable to publish generic payload (amqp)
channelName === "#" || // Publishing through amqp exchange is not supported, see GH-366
channelName === "example-topic-routing-key" // Publishing through amqp exchange is not supported, see GH-366
channelName === "CRUD-topic-exchange-2" || // Publishing through amqp exchange is not supported, see GH-366
channelName === "example-topic-exchange_example-topic-routing-key" // Publishing through amqp exchange is not supported, see GH-366
) {
return; // skip
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.github.springwolf.examples.amqp.configuration;

import io.github.springwolf.examples.amqp.AmqpConstants;
import io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
Expand Down Expand Up @@ -32,11 +33,6 @@ public Queue anotherQueue() {
return new Queue(AmqpConstants.QUEUE_ANOTHER_QUEUE, false);
}

@Bean
public Queue exampleBindingsQueue() {
return new Queue(AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, false, false, true);
}

@Bean
public Queue queueRead() {
return new Queue(AmqpConstants.QUEUE_READ, false);
Expand All @@ -52,6 +48,17 @@ public Queue multiPayloadQueue() {
return new Queue(AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE);
}

/**
* Defined by @RabbitListener annotation in {@link io.github.springwolf.examples.amqp.consumers.ExampleConsumer#bindingsExample(AnotherPayloadDto)}
*/
@Bean
public Queue exampleBindingsQueue() {
return new Queue(AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, false, false, true);
}

/**
* Defined by @RabbitListener annotation in {@link io.github.springwolf.examples.amqp.consumers.ExampleConsumer#bindingsExample(AnotherPayloadDto)}
*/
@Bean
public Binding exampleTopicBinding(Queue exampleBindingsQueue, Exchange exampleTopicExchange) {
return BindingBuilder.bind(exampleBindingsQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void receiveAnotherPayload(AnotherPayloadDto payload) {
autoDelete = "true"),
key = AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY)
})
public void bindingsExample(AnotherPayloadDto payload) {
public void bindingsExample(ExamplePayloadDto payload) {
log.info(
"Received new message in {}" + " through exchange {}" + " using routing key {}: {}",
AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE,
Expand Down Expand Up @@ -112,7 +112,7 @@ public void bindingsUpdate(Message message, @Payload GenericPayloadDto<ExamplePa
log.info(
"Received new message {} in {} (GenericPayloadDto<ExamplePayloadDto>): {}",
message,
AmqpConstants.QUEUE_UPDATE,
AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_1,
payload.toString());
}

Expand All @@ -130,7 +130,7 @@ public void bindingsRead(Message message, @Payload ExamplePayloadDto payload) {
log.info(
"Received new message {} in {} (ExamplePayloadDto): {}",
message,
AmqpConstants.QUEUE_UPDATE,
AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_2,
payload.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ void testContextWithApplicationProperties() {
void testAllChannelsAreFound() {
assertThat(asyncApiService.getAsyncAPI().getChannels().keySet())
.containsExactlyInAnyOrder(
"#",
"CRUD-topic-exchange-1",
"CRUD-topic-exchange-2",
"another-queue",
"example-bindings-queue",
"example-queue",
"example-topic-exchange",
"example-topic-routing-key",
"example-topic-exchange_example-topic-routing-key",
"multi-payload-queue",
"queue-create",
"queue-delete",
Expand Down
Loading
Loading