diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java index 418ef194af..2588cbc167 100644 --- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java +++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java @@ -16,17 +16,24 @@ package org.springframework.cloud.dataflow.composedtaskrunner; +import javax.sql.DataSource; + import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties; import org.springframework.cloud.task.configuration.EnableTask; import org.springframework.cloud.task.listener.TaskExecutionListener; import org.springframework.cloud.task.repository.TaskExplorer; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; /** * Configures the Job that will execute the Composed Task Execution. @@ -69,4 +76,25 @@ public TaskExecutor taskExecutor(ComposedTaskProperties properties) { return taskExecutor; } + /** + * Provides the {@link JobRepository} that is configured to be used by the composed task runner. + */ + @Bean + public BeanPostProcessor jobRepositoryBeanPostProcessor(PlatformTransactionManager transactionManager, + DataSource incrementerDataSource, + ComposedTaskProperties composedTaskProperties) { + return new JobRepositoryBeanPostProcessor(transactionManager, incrementerDataSource, composedTaskProperties); + } + + /** + * Provides the {@link JobLauncher} that is configured to be used by the composed task runner. + * @param context A {@link ApplicationContext} is used to obtain the properties for the {@link JobLauncher}. + * This is because BeanPostProcessors do not process beans that are included in the bean params of the {@link BeanPostProcessor}. + * @return {@link BeanPostProcessor} + */ + @Bean + public BeanPostProcessor jobLauncherBeanPostProcessor(ApplicationContext context) { + return new JobLauncherBeanPostProcessor(context); + } + } diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobLauncherBeanPostProcessor.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobLauncherBeanPostProcessor.java new file mode 100644 index 0000000000..b48d9c7bbe --- /dev/null +++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobLauncherBeanPostProcessor.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.composedtaskrunner; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.configuration.BatchConfigurationException; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.ApplicationContext; +import org.springframework.core.Ordered; +import org.springframework.core.task.TaskExecutor; + + +/** + * CTR requires the use of the {@link java.util.concurrent.ThreadPoolExecutor}. + * As of Batch 5.x DefaultBatchConfiguration is now used to override default beans, however this disables + * BatchAutoConfiguration. To work around this CTR creates its own {@link JobLauncher} that uses this {@link TaskExecutor}. + * + * @author Glenn Renfro + */ +public class JobLauncherBeanPostProcessor implements BeanPostProcessor, Ordered { + private static final Logger logger = LoggerFactory.getLogger(JobLauncherBeanPostProcessor.class); + + private ApplicationContext context; + + public JobLauncherBeanPostProcessor(ApplicationContext context) { + this.context = context; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (beanName.equals("jobLauncher")) { + logger.debug("Replacing BatchAutoConfiguration's jobLauncher Bean with one provided by composed task runner."); + bean = jobLauncher(context.getBean("jobRepository", JobRepository.class), + context.getBean("taskExecutor", TaskExecutor.class)); + } + return bean; + } + + private JobLauncher jobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) { + TaskExecutorJobLauncher taskExecutorJobLauncher = new TaskExecutorJobLauncher(); + taskExecutorJobLauncher.setJobRepository(jobRepository); + taskExecutorJobLauncher.setTaskExecutor(taskExecutor); + try { + taskExecutorJobLauncher.afterPropertiesSet(); + return taskExecutorJobLauncher; + } catch (Exception e) { + throw new BatchConfigurationException("Unable to configure the default job launcher", e); + } + } +} diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobRepositoryBeanPostProcessor.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobRepositoryBeanPostProcessor.java new file mode 100644 index 0000000000..99c544cf2b --- /dev/null +++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobRepositoryBeanPostProcessor.java @@ -0,0 +1,84 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.composedtaskrunner; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties; +import org.springframework.cloud.dataflow.composedtaskrunner.support.ComposedTaskException; +import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory; +import org.springframework.core.Ordered; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * CTR requires that the JobRepository that it uses to have its own {@link MultiSchemaIncrementerFactory}. + * As of Batch 5.x DefaultBatchConfiguration is now used to override default beans, however this disables + * BatchAutoConfiguration. To work around this we use a bean post processor to create our own {@link JobRepository}. + * + * @author Glenn Renfro + */ +public class JobRepositoryBeanPostProcessor implements BeanPostProcessor, Ordered { + private static final Logger logger = LoggerFactory.getLogger(JobRepositoryBeanPostProcessor.class); + + private PlatformTransactionManager transactionManager; + private DataSource incrementerDataSource; + private ComposedTaskProperties composedTaskProperties; + + public JobRepositoryBeanPostProcessor(PlatformTransactionManager transactionManager, DataSource incrementerDataSource, + ComposedTaskProperties composedTaskProperties) { + this.transactionManager = transactionManager; + this.incrementerDataSource = incrementerDataSource; + this.composedTaskProperties = composedTaskProperties; + } + + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (beanName.equals("jobRepository")) { + logger.debug("Replacing BatchAutoConfiguration's jobRepository Bean with one provided by composed task runner."); + bean = jobRepository(transactionManager, incrementerDataSource, composedTaskProperties); + } + return bean; + } + + private JobRepository jobRepository(PlatformTransactionManager transactionManager, DataSource incrementerDataSource, + ComposedTaskProperties composedTaskProperties) { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + MultiSchemaIncrementerFactory incrementerFactory = new MultiSchemaIncrementerFactory(incrementerDataSource); + factory.setIncrementerFactory(incrementerFactory); + factory.setDataSource(incrementerDataSource); + factory.setTransactionManager(transactionManager); + factory.setIsolationLevelForCreate(composedTaskProperties.getTransactionIsolationLevel()); + try { + factory.afterPropertiesSet(); + return factory.getObject(); + } + catch (Exception exception) { + throw new ComposedTaskException(exception.getMessage()); + } + } +}