-
Notifications
You must be signed in to change notification settings - Fork 56
Spring Batch
Spring Batch (http://projects.spring.io/spring-batch/ далее SB) - это библиотека, входящая в состав подпроектов Spring Framework, которая реализует инфраструктуру для выполнения пакетных фоновых заданий.
SB позволяет:
- декларативно описывать задания
- имеет в своём составе множество реализаций задач, адаптеров, "читателей", "писателей" и "процессоров" для различных технологий (JMS, AMQP, RDBMS, NoSQL и т.д. )
- разделять задачи на повторно используемые шаги
- имеет механизмы для запуска, останови и перезапуска задач
- позволяет настраивать пакетную обработку
- позволяет распараллеливать задачи
- позволяет повторять задачи или шаги (retry)
- реализует слежение за этапами выполнения задачи
- фиксирует результат выполнения задач
- позволяет настраивать ход выполнения задачи (например, если не выполнился некоторый шаг, то перейти на другой шаг)
- имеет в своём составе WEB-интерфейс для управления задачами
SB не умеет:
- Планировать задачи
- TODO!
Задачи в SB называются Job. Каждая задача состоит из этапов(шагов) выполнения. Задача может быть выполнена успешно только в том случае, если каждый шаг завершился успешно. Шаги могут выполняться несколько раз.
Каждый шаг(Step) осуществляет обработку данных. Обработка данных состоит из 3 этапов:
- Чтение (реализует чтение Reader)
- Обработки (реализует обработку Processor)
- Запись (реализует запись Writer)
Если шаг реализует какой-то другой механизм, то можно реализовать собственную реализацию (Tasklet).
На каждом этапе возможно слежение за этапами обработки с помощью слушателей событий (Listener).
Задача получает параметры выполнения через задания параметров задачи (JobParameters).
В составе SB есть ряд важных компонент, которые необходимо описать:
- JobRepository - основной компонент, который хранит всю мета-информацию по задаче: историю запусков и статусов, контексты, параметры, историю и статусы запуска шагов и т.д. JobRepository поставляется в 2-х реализациях: JobRepository для базы данных и JobRepository построенный поверх хранилища в памяти (ConcourrentHashMap).
- JobLauncher - компонент, который непосредственно осуществляет запуск задач на выполнение.
Кроме того, в состав SB уже входят скрипты создания соответствующих таблиц в БД для основных поставщиков баз данных.
Создадим базовый проект Maven. Добавим основные зависимости. Всё стандартно.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<slf4j.version>1.7.12</slf4j.version>
<logback.version>1.1.3</logback.version>
<spring.version>4.2.1.RELEASE</spring.version>
</properties>
<dependencies>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<type>jar</type>
</dependency>
<!-- Spring framework dependency -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
</dependencies>
Добавим файл logback.xml
.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
<resetJUL>true</resetJUL>
</contextListener>
<jmxConfigurator />
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{yyyy-MM-dd HH:mm:ss} %.-1level %logger %msg%n</pattern>
</encoder>
</appender>
<logger name="org.springframework" level="info" />
<root level="debug">
<appender-ref ref="console" />
</root>
</configuration>
Создадим базовый класс приложения.
package learn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class App {
public static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main(String[] args) {
logger.info("Application started");
}
}
Теперь добавим зависимость непосредственно от spring batch.
<properties>
<spring.batch.version>3.0.5.RELEASE</spring.batch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
</dependencies>
Можно приступать к описанию конфигурации и задания. Создадим простейшее задание, которое только выводит сообщение на экран с переданным параметром.
Создадим заготовку для конфигурационного класса приложения.
@EnableBatchProcessing
@Configuration
public class Config {
}
Аннотация @Configuration
указывает, что это класс конфигурации, а @EnableBatchProcessing
- это аннотация для Spring Batch. Более подробно с тем, что делает данная аннотация можно ознакомиться в классе org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer
и org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration
.
Добавим получение контекста по конфигурационному классу:
public class App {
public static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main(String[] args) {
logger.info("Application started");
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class);
}
}
Теперь нам нем необходимо описать какой будет использоваться JobRepository и JobLauncher. Воспользуемся простыми реализациями данным интерфейсов с хранением данных в памяти.
@Bean
public JobRepository jobRepository() {
return
new SimpleJobRepository(
new MapJobInstanceDao(),
new MapJobExecutionDao(),
new MapStepExecutionDao(),
new MapExecutionContextDao()
);
}
@Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository());
return simpleJobLauncher;
}
Создадим простой Job, состоящий из 1 step'а.
@Bean
public Job simpleJob() {
return
jobBuilderFactory
.get("job1")
.incrementer(new RunIdIncrementer())
.start(echoLineStep())
.build();
}
Теперь создадим Step.
@Bean
public Step echoLineStep() {
return
stepBuilderFactory
.get("step1")
.tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
LoggerFactory.getLogger("echoLineTasklet").info("Hello from tasklet");
LoggerFactory
.getLogger(
"echoLineTasklet"
)
.info(
"Parameters: {}",
chunkContext
.getStepContext()
.getJobParameters()
);
return RepeatStatus.FINISHED;
}
}
)
.build();
}
Мы воспользовались builder'ами для созданияни работ и шагов. Метод get()
позволяет задать имя задаче или шагу. Метод build()
строит необходимый объект. Метод tasklet()
принимает объект тасклета, который будет реализовывать шаг.
Тасклет будет очень простым : выводить в лог сообщение с указанием параметров задачи. Более подробно.
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
LoggerFactory.getLogger("echoLineTasklet").info("Hello from tasklet");
LoggerFactory
.getLogger(
"echoLineTasklet"
)
.info(
"Parameters: {}",
chunkContext
.getStepContext()
.getJobParameters()
);
return RepeatStatus.FINISHED;
}
}
Метод execute()
это единственный метод tasklet'а. В данный метод передаётся контекст шага (stepcontext
), через который можно добраться до контекста задачи получив параметры задачи.
В качестве результат tasklet возвращает 2 значения:
-
FINISHED
- если тасклет закончился и повторений больше не нужно -
CONTINUABLE
- если выполнение может продолжится
В случае ошибки метод execute()
должен бросить исключение, которое будет обработано SB.
Полный код конфигурационного класса.
package learn;
...
@EnableBatchProcessing
@Configuration
public class Config {
@Autowired
protected JobBuilderFactory jobBuilderFactory;
@Autowired
protected StepBuilderFactory stepBuilderFactory;
@Bean
public JobRepository jobRepository() {
return
new SimpleJobRepository(
new MapJobInstanceDao(),
new MapJobExecutionDao(),
new MapStepExecutionDao(),
new MapExecutionContextDao()
);
}
@Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository());
return simpleJobLauncher;
}
@Bean
public Job simpleJob() {
return
jobBuilderFactory
.get("job1")
.incrementer(new RunIdIncrementer())
.start(echoLineStep())
.build();
}
@Bean
public Step echoLineStep() {
return
stepBuilderFactory
.get("step1")
.tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
LoggerFactory.getLogger("echoLineTasklet").info("Hello from tasklet");
LoggerFactory
.getLogger(
"echoLineTasklet"
)
.info(
"Parameters: {}",
chunkContext
.getStepContext()
.getJobParameters()
);
return RepeatStatus.FINISHED;
}
}
)
.build();
}
}
Теперь рассмотрим код создания задачи:
Получаем объект задачи из контекста:
Job job = ctx.getBean("simpleJob", Job.class);
Создаём builder параметров задачи и задаём один из параметров:
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("param1", "param1 value");
А теперь полный код класса App.
package learn;
...
public class App {
public static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main(String[] args) {
logger.info("Application started");
// Контекст в виде конфигурационного класса
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class);
// Получение объекта, который будет управлять запуском задач
JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
// Получить объект задачи
Job job = ctx.getBean("simpleJob", Job.class);
// Создание builder, который помогает в создании объекта параметров задачи
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
// Задание параметра задачи
jobParametersBuilder.addString("param1", "param1 value");
try {
// запуск задачи с параметрами
jobLauncher.run(job, jobParametersBuilder.toJobParameters());
// две неуспешные попытки запуска задачи
jobLauncher.run(job, jobParametersBuilder.toJobParameters());
jobLauncher.run(job, jobParametersBuilder.toJobParameters());
} catch (JobExecutionAlreadyRunningException e) {
logger.error("Already running task",e);
} catch (JobRestartException e) {
logger.error("Restart task",e);
} catch (JobInstanceAlreadyCompleteException e) {
logger.error("Already completed task",e);
} catch (JobParametersInvalidException e) {
logger.error("Param invalid",e);
}
}
}
Мы запускаем задачи с одинаковыми параметрами, а инфраструктура SB запрещает такое поведение. Если запустим приложение, то увидим ход выполнения задачи и ошибку, связанную с тем, что обнаружена попытка запуска задачи с одинаковыми параметрами.
$ mvn exec:java -Dexec.mainClass="learn.App"
...
2015-09-27 22:03:13 I learn.App Application started
2015-09-27 22:03:14 I org.springframework.context.annotation.AnnotationConfigApplicationContext Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@75349bd5: startup date [Sun Sep 27 22:03:14 MSK 2015]; root of context hierarchy
2015-09-27 22:03:14 I org.springframework.beans.factory.support.DefaultListableBeanFactory Overriding bean definition for bean 'jobLauncher' with a different definition: replacing [Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=3; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration; factoryMethodName=jobLauncher; initMethodName=null; destroyMethodName=(inferred); defined in class path resource [org/springframework/batch/core/configuration/annotation/SimpleBatchConfiguration.class]] with [Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=3; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=config; factoryMethodName=jobLauncher; initMethodName=null; destroyMethodName=(inferred); defined in learn.Config]
2015-09-27 22:03:14 I org.springframework.beans.factory.support.DefaultListableBeanFactory Overriding bean definition for bean 'jobRepository' with a different definition: replacing [Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=3; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration; factoryMethodName=jobRepository; initMethodName=null; destroyMethodName=(inferred); defined in class path resource [org/springframework/batch/core/configuration/annotation/SimpleBatchConfiguration.class]] with [Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=3; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=config; factoryMethodName=jobRepository; initMethodName=null; destroyMethodName=(inferred); defined in learn.Config]
2015-09-27 22:03:14 W org.springframework.context.annotation.ConfigurationClassEnhancer @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2015-09-27 22:03:14 W org.springframework.context.annotation.ConfigurationClassEnhancer @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2015-09-27 22:03:14 I org.springframework.batch.core.launch.support.SimpleJobLauncher No TaskExecutor has been set, defaulting to synchronous executor.
2015-09-27 22:03:14 I org.springframework.batch.core.launch.support.SimpleJobLauncher Job: [SimpleJob: [name=job1]] launched with the following parameters: [{param1=param1 value}]
2015-09-27 22:03:14 I org.springframework.batch.core.job.SimpleStepHandler Executing step: [step1]
2015-09-27 22:03:14 W org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer No datasource was provided...using a Map based JobRepository
2015-09-27 22:03:14 I org.springframework.batch.core.launch.support.SimpleJobLauncher No TaskExecutor has been set, defaulting to synchronous executor.
2015-09-27 22:03:14 I echoLineTasklet Hello from tasklet
2015-09-27 22:03:14 I echoLineTasklet Parameters: {param1=param1 value}
2015-09-27 22:03:14 I org.springframework.batch.core.launch.support.SimpleJobLauncher Job: [SimpleJob: [name=job1]] completed with the following parameters: [{param1=param1 value}] and the following status: [COMPLETED]
2015-09-27 22:03:14 E learn.App Already completed task
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={param1=param1 value}. If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at learn.App.main(App.java:34) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_40]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_40]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_40]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_40]
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) [exec-maven-plugin-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_40]
TODO:
- ExecutionListener
- JobExecutionListener
- StepExecutionListener
- abstract parent job
- ExecutionContextPromotionListener
- TaskLet
- Quartz as scheduler
- SystemCommandTasklet
- JobParameters
- JobParameterBuilder
Links: