Skip to content

Commit

Permalink
Use FunctionCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
LeovR committed Nov 15, 2024
1 parent c5eab5c commit d8264d6
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 408 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
import io.github.springwolf.asyncapi.v3.model.server.Server;
import io.github.springwolf.core.asyncapi.components.ComponentsService;
import io.github.springwolf.core.asyncapi.scanners.ChannelsScanner;
import io.github.springwolf.core.asyncapi.scanners.beans.BeanMethodsScanner;
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.ChannelBindingProcessor;
import io.github.springwolf.core.asyncapi.scanners.bindings.messages.MessageBindingProcessor;
import io.github.springwolf.core.asyncapi.scanners.channels.ChannelMerger;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AsyncAnnotationUtil;
import io.github.springwolf.core.asyncapi.scanners.common.headers.AsyncHeadersNotDocumented;
import io.github.springwolf.core.asyncapi.scanners.common.payload.PayloadSchemaObject;
Expand All @@ -30,11 +28,12 @@
import io.github.springwolf.plugins.cloudstream.asyncapi.scanners.common.FunctionalChannelBeanData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.stream.config.BindingServiceProperties;

import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -44,22 +43,18 @@
public class CloudStreamFunctionChannelsScanner implements ChannelsScanner {

private final AsyncApiDocketService asyncApiDocketService;
private final BeanMethodsScanner beanMethodsScanner;
private final ComponentClassScanner componentClassScanner;
private final ComponentsService componentsService;
private final PayloadService payloadService;
private final BindingServiceProperties cloudStreamBindingsProperties;
private final FunctionalChannelBeanBuilder functionalChannelBeanBuilder;
private final FunctionCatalog functionCatalog;
protected final List<ChannelBindingProcessor> channelBindingProcessors;
protected final List<MessageBindingProcessor> messageBindingProcessors;

@Override
public Map<String, ChannelObject> scan() {
Set<AnnotatedElement> elements = new HashSet<>();
elements.addAll(componentClassScanner.scan());
elements.addAll(beanMethodsScanner.getBeanMethods());

List<ChannelObject> channels = elements.stream()
List<ChannelObject> channels = functionCatalog.getNames(null).stream()
.<SimpleFunctionRegistry.FunctionInvocationWrapper>map(functionCatalog::lookup)
.map(functionalChannelBeanBuilder::build)
.flatMap(Set::stream)
.filter(this::isChannelBean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,122 +3,66 @@

import io.github.springwolf.core.asyncapi.scanners.common.payload.internal.TypeExtractor;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;

import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

@RequiredArgsConstructor
public class FunctionalChannelBeanBuilder {
private final TypeExtractor typeExtractor;

public Set<FunctionalChannelBeanData> build(AnnotatedElement element) {
Class<?> type = getRawType(element);

if (Consumer.class.isAssignableFrom(type)) {
Type payloadType = getTypeGenerics(element).get(0);
return Set.of(ofConsumer(element, payloadType));
}

if (Supplier.class.isAssignableFrom(type)) {
Type payloadType = getTypeGenerics(element).get(0);
return Set.of(ofSupplier(element, payloadType));
}

if (Function.class.isAssignableFrom(type)) {
Type inputType = getTypeGenerics(element).get(0);
Type outputType = getTypeGenerics(element).get(1);

return Set.of(ofConsumer(element, inputType), ofSupplier(element, outputType));
public Set<FunctionalChannelBeanData> build(SimpleFunctionRegistry.FunctionInvocationWrapper wrapper) {
if (wrapper.isConsumer() || wrapper.isWrappedBiConsumer()) {
Type payloadType = getTypeGenerics(wrapper.getInputType()).get(0);
return Set.of(ofConsumer(wrapper, payloadType));
} else if (wrapper.isSupplier()) {
Type payloadType = getTypeGenerics(wrapper.getOutputType()).get(0);
return Set.of(ofSupplier(wrapper, payloadType));
} else if (wrapper.isFunction()) {
Type inputType = getTypeGenerics(wrapper.getInputType()).get(0);
Type outputType = getTypeGenerics(wrapper.getOutputType()).get(0);
return Set.of(ofConsumer(wrapper, inputType), ofSupplier(wrapper, outputType));
}

return Collections.emptySet();
}

private static Class<?> getRawType(AnnotatedElement element) {
if (element instanceof Method m) {
return m.getReturnType();
}

if (element instanceof Class<?> c) {
return c;
}

throw new IllegalArgumentException("Must be a Method or Class");
}

private static FunctionalChannelBeanData ofConsumer(AnnotatedElement element, Type payloadType) {
String name = getElementName(element);
private static FunctionalChannelBeanData ofConsumer(
SimpleFunctionRegistry.FunctionInvocationWrapper wrapper, Type payloadType) {
String name = wrapper.getFunctionDefinition();
String cloudStreamBinding = firstCharToLowerCase(name) + "-in-0";
return new FunctionalChannelBeanData(
name, element, payloadType, FunctionalChannelBeanData.BeanType.CONSUMER, cloudStreamBinding);
name,
wrapper.getTarget().getClass(),
payloadType,
FunctionalChannelBeanData.BeanType.CONSUMER,
cloudStreamBinding);
}

private static FunctionalChannelBeanData ofSupplier(AnnotatedElement element, Type payloadType) {
String name = getElementName(element);
private static FunctionalChannelBeanData ofSupplier(
SimpleFunctionRegistry.FunctionInvocationWrapper wrapper, Type payloadType) {
String name = wrapper.getFunctionDefinition();
String cloudStreamBinding = firstCharToLowerCase(name) + "-out-0";
return new FunctionalChannelBeanData(
name, element, payloadType, FunctionalChannelBeanData.BeanType.SUPPLIER, cloudStreamBinding);
name,
wrapper.getTarget().getClass(),
payloadType,
FunctionalChannelBeanData.BeanType.SUPPLIER,
cloudStreamBinding);
}

private static String firstCharToLowerCase(String name) {
return name.substring(0, 1).toLowerCase() + name.substring(1);
}

private static String getElementName(AnnotatedElement element) {
if (element instanceof Method m) {
return m.getName();
}

if (element instanceof Class<?> c) {
return c.getSimpleName();
}

throw new IllegalArgumentException("Must be a Method or Class");
}

private List<Type> getTypeGenerics(AnnotatedElement element) {
if (element instanceof Method m) {
ParameterizedType genericReturnType = (ParameterizedType) m.getGenericReturnType();
return getTypeGenerics(genericReturnType);
}

if (element instanceof Class<?> c) {
return getTypeGenerics(c);
private List<Type> getTypeGenerics(Type type) {
if (type instanceof ParameterizedType) {
return List.of(typeExtractor.extractActualType(type));
}

throw new IllegalArgumentException("Must be a Method or Class");
}

private List<Type> getTypeGenerics(Class<?> c) {
Predicate<Class<?>> isConsumerPredicate = Consumer.class::isAssignableFrom;
Predicate<Class<?>> isSupplierPredicate = Supplier.class::isAssignableFrom;
Predicate<Class<?>> isFunctionPredicate = Function.class::isAssignableFrom;
Predicate<Class<?>> hasFunctionalInterfacePredicate =
isConsumerPredicate.or(isSupplierPredicate).or(isFunctionPredicate);

return Arrays.stream(c.getGenericInterfaces())
.filter(type -> type instanceof ParameterizedType)
.map(type -> (ParameterizedType) type)
.filter(type -> type.getRawType() instanceof Class<?>)
.filter(type -> hasFunctionalInterfacePredicate.test((Class<?>) type.getRawType()))
.map(this::getTypeGenerics)
.findFirst()
.orElse(Collections.emptyList());
}

private List<Type> getTypeGenerics(ParameterizedType parameterizedType) {
return Arrays.stream(parameterizedType.getActualTypeArguments())
.map(typeExtractor::extractActualType)
.toList();
return List.of(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import io.github.springwolf.asyncapi.v3.model.server.Server;
import io.github.springwolf.core.asyncapi.components.ComponentsService;
import io.github.springwolf.core.asyncapi.scanners.OperationsScanner;
import io.github.springwolf.core.asyncapi.scanners.beans.BeanMethodsScanner;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.headers.AsyncHeadersNotDocumented;
import io.github.springwolf.core.asyncapi.scanners.common.payload.PayloadSchemaObject;
import io.github.springwolf.core.asyncapi.scanners.common.payload.internal.PayloadService;
Expand All @@ -29,11 +27,11 @@
import io.github.springwolf.plugins.cloudstream.asyncapi.scanners.common.FunctionalChannelBeanData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.stream.config.BindingServiceProperties;

import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -43,20 +41,16 @@
public class CloudStreamFunctionOperationsScanner implements OperationsScanner {

private final AsyncApiDocketService asyncApiDocketService;
private final BeanMethodsScanner beanMethodsScanner;
private final ComponentClassScanner componentClassScanner;
private final ComponentsService componentsService;
private final PayloadService payloadService;
private final FunctionCatalog functionCatalog;
private final BindingServiceProperties cloudStreamBindingsProperties;
private final FunctionalChannelBeanBuilder functionalChannelBeanBuilder;

@Override
public Map<String, Operation> scan() {
Set<AnnotatedElement> elements = new HashSet<>();
elements.addAll(componentClassScanner.scan());
elements.addAll(beanMethodsScanner.getBeanMethods());

List<Map.Entry<String, Operation>> operations = elements.stream()
List<Map.Entry<String, Operation>> operations = functionCatalog.getNames(null).stream()
.<SimpleFunctionRegistry.FunctionInvocationWrapper>map(functionCatalog::lookup)
.map(functionalChannelBeanBuilder::build)
.flatMap(Set::stream)
.filter(this::isChannelBean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
package io.github.springwolf.plugins.cloudstream.configuration;

import io.github.springwolf.core.asyncapi.components.ComponentsService;
import io.github.springwolf.core.asyncapi.scanners.beans.BeanMethodsScanner;
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.ChannelBindingProcessor;
import io.github.springwolf.core.asyncapi.scanners.bindings.messages.MessageBindingProcessor;
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.payload.internal.PayloadService;
import io.github.springwolf.core.asyncapi.scanners.common.payload.internal.TypeExtractor;
import io.github.springwolf.core.configuration.docket.AsyncApiDocketService;
Expand All @@ -15,6 +13,7 @@
import io.github.springwolf.plugins.cloudstream.asyncapi.scanners.operations.CloudStreamFunctionOperationsScanner;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;

Expand All @@ -30,41 +29,37 @@ public class SpringwolfCloudStreamAutoConfiguration {
@Bean
public CloudStreamFunctionChannelsScanner cloudStreamFunctionChannelsScanner(
AsyncApiDocketService asyncApiDocketService,
BeanMethodsScanner beanMethodsScanner,
ComponentClassScanner componentClassScanner,
ComponentsService componentsService,
PayloadService payloadService,
BindingServiceProperties cloudstreamBindingServiceProperties,
FunctionalChannelBeanBuilder functionalChannelBeanBuilder,
FunctionCatalog functionCatalog,
List<ChannelBindingProcessor> channelBindingProcessors,
List<MessageBindingProcessor> messageBindingProcessors) {
return new CloudStreamFunctionChannelsScanner(
asyncApiDocketService,
beanMethodsScanner,
componentClassScanner,
componentsService,
payloadService,
cloudstreamBindingServiceProperties,
functionalChannelBeanBuilder,
functionCatalog,
channelBindingProcessors,
messageBindingProcessors);
}

@Bean
public CloudStreamFunctionOperationsScanner cloudStreamFunctionOperationsScanner(
AsyncApiDocketService asyncApiDocketService,
BeanMethodsScanner beanMethodsScanner,
ComponentClassScanner componentClassScanner,
ComponentsService componentsService,
PayloadService payloadService,
FunctionCatalog functionCatalog,
BindingServiceProperties cloudstreamBindingServiceProperties,
FunctionalChannelBeanBuilder functionalChannelBeanBuilder) {
return new CloudStreamFunctionOperationsScanner(
asyncApiDocketService,
beanMethodsScanner,
componentClassScanner,
componentsService,
payloadService,
functionCatalog,
cloudstreamBindingServiceProperties,
functionalChannelBeanBuilder);
}
Expand Down
Loading

0 comments on commit d8264d6

Please sign in to comment.