Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve and Update BatchAutoConfiguration #38234

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.ListableJobLocator;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand Down Expand Up @@ -95,20 +93,6 @@ public JobExecutionExitCodeGenerator jobExecutionExitCodeGenerator() {
return new JobExecutionExitCodeGenerator();
}

@Bean
@ConditionalOnMissingBean(JobOperator.class)
public SimpleJobOperator jobOperator(ObjectProvider<JobParametersConverter> jobParametersConverter,
JobExplorer jobExplorer, JobLauncher jobLauncher, ListableJobLocator jobRegistry,
JobRepository jobRepository) {
SimpleJobOperator factory = new SimpleJobOperator();
factory.setJobExplorer(jobExplorer);
factory.setJobLauncher(jobLauncher);
factory.setJobRegistry(jobRegistry);
factory.setJobRepository(jobRepository);
jobParametersConverter.ifAvailable(factory::setJobParametersConverter);
return factory;
}

@Configuration(proxyBeanMethods = false)
static class SpringBootBatchConfiguration extends DefaultBatchConfiguration {

Expand All @@ -120,13 +104,18 @@ static class SpringBootBatchConfiguration extends DefaultBatchConfiguration {

private final List<BatchConversionServiceCustomizer> batchConversionServiceCustomizers;

private final ExecutionContextSerializer executionContextSerializer;

SpringBootBatchConfiguration(DataSource dataSource, @BatchDataSource ObjectProvider<DataSource> batchDataSource,
PlatformTransactionManager transactionManager, BatchProperties properties,
ObjectProvider<BatchConversionServiceCustomizer> batchConversionServiceCustomizers) {
ObjectProvider<BatchConversionServiceCustomizer> batchConversionServiceCustomizers,
ObjectProvider<ExecutionContextSerializer> executionContextSerializer) {
this.dataSource = batchDataSource.getIfAvailable(() -> dataSource);
this.transactionManager = transactionManager;
this.properties = properties;
this.batchConversionServiceCustomizers = batchConversionServiceCustomizers.orderedStream().toList();
this.executionContextSerializer = executionContextSerializer
.getIfAvailable(DefaultExecutionContextSerializer::new);
}

@Override
Expand Down Expand Up @@ -160,6 +149,11 @@ protected ConfigurableConversionService getConversionService() {
return conversionService;
}

@Override
protected ExecutionContextSerializer getExecutionContextSerializer() {
return this.executionContextSerializer;
}

}

@Configuration(proxyBeanMethods = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -230,7 +229,8 @@ private JobParameters getNextJobParameters(Job job, JobParameters jobParameters)
private JobParameters getNextJobParametersForExisting(Job job, JobParameters jobParameters) {
JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (isStoppedOrFailed(lastExecution) && job.isRestartable()) {
JobParameters previousIdentifyingParameters = getGetIdentifying(lastExecution.getJobParameters());
JobParameters previousIdentifyingParameters = new JobParameters(
lastExecution.getJobParameters().getIdentifyingParameters());
return merge(previousIdentifyingParameters, jobParameters);
}
return jobParameters;
Expand All @@ -241,16 +241,6 @@ private boolean isStoppedOrFailed(JobExecution execution) {
return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
}

private JobParameters getGetIdentifying(JobParameters parameters) {
HashMap<String, JobParameter<?>> nonIdentifying = new LinkedHashMap<>(parameters.getParameters().size());
parameters.getParameters().forEach((key, value) -> {
if (value.isIdentifying()) {
nonIdentifying.put(key, value);
}
});
return new JobParameters(nonIdentifying);
}

private JobParameters merge(JobParameters parameters, JobParameters additionals) {
Map<String, JobParameter<?>> merged = new LinkedHashMap<>();
merged.putAll(parameters.getParameters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer;
import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
Expand Down Expand Up @@ -110,8 +114,11 @@ void testDefaultContext() {
this.contextRunner.withInitializer(ConditionEvaluationReportLoggingListener.forLogLevel(LogLevel.INFO))
.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class)
.run((context) -> {
assertThat(context).hasSingleBean(JobRepository.class);
assertThat(context).hasSingleBean(JobLauncher.class);
assertThat(context).hasSingleBean(JobExplorer.class);
assertThat(context).hasSingleBean(JobRegistry.class);
assertThat(context).hasSingleBean(JobOperator.class);
assertThat(context.getBean(BatchProperties.class).getJdbc().getInitializeSchema())
.isEqualTo(DatabaseInitializationMode.EMBEDDED);
assertThat(new JdbcTemplate(context.getBean(DataSource.class))
Expand Down Expand Up @@ -458,6 +465,27 @@ void whenTheUserDefinesAJobNameThatDoesNotExistWithRegisteredJobFailsFast() {
.withMessage("No job found with name 'three'");
}

@Test
void customExecutionContextSerializerIsUsed() {
this.contextRunner.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class)
.withUserConfiguration(CustomExecutionContextConfiguration.class)
.run((context) -> {
assertThat(context).hasSingleBean(Jackson2ExecutionContextStringSerializer.class);
assertThat(context.getBean(SpringBootBatchConfiguration.class).getExecutionContextSerializer())
.isInstanceOf(Jackson2ExecutionContextStringSerializer.class);
});
}

@Test
void defaultExecutionContextSerializerIsUsed() {
this.contextRunner.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class)
.run((context) -> {
assertThat(context).doesNotHaveBean(ExecutionContextSerializer.class);
assertThat(context.getBean(SpringBootBatchConfiguration.class).getExecutionContextSerializer())
.isInstanceOf(DefaultExecutionContextSerializer.class);
});
}

private JobLauncherApplicationRunner createInstance(String... registeredJobNames) {
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(mock(JobLauncher.class),
mock(JobExplorer.class), mock(JobRepository.class));
Expand Down Expand Up @@ -773,4 +801,14 @@ BatchConversionServiceCustomizer anotherBatchConversionServiceCustomizer() {

}

@Configuration(proxyBeanMethods = false)
static class CustomExecutionContextConfiguration {

@Bean
ExecutionContextSerializer executionContextSerializer() {
return new Jackson2ExecutionContextStringSerializer();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ Consider the following command:

This provides only one argument to the batch job: `someParameter=someValue`.

=== Restarting a stopped or failed Job

In order to restart a failed `Job`, all parameters (identifying and non-identifying) have to be re-specified on the command line. Non-identifying parameters are *not* copied from the previous execution. This allows them to be modified or removed.

NOTE: When using a custom `JobParametersIncrementer`: Be prepared to gather all parameters managed by the incrementer in order to restart a failed execution.

[[howto.batch.storing-job-repository]]
=== Storing the Job Repository
Expand Down