From fef31be70141947c236ffe1d0b85b8e3a56c89e5 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Fri, 15 Jul 2022 00:27:40 -0500 Subject: [PATCH] Add AOT support for binder child contexts Fixes #2455 --- .../binder/BinderChildContextInitializer.java | 167 +++++++++++++ .../stream/binder/DefaultBinderFactory.java | 220 ++++++------------ .../cloud/stream/binding/BindingService.java | 8 +- .../config/BindingServiceConfiguration.java | 13 +- .../function/FunctionConfiguration.java | 6 +- .../stream/binding/BindingServiceTests.java | 8 +- 6 files changed, 267 insertions(+), 155 deletions(-) create mode 100644 core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/BinderChildContextInitializer.java diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/BinderChildContextInitializer.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/BinderChildContextInitializer.java new file mode 100644 index 0000000000..cf665bf6c4 --- /dev/null +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/BinderChildContextInitializer.java @@ -0,0 +1,167 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.lang.model.element.Modifier; + +import org.apache.commons.logging.LogFactory; + +import org.springframework.aot.generate.GeneratedMethod; +import org.springframework.aot.generate.GenerationContext; +import org.springframework.aot.generate.MethodReference; +import org.springframework.beans.factory.aot.BeanRegistrationAotContribution; +import org.springframework.beans.factory.aot.BeanRegistrationAotProcessor; +import org.springframework.beans.factory.aot.BeanRegistrationCode; +import org.springframework.beans.factory.aot.BeanRegistrationExcludeFilter; +import org.springframework.beans.factory.support.RegisteredBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.aot.ApplicationContextAotGenerator; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.log.LogAccessor; +import org.springframework.javapoet.ClassName; +import org.springframework.util.Assert; + +/** + * @author Chris Bono + * @since 4.0 + */ +public class BinderChildContextInitializer implements ApplicationContextAware, BeanRegistrationAotProcessor, BeanRegistrationExcludeFilter { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + private DefaultBinderFactory binderFactory; + private final Map> childContextInitializers; + private volatile ConfigurableApplicationContext context; + + public BinderChildContextInitializer() { + this.childContextInitializers = new HashMap<>(); + } + + public BinderChildContextInitializer( + Map> childContextInitializers) { + this.childContextInitializers = childContextInitializers; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + Assert.isInstanceOf(ConfigurableApplicationContext.class, applicationContext); + this.context = (ConfigurableApplicationContext) applicationContext; + } + + public void setBinderFactory(DefaultBinderFactory binderFactory) { + Assert.notNull(binderFactory, () -> "binderFactory must be non-null"); + this.binderFactory = binderFactory; + if (!this.childContextInitializers.isEmpty()) { + this.logger.debug(() -> "Setting binder child context initializers on binder factory"); + this.binderFactory.setBinderChildContextInitializers(this.childContextInitializers); + } + } + + @Override + public boolean isExcluded(RegisteredBean registeredBean) { + return false; + } + + @Override + public BeanRegistrationAotContribution processAheadOfTime(RegisteredBean registeredBean) { + if (registeredBean.getBeanClass().equals(getClass())) { //&& registeredBean.getBeanFactory().equals(this.context)) { + this.logger.debug(() -> "Beginning AOT processing for binder child contexts"); + ensureBinderFactoryIsSet(); + Map binderConfigurations = this.binderFactory.getBinderConfigurations(); + Map binderChildContexts = binderConfigurations.entrySet().stream() + .map(e -> Map.entry(e.getKey(), binderFactory.createBinderContextForAOT(e.getKey()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new BinderChildContextAotContribution(binderChildContexts); + } + return null; + } + + private void ensureBinderFactoryIsSet() { + if (this.binderFactory == null) { + Assert.notNull(this.context, () -> "Unable to lookup binder factory from context as this.context is null"); + this.binderFactory = this.context.getBean(DefaultBinderFactory.class); + } + } + + /** + * Callback for AOT generated {@link BinderChildContextAotContribution#applyTo(GenerationContext, BeanRegistrationCode) + * post-process method} which basically swaps the instance with one that uses the AOT generated child context + * initializers. + * @param childContextInitializers the child context initializers to use + * @return copy of this instance that uses the AOT generated child context initializers + */ + @SuppressWarnings({"unused", "raw"}) + public BinderChildContextInitializer withChildContextInitializers( + Map> childContextInitializers) { + this.logger.debug(() -> "Replacing instance w/ one that uses; child context initializers"); + Map> downcastedInitializers = + childContextInitializers.entrySet().stream() + .map(e -> Map.entry(e.getKey(), (ApplicationContextInitializer) e.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new BinderChildContextInitializer(downcastedInitializers); + } + + /** + * An AOT contribution that generates a application context initializers that can be used at runtime to populate + * the child binder contexts. + */ + private static class BinderChildContextAotContribution implements BeanRegistrationAotContribution { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + private final Map childContexts; + + BinderChildContextAotContribution(Map childContexts) { + this.childContexts = childContexts.entrySet().stream() + .filter(e -> GenericApplicationContext.class.isInstance(e.getValue())) + .map(e -> Map.entry(e.getKey(), GenericApplicationContext.class.cast(e.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public void applyTo(GenerationContext generationContext, BeanRegistrationCode beanRegistrationCode) { + ApplicationContextAotGenerator aotGenerator = new ApplicationContextAotGenerator(); + GeneratedMethod postProcessorMethod = beanRegistrationCode.getMethodGenerator() + .generateMethod("addChildContextInitializers").using(builder -> { + builder.addJavadoc("Use AOT child context initialization"); + builder.addModifiers(Modifier.PRIVATE, Modifier.STATIC); + builder.addParameter(RegisteredBean.class, "registeredBean"); + builder.addParameter(BinderChildContextInitializer.class, "instance"); + builder.returns(BinderChildContextInitializer.class); + builder.addStatement("$T> initializers = new $T<>()", Map.class, + ApplicationContextInitializer.class, ConfigurableApplicationContext.class, HashMap.class); + this.childContexts.forEach((name, context) -> { + this.logger.debug(() -> "Generating AOT child context initializer for " + name); + GenerationContext childGenerationContext = generationContext.withName(name + "Binder"); + ClassName initializerClassName = aotGenerator.generateApplicationContext(context, childGenerationContext); + builder.addStatement("$T initializer = new $L()", ApplicationContextInitializer.class, + ConfigurableApplicationContext.class, initializerClassName); + builder.addStatement("initializers.put($S, initializer)", name); + }); + builder.addStatement("return instance.withChildContextInitializers(initializers)"); + }); + beanRegistrationCode.addInstancePostProcessor( + MethodReference.ofStatic(beanRegistrationCode.getClassName(), postProcessorMethod.getName())); + } + } + +} diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java index bf60f3efba..0c7a203367 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java @@ -33,14 +33,10 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.Banner.Mode; -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.cloud.stream.config.SpelExpressionConverterConfiguration; -import org.springframework.cloud.stream.config.SpelExpressionConverterConfiguration.SpelConverter; import org.springframework.cloud.stream.reflection.GenericsUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -50,10 +46,8 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.GenericApplicationContext; -import org.springframework.core.convert.support.GenericConversionService; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.MapPropertySource; -import org.springframework.core.env.StandardEnvironment; import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.converter.MessageConverter; @@ -72,6 +66,7 @@ * @author Soby Chacko * @author Artem Bilan * @author Anshul Mehra + * @author Chris Bono */ public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware { @@ -81,9 +76,10 @@ public class DefaultBinderFactory implements BinderFactory, DisposableBean, Appl private final Map, ConfigurableApplicationContext>> binderInstanceCache = new HashMap<>(); - private final Map defaultBinderForBindingTargetType = new HashMap<>(); + private final Map> binderChildContextInitializers = new HashMap<>(); + private final BinderTypeRegistry binderTypeRegistry; private final BinderCustomizer binderCustomizer; @@ -117,8 +113,7 @@ public void setListeners(Collection listeners) { @Override public void destroy() { - this.binderInstanceCache.values().stream().map(Entry::getValue) - .forEach(ConfigurableApplicationContext::close); + this.binderInstanceCache.values().stream().map(Entry::getValue).forEach(ConfigurableApplicationContext::close); this.defaultBinderForBindingTargetType.clear(); } @@ -259,21 +254,28 @@ private boolean verifyBinderTypeMatchesTarget(Binder binderInstance @SuppressWarnings("unchecked") private Binder getBinderInstance(String configurationName) { if (!this.binderInstanceCache.containsKey(configurationName)) { - logger.info("Creating binder: " + configurationName); - BinderConfiguration binderConfiguration = this.binderConfigurations - .get(configurationName); - Assert.state(binderConfiguration != null, - "Unknown binder configuration: " + configurationName); - BinderType binderType = this.binderTypeRegistry - .get(binderConfiguration.getBinderType()); - Assert.notNull(binderType, "Binder type " - + binderConfiguration.getBinderType() + " is not defined"); - + this.logger.info("Creating binder: " + configurationName); + BinderConfiguration binderConfiguration = this.binderConfigurations.get(configurationName); + Assert.state(binderConfiguration != null, "Unknown binder configuration: " + configurationName); + BinderType binderType = this.binderTypeRegistry.get(binderConfiguration.getBinderType()); + Assert.notNull(binderType, "Binder type " + binderConfiguration.getBinderType() + " is not defined"); Map binderProperties = new HashMap<>(); this.flatten(null, binderConfiguration.getProperties(), binderProperties); - ConfigurableApplicationContext binderProducingContext = - this.initializeBinderContextSimple(configurationName, binderProperties, binderType, binderConfiguration); + ConfigurableApplicationContext binderProducingContext; + if (this.binderChildContextInitializers.containsKey(configurationName)) { + this.logger.info("Using AOT pre-prepared initializer to construct binder child context for " + configurationName); + binderProducingContext = this.initializeBinderContextSimple(configurationName, binderProperties, + binderType, binderConfiguration, false); + GenericApplicationContext c = null; + this.binderChildContextInitializers.get(configurationName).initialize(binderProducingContext); + binderProducingContext.refresh(); + } + else { + this.logger.info("Constructing binder child context for " + configurationName); + binderProducingContext = this.initializeBinderContextSimple(configurationName, binderProperties, + binderType, binderConfiguration, true); + } Map messageConverters = binderProducingContext.getBeansOfType(MessageConverter.class); if (!CollectionUtils.isEmpty(messageConverters) && !ObjectUtils.isEmpty(context.getBeansOfType(FunctionCatalog.class))) { @@ -302,99 +304,61 @@ private Binder getBinderInstance( this.binderInstanceCache.put(configurationName, new SimpleImmutableEntry<>(binder, binderProducingContext)); } - logger.info("Retrieving cached binder: " + configurationName); + logger.trace("Retrieving cached binder: " + configurationName); return (Binder) this.binderInstanceCache .get(configurationName).getKey(); } - @SuppressWarnings("unused") - private ConfigurableApplicationContext initializeBinderContextBoot(String configurationName, Map binderProperties, - BinderType binderType, BinderConfiguration binderConfiguration) { - // Convert all properties to arguments, so that they receive maximum -// // precedence - ArrayList args = new ArrayList<>(); - for (Map.Entry property : binderProperties.entrySet()) { - args.add( - String.format("--%s=%s", property.getKey(), property.getValue())); - } - // Initialize the domain with a unique name based on the bootstrapping context - // setting - ConfigurableEnvironment environment = this.context != null - ? this.context.getEnvironment() : null; - String defaultDomain = environment != null - ? environment.getProperty("spring.jmx.default-domain") : ""; - args.add("--spring.jmx.default-domain=" + defaultDomain + "binder." - + configurationName); - - SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder( - binderType.getConfigurationClasses()) - .bannerMode(Mode.OFF).logStartupInfo(false) - .web(WebApplicationType.NONE); - // If the environment is not customized and a main context is available, we - // will set the latter as parent. - // This ensures that the defaults and user-defined customizations (e.g. custom - // connection factory beans) - // are propagated to the binder context. If the environment is customized, - // then the binder context should - // not inherit any beans from the parent - boolean useApplicationContextAsParent = binderProperties.isEmpty() - && this.context != null; - - if (useApplicationContextAsParent) { - springApplicationBuilder.parent(this.context); - } - else { - this.customizeParentChildContextRelationship(springApplicationBuilder, this.context); - springApplicationBuilder.listeners(new ApplicationListener() { - @Override - public void onApplicationEvent(ApplicationEvent event) { - if (context != null) { - try { - context.publishEvent(event); - } - catch (Exception e) { - logger.warn("Failed to publish " + event, e); - } - } - } - }); - } - // If the current application context is not set as parent and the environment - // is set, - // provide the current context as an additional bean in the BeanFactory. - if (environment != null && !useApplicationContextAsParent) { - springApplicationBuilder - .initializers(new InitializerWithOuterContext(this.context)); - } - if (environment != null && (useApplicationContextAsParent - || binderConfiguration.isInheritEnvironment())) { - StandardEnvironment binderEnvironment = new StandardEnvironment(); - binderEnvironment.merge(environment); - // See ConfigurationPropertySources.ATTACHED_PROPERTY_SOURCE_NAME - binderEnvironment.getPropertySources().remove("configurationProperties"); - /* - * Ensure that the web mode is set to NONE despite what the - * parent application context says. - * https://github.com/spring-cloud/spring-cloud-stream/issues/1708 - */ - binderEnvironment.getPropertySources() - .addFirst(new MapPropertySource("defaultBinderFactoryProperties", - Collections.singletonMap("spring.main.web-application-type", "NONE"))); + /** + * @return map of binder name to binder configuration + */ + Map getBinderConfigurations() { + return this.binderConfigurations; + } - springApplicationBuilder.environment(binderEnvironment); - } + /** + * Creates a binder child application context that can be used by AOT for pre-generation. + * @param configurationName binder configuration name + * @return binder child application context that has not been refreshed + */ + @SuppressWarnings("rawtypes") + ConfigurableApplicationContext createBinderContextForAOT(String configurationName) { + logger.info("Pre-creating binder child context (AOT) for " + configurationName); + BinderConfiguration binderConfiguration = this.binderConfigurations.get(configurationName); + Assert.state(binderConfiguration != null, "Unknown binder configuration: " + configurationName); + BinderType binderType = this.binderTypeRegistry.get(binderConfiguration.getBinderType()); + Assert.notNull(binderType, "Binder type " + binderConfiguration.getBinderType() + " is not defined"); + Map binderProperties = new HashMap<>(); + this.flatten(null, binderConfiguration.getProperties(), binderProperties); + return initializeBinderContextSimple(configurationName, binderProperties, binderType, binderConfiguration, false); + } - ConfigurableApplicationContext binderProducingContext = springApplicationBuilder - .run(args.toArray(new String[0])); - return binderProducingContext; + /** + * Sets the initializers to use when populating a binder child application context. + *

