Skip to content

Commit

Permalink
Add BiConsumer to spring cloud stream plugin (springwolf#1077)
Browse files Browse the repository at this point in the history
* Add BiConsumer to spring cloud stream plugin
  • Loading branch information
LeovR authored and ruskaof committed Nov 20, 2024
1 parent fc9488c commit f7e3bae
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.beans;

import io.github.springwolf.core.asyncapi.scanners.classes.spring.ConfigurationClassScanner;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
Expand All @@ -16,11 +16,11 @@
@RequiredArgsConstructor
public class DefaultBeanMethodsScanner implements BeanMethodsScanner {

private final ConfigurationClassScanner configurationClassScanner;
private final ComponentClassScanner componentClassScanner;

@Override
public Set<Method> getBeanMethods() {
return configurationClassScanner.scan().stream()
return componentClassScanner.scan().stream()
.map(Class::getDeclaredMethods)
.map(Arrays::asList)
.flatMap(List::stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.github.springwolf.core.asyncapi.scanners.channels.annotations.AsyncAnnotationMethodLevelChannelsScanner;
import io.github.springwolf.core.asyncapi.scanners.classes.SpringwolfClassScanner;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ConfigurationClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.AsyncAnnotationProvider;
import io.github.springwolf.core.asyncapi.scanners.common.channel.AsyncAnnotationChannelService;
import io.github.springwolf.core.asyncapi.scanners.common.message.AsyncAnnotationMessageService;
Expand Down Expand Up @@ -55,14 +54,8 @@ public ComponentClassScanner componentClassScanner(

@Bean
@ConditionalOnMissingBean
public ConfigurationClassScanner configurationClassScanner(ComponentClassScanner componentClassScanner) {
return new ConfigurationClassScanner(componentClassScanner);
}

@Bean
@ConditionalOnMissingBean
public BeanMethodsScanner beanMethodsScanner(ConfigurationClassScanner configurationClassScanner) {
return new DefaultBeanMethodsScanner(configurationClassScanner);
public BeanMethodsScanner beanMethodsScanner(ComponentClassScanner componentClassScanner) {
return new DefaultBeanMethodsScanner(componentClassScanner);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.beans;

import io.github.springwolf.core.asyncapi.scanners.classes.spring.ConfigurationClassScanner;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -26,11 +26,11 @@ class DefaultBeanMethodsScannerIntegrationTest {
private DefaultBeanMethodsScanner beanMethodsScanner;

@MockBean
private ConfigurationClassScanner configurationClassScanner;
private ComponentClassScanner componentClassScanner;

@Test
void name() {
when(configurationClassScanner.scan()).thenReturn(Set.of(ConfigurationClass.class));
when(componentClassScanner.scan()).thenReturn(Set.of(ConfigurationClass.class));

Set<String> beanMethods = beanMethodsScanner.getBeanMethods().stream()
.map(Method::getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -33,6 +35,12 @@ public Consumer<AnotherPayloadDto> consumerMethod() {
return input -> log.info("Received new message in another-topic: {}", input.toString());
}

@Bean
public BiConsumer<AnotherPayloadDto, Map<String, Object>> biConsumerMethod() {
return (input, headers) ->
log.info("Received new message in biconsumer-topic: {}. Headers {}.", input.toString(), headers);
}

@GooglePubSubAsyncChannelBinding(
schemaSettings = @GooglePubSubAsyncSchemaSetting(encoding = "BINARY", name = "project/test"))
@GooglePubSubAsyncMessageBinding(schema = @GooglePubSubAsyncMessageSchema(name = "project/test"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ spring.cloud.stream.default-binder=kafka
spring.cloud.stream.binders.kafka.type=kafka
spring.cloud.stream.binders.kafka.environment.spring.kafka.bootstrap-servers=${spring.kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod
spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod;biConsumerMethod

spring.cloud.stream.bindings.process-in-0.destination=example-topic
spring.cloud.stream.bindings.process-in-0.group=springwolf
Expand All @@ -19,6 +19,9 @@ spring.cloud.stream.bindings.process-out-0.destination=another-topic
spring.cloud.stream.bindings.consumerMethod-in-0.destination=consumer-topic
spring.cloud.stream.bindings.consumerMethod-in-0.group=springwolf

spring.cloud.stream.bindings.biConsumerMethod-in-0.destination=biconsumer-topic
spring.cloud.stream.bindings.biConsumerMethod-in-0.group=springwolf

spring.cloud.stream.bindings.consumerClass-in-0.destination=consumer-class-topic
spring.cloud.stream.bindings.consumerClass-in-0.group=springwolf

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@
"kafka": { }
}
},
"biconsumer-topic": {
"address": "biconsumer-topic",
"messages": {
"AnotherPayloadDto": {
"$ref": "#/components/messages/AnotherPayloadDto"
}
},
"bindings": {
"kafka": { }
}
},
"consumer-class-topic": {
"address": "consumer-class-topic",
"messages": {
Expand Down Expand Up @@ -278,6 +289,21 @@
}
]
},
"biconsumer-topic_publish_biConsumerMethod": {
"action": "receive",
"channel": {
"$ref": "#/channels/biconsumer-topic"
},
"description": "Auto-generated description",
"bindings": {
"kafka": { }
},
"messages": [
{
"$ref": "#/channels/biconsumer-topic/messages/AnotherPayloadDto"
}
]
},
"consumer-class-topic_publish_ConsumerClass": {
"action": "receive",
"channel": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -24,7 +25,7 @@ public class FunctionalChannelBeanBuilder {
public Set<FunctionalChannelBeanData> build(AnnotatedElement element) {
Class<?> type = getRawType(element);

if (Consumer.class.isAssignableFrom(type)) {
if (Consumer.class.isAssignableFrom(type) || BiConsumer.class.isAssignableFrom(type)) {
Type payloadType = getTypeGenerics(element).get(0);
return Set.of(ofConsumer(element, payloadType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -163,6 +164,51 @@ void testConsumerBinding() {
assertThat(componentsService.getMessages()).contains(Map.entry(String.class.getName(), message));
}

@Test
void testBiConsumerBinding() {
// Given a binding "spring.cloud.stream.bindings.testBiConsumer-in-0.destination=test-consumer-input-topic"
BindingProperties testBiConsumerInBinding = new BindingProperties();
String topicName = "test-biconsumer-input-topic";
testBiConsumerInBinding.setDestination(topicName);
when(bindingServiceProperties.getBindings()).thenReturn(Map.of("testBiConsumer-in-0", testBiConsumerInBinding));

// When scan is called
Map<String, ChannelObject> actualChannels = channelsScanner.scan();
Map<String, Operation> actualOperations = operationsScanner.scan();

// Then the returned channels contain a ChannelItem with the correct data
MessageObject message = MessageObject.builder()
.name(String.class.getName())
.title("string")
.payload(MessagePayload.of(MultiFormatSchema.builder()
.schema(SchemaObject.builder().type(SchemaType.STRING).build())
.build()))
.headers(MessageHeaders.of(
MessageReference.toSchema(AsyncHeadersNotDocumented.NOT_DOCUMENTED.getTitle())))
.bindings(Map.of("kafka", new EmptyMessageBinding()))
.build();

ChannelObject expectedChannel = ChannelObject.builder()
.channelId(topicName)
.address(topicName)
.bindings(channelBinding)
.messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
.build();

Operation expectedOperation = Operation.builder()
.action(OperationAction.RECEIVE)
.bindings(operationBinding)
.description("Auto-generated description")
.channel(ChannelReference.fromChannel(topicName))
.messages(List.of(MessageReference.toChannelMessage(topicName, message)))
.build();

assertThat(actualChannels).containsExactly(Map.entry(topicName, expectedChannel));
assertThat(actualOperations)
.containsExactly(Map.entry("test-biconsumer-input-topic_publish_testBiConsumer", expectedOperation));
assertThat(componentsService.getMessages()).contains(Map.entry(String.class.getName(), message));
}

@Test
void testSupplierBinding() {
// Given a binding "spring.cloud.stream.bindings.testSupplier-out-0.destination=test-supplier-output-topic"
Expand Down Expand Up @@ -488,5 +534,10 @@ public Function<String, Integer> testFunction() {
public Function<KStream<Void, String>, KStream<Void, Integer>> kStreamTestFunction() {
return stream -> stream.mapValues(s -> 1);
}

@Bean
public BiConsumer<String, Map<String, Object>> testBiConsumer() {
return (value, header) -> System.out.println(value);
}
}
}

0 comments on commit f7e3bae

Please sign in to comment.