+ * This is useful for the AOT scenario where the child binder contexts have been pre-generated into the form of an + * application context initializer. + * + * @param binderChildContextInitializers map of binder configuration name to initializer for the binder child context + */ + void setBinderChildContextInitializers(Map> binderChildContextInitializers) { + this.binderChildContextInitializers.clear(); + this.binderChildContextInitializers.putAll(binderChildContextInitializers); } + /** + * Creates and optionally refreshes a binder child application context. + * @param configurationName binder configuration name + * @param binderProperties binder properties + * @param binderType binder type + * @param binderConfiguration binder configuration + * @param refresh whether to refresh the context + * @return refreshed binder child application context + */ @SuppressWarnings("rawtypes") - private ConfigurableApplicationContext initializeBinderContextSimple(String configurationName, Map binderProperties, - BinderType binderType, BinderConfiguration binderConfiguration) { - //======= NEW CODE - + ConfigurableApplicationContext initializeBinderContextSimple(String configurationName, Map binderProperties, + BinderType binderType, BinderConfiguration binderConfiguration, boolean refresh) { AnnotationConfigApplicationContext binderProducingContext = new AnnotationConfigApplicationContext(); List sourceClasses = new ArrayList<>(); @@ -475,45 +439,12 @@ public void onApplicationEvent(ApplicationEvent event) { Collections.singletonMap("spring.main.web-application-type", "NONE"))); } } - binderProducingContext.refresh(); - return binderProducingContext; - } - /* - * This will propagate/copy ListenerContainerCustomizer(s) from parent context to child context for cases when multiple binders are used. - * It will also register SpelConverter with child context - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - private void customizeParentChildContextRelationship(SpringApplicationBuilder applicationBuilder, ApplicationContext context) { - if (context != null) { - Map customizers = context.getBeansOfType(ListenerContainerCustomizer.class); - applicationBuilder.initializers(childContext -> { - if (!CollectionUtils.isEmpty(customizers)) { - for (Entry customizerEntry : customizers.entrySet()) { - ListenerContainerCustomizer customizerWrapper = new ListenerContainerCustomizer() { - @Override - public void configure(Object container, String destinationName, String group) { - try { - customizerEntry.getValue().configure(container, destinationName, group); - } - catch (Exception e) { - logger.warn("Failed while applying ListenerContainerCustomizer. In situations when multiple " - + "binders are used this is expected, since a particular customizer may not be applicable" - + "to a particular binder. Customizer: " + customizerEntry.getValue() - + " Binder: " + childContext.getBean(AbstractMessageChannelBinder.class), e); - } - } - }; - - ((GenericApplicationContext) childContext).registerBean(customizerEntry.getKey(), - ListenerContainerCustomizer.class, () -> customizerWrapper); - } - } - GenericConversionService cs = (GenericConversionService) ((GenericApplicationContext) childContext).getBeanFactory().getConversionService(); - SpelConverter spelConverter = new SpelConverter(); - cs.addConverter(spelConverter); - }); + if (refresh) { + binderProducingContext.refresh(); } + + return binderProducingContext; } /** @@ -524,12 +455,10 @@ public void configure(Object container, String destinationName, String group) { * @param flattenedProperties map to which we'll add the falttened property */ @SuppressWarnings("unchecked") - private void flatten(String propertyName, Object value, - Map flattenedProperties) { + private void flatten(String propertyName, Object value, Map flattenedProperties) { if (value instanceof Map) { ((Map) value).forEach((k, v) -> flatten( - (propertyName != null ? propertyName + "." : "") + k, v, - flattenedProperties)); + (propertyName != null ? propertyName + "." : "") + k, v, flattenedProperties)); } else { flattenedProperties.put(propertyName, value.toString()); @@ -571,5 +500,4 @@ public void initialize(ConfigurableApplicationContext applicationContext) { } } - } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java index 56c9f262fd..4d43eb06ed 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java @@ -16,10 +16,10 @@ package org.springframework.cloud.stream.binding; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -61,6 +61,7 @@ * @author Janne Valkealahti * @author Soby Chacko * @author Michael Michailidis + * @author Chris Bono */ public class BindingService { @@ -404,9 +405,8 @@ private void validate(Object properties) { } private void scheduleTask(Runnable task) { - Date d = new Date(System.currentTimeMillis() - + this.bindingServiceProperties.getBindingRetryInterval() * 1_000); - this.taskScheduler.schedule(task, d.toInstant()); + this.taskScheduler.schedule(task, Instant.ofEpochMilli(System.currentTimeMillis() + + this.bindingServiceProperties.getBindingRetryInterval() * 1_000)); } private void assertNotIllegalException(RuntimeException exception) diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java index a4d3dcc35b..e7cd48a774 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java @@ -35,6 +35,7 @@ import org.springframework.boot.autoconfigure.condition.SearchStrategy; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.source.ConfigurationPropertyName; +import org.springframework.cloud.stream.binder.BinderChildContextInitializer; import org.springframework.cloud.stream.binder.BinderConfiguration; import org.springframework.cloud.stream.binder.BinderCustomizer; import org.springframework.cloud.stream.binder.BinderFactory; @@ -77,6 +78,7 @@ * @author Artem Bilan * @author Oleg Zhurakousky * @author Soby Chacko + * @author Chris Bono */ @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({ BindingServiceProperties.class, @@ -174,17 +176,24 @@ public BindingHandlerAdvise BindingHandlerAdvise( @Bean @ConditionalOnMissingBean(BinderFactory.class) - public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry, + public DefaultBinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry, BindingServiceProperties bindingServiceProperties, - ObjectProvider binderCustomizerProvider) { + ObjectProvider binderCustomizerProvider, + BinderChildContextInitializer binderChildContextInitializer) { DefaultBinderFactory binderFactory = new DefaultBinderFactory( getBinderConfigurations(binderTypeRegistry, bindingServiceProperties), binderTypeRegistry, binderCustomizerProvider.getIfUnique()); binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder()); binderFactory.setListeners(this.binderFactoryListeners); + binderChildContextInitializer.setBinderFactory(binderFactory); return binderFactory; } + @Bean + public BinderChildContextInitializer binderChildContextInitializer() { + return new BinderChildContextInitializer(); + } + @Bean // This conditional is intentionally not in an autoconfig (usually a bad idea) because // it is used to detect a BindingService in the parent context (which we know diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index ffd447506a..62e8a15e57 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -123,6 +123,7 @@ * @author David Turanski * @author Ilayaperumal Gopinathan * @author Soby Chacko + * @author Chris Bono * @since 2.1 */ @Configuration(proxyBeanMethods = false) @@ -230,7 +231,10 @@ public void afterPropertiesSet() throws Exception { if (!functionProperties.isComposeFrom() && !functionProperties.isComposeTo()) { String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow"; - PollableBean pollable = extractPollableAnnotation(functionProperties, context, proxyFactory); + + // Add back once https://github.com/spring-projects/spring-framework/issues/28748 is fixed + // PollableBean pollable = extractPollableAnnotation(functionProperties, context, proxyFactory); + PollableBean pollable = null; if (functionWrapper != null) { IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties), diff --git a/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/BindingServiceTests.java b/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/BindingServiceTests.java index 115d5eb593..b51d9e8a01 100644 --- a/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/BindingServiceTests.java +++ b/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/BindingServiceTests.java @@ -41,6 +41,7 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; import org.springframework.cloud.stream.binder.Binder; +import org.springframework.cloud.stream.binder.BinderChildContextInitializer; import org.springframework.cloud.stream.binder.BinderConfiguration; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.BinderType; @@ -86,6 +87,7 @@ * @author Janne Valkealahti * @author Soby Chacko * @author Michael Michailidis + * @author Chris Bono */ public class BindingServiceTests { @@ -437,7 +439,8 @@ void testUnrecognizedBinderAllowedIfNotUsed() { BindingServiceProperties bindingServiceProperties = createBindingServiceProperties( properties); BinderFactory binderFactory = new BindingServiceConfiguration() - .binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties, Mockito.mock(ObjectProvider.class)); + .binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties, Mockito.mock(ObjectProvider.class), + Mockito.mock(BinderChildContextInitializer.class)); BindingService bindingService = new BindingService(bindingServiceProperties, binderFactory, new ObjectMapper()); bindingService.bindConsumer(new DirectChannel(), "input"); @@ -457,7 +460,8 @@ void testUnrecognizedBinderDisallowedIfUsed() { BindingServiceProperties bindingServiceProperties = createBindingServiceProperties( properties); BinderFactory binderFactory = new BindingServiceConfiguration() - .binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties, Mockito.mock(ObjectProvider.class)); + .binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties, Mockito.mock(ObjectProvider.class), + Mockito.mock(BinderChildContextInitializer.class)); BindingService bindingService = new BindingService(bindingServiceProperties, binderFactory, new ObjectMapper()); bindingService.bindConsumer(new DirectChannel(), "input");