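1. Overview {#1概览}
This article takes a practical look at Spring Batch, a framework designed for running batch jobs.
The version used here, 5.0.0, is built on Spring Framework 6 and requires Java 17 or later.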
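2. Workflow Basics {#2工作流基础}
Spring Batch follows the traditional batch architecture, in which a Job Repository keeps track of each Job: it records executions and their state, and the rest of the framework interacts with a Job through it.
A Job can have more than one Step. Each Step typically follows the sequence of reading data, processing it, and writing it.
The framework does most of the heavy lifting here, especially the low-level persistence work behind Jobs. This article uses H2 as the database backing the Job Repository.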
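2.1. Example Use Case {#21应用示例}
In this example, we migrate some financial transaction data from CSV to XML.
The input file has a very simple structure.
Each line holds one transaction, made up of a username, a user ID, a transaction date and an amount: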
username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
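To make the file layout concrete, here is a minimal plain-Java sketch that splits one line of this file into its four trimmed fields. The class and method names (CsvLineSketch, splitLine) are hypothetical and used only for illustration; this is not the tokenizer Spring Batch uses, and it does not handle quoted fields.

```java
import java.util.Arrays;
import java.util.List;

class CsvLineSketch {

    // Splits one comma-separated line into trimmed fields.
    // Hypothetical helper for illustration; quoted fields are not supported.
    static List<String> splitLine(String line) {
        return Arrays.stream(line.split(","))
                .map(String::trim)
                .toList();
    }

    public static void main(String[] args) {
        List<String> fields = splitLine("devendra, 1234, 31/10/2015, 10000");
        System.out.println(fields); // [devendra, 1234, 31/10/2015, 10000]
    }
}
```

Trimming matters here because the sample file puts a space after every comma.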
3. Maven Dependencies {#3maven-依赖}
This project needs Spring OXM (for the XML marshalling), the H2 database and Spring Batch Core:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>6.0.6</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.0.0</version>
</dependency>
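4. Spring Batch and Job Configuration {#4spring-batch-和-job-配置}
The basic Spring Batch configuration, together with the Job description for the CSV-to-XML conversion, is shown below.
The Java-based Job configuration: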
@Configuration
@Profile("spring")
public class SpringBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    // Spring Batch 5 writers take a WritableResource as their output target
    @Value("file:xml/output.xml")
    private WritableResource outputXml;

    @Bean
    public ItemReader<Transaction> itemReader() throws UnexpectedInputException, ParseException {
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
        reader.setResource(inputCsv);
        // Skip the header line, matching linesToSkip in the XML configuration
        reader.setLinesToSkip(1);
        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller) throws MalformedURLException {
        StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class[] { Transaction.class });
        return marshaller;
    }

    @Bean
    protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
            ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor,
            ItemWriter<Transaction> writer) {
        return new StepBuilder("step1", jobRepository)
                .<Transaction, Transaction> chunk(10, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
        return new JobBuilder("firstBatchJob", jobRepository)
                .preventRestart()
                .start(step1)
                .build();
    }

    // Embedded H2 database holding the Spring Batch metadata tables
    @Bean
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        return builder.setType(EmbeddedDatabaseType.H2)
                .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean(name = "transactionManager")
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
And the equivalent XML-based configuration:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="input/record.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="username,userid,transactiondate,amount" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.baeldung.batch.service.RecordFieldSetMapper" />
            </property>
        </bean>
    </property>
    <property name="linesToSkip" value="1" />
</bean>

<bean id="itemProcessor" class="com.baeldung.batch.service.CustomItemProcessor" />

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.baeldung.batch.model.Transaction</value>
        </list>
    </property>
</bean>

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- connect to the H2 database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.h2.Driver" />
    <property name="url" value="jdbc:h2:file:~/repository" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>

<!-- create the job-meta tables automatically -->
<jdbc:initialize-database data-source="dataSource">
    <jdbc:script location="org/springframework/batch/core/schema-drop-h2.sql" />
    <jdbc:script location="org/springframework/batch/core/schema-h2.sql" />
</jdbc:initialize-database>

<!-- store the job-meta in the database -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="databaseType" value="h2" />
</bean>

<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>
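That is the whole configuration. Let's go through it piece by piece.
4.1. Reading Data and Creating Objects with ItemReader {#41使用-itemreader-读取数据和创建对象}
First, we configured the itemReader bean (a FlatFileItemReader), which reads the data from record.csv and converts it into Transaction objects: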
@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private LocalDateTime transactionDate;
    private double amount;

    /* getters and setters omitted */

    @Override
    public String toString() {
        return "Transaction [username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount + "]";
    }
}
To do so, the reader uses a custom mapper:
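import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        // d and M accept one- or two-digit values, e.g. 3/12/2015 and 31/10/2015
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyyy");
        Transaction transaction = new Transaction();

        // Fields can be read by name or by zero-based column index
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt(1));
        transaction.setAmount(fieldSet.readDouble(3));

        String dateString = fieldSet.readString(2);
        transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
        return transaction;
    }
}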
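The date handling in the mapper can be tried on its own. The following sketch uses only java.time and the pattern d/M/yyyy to parse the three dates from the sample file; the class name DateParsingSketch and its parse method are hypothetical and exist only for this illustration.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class DateParsingSketch {

    // Single-letter d and M accept one- or two-digit day and month values.
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy");

    // Parses a day/month/year string and returns midnight of that day.
    static LocalDateTime parse(String raw) {
        return LocalDate.parse(raw.trim(), FORMATTER).atStartOfDay();
    }

    public static void main(String[] args) {
        System.out.println(parse("31/10/2015")); // 2015-10-31T00:00
        System.out.println(parse("3/12/2015"));  // 2015-12-03T00:00
        System.out.println(parse("2/02/2015"));  // 2015-02-02T00:00
    }
}
```

Because the file stores only a date, atStartOfDay() is what turns it into the LocalDateTime that the Transaction class expects.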
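4.2. Processing Data with ItemProcessor {#42使用-itemprocessor-处理数据}
We created a custom item processor, CustomItemProcessor. It doesn't process anything related to the Transaction object.
All it does is pass the original object coming from the reader on to the writer: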
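import org.springframework.batch.item.ItemProcessor;

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    // Pass-through: returns the item unchanged
    @Override
    public Transaction process(Transaction item) {
        return item;
    }
}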
4.3. Writing Objects to the File System with ItemWriter {#43使用-itemwriter-将对象写入-fs}
Finally, we store the transactions in an XML file located at xml/output.xml:
<bean id="itemWriter"
class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="marshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>
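4.4. Configuring the Batch Job {#44配置-batch-job}
All that is left is to connect the dots into a Job, using the batch:job syntax.
Note the commit-interval. It is the number of Transaction objects kept in memory before the batch is committed to the itemWriter.
The step holds the transactions in memory until that count is reached (or the input data ends).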
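The effect of a commit interval of 10 can be illustrated without the framework. The sketch below is plain Java and only mimics the buffering behaviour described above; it is not how Spring Batch is implemented, and the names ChunkSketch and toChunks are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkSketch {

    // Groups items into chunks of at most chunkSize, the way a commit interval
    // batches items before they are handed to a writer.
    static <T> List<List<T>> toChunks(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T item : items) {
            buffer.add(item);
            if (buffer.size() == chunkSize) {
                chunks.add(buffer); // the chunk is full: hand it over
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            chunks.add(buffer); // the input ended before the chunk filled up
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            items.add(i);
        }
        for (List<Integer> chunk : toChunks(items, 10)) {
            System.out.println("writing " + chunk.size() + " items");
        }
        // prints three lines, for chunks of 10, 10 and 5 items
    }
}
```

With 25 input records and an interval of 10, the writer is therefore called three times, the last time with the 5 leftover items.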
The Java bean and the corresponding XML configuration look like this:
@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
  ItemReader<Transaction> reader,
  @Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor,
  ItemWriter<Transaction> writer) {
    return new StepBuilder("step1", jobRepository)
      .<Transaction, Transaction> chunk(10, transactionManager)
      .reader(reader)
      .processor(processor)
      .writer(writer)
      .build();
}
<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
4.5. Running the Batch Job {#45运行-batch-job}
With everything set up, we can run the application:
@Profile("spring")
public class App {
    public static void main(String[] args) {
        // Spring Java config
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SpringConfig.class);
        context.register(SpringBatchConfig.class);
        context.refresh();

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("firstBatchJob");
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        }
    }
}
Run the Spring application with the spring profile active, using -Dspring.profiles.active=spring.
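5. Spring Boot Configuration {#5spring-boot-配置}
In this section, we create a Spring Boot application and convert the previous Spring Batch configuration to run in the Spring Boot environment.
It is largely the same as the earlier Spring Batch example.
5.1. Maven Dependencies {#51maven-依赖}
Add the spring-boot-starter-batch dependency to pom.xml: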
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
We also need a database to store the Spring Batch Job information. This example again uses the H2 in-memory database:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
<scope>runtime</scope>
</dependency>
5.2. Spring Boot Configuration {#52spring-boot-配置}
We use the @Profile annotation to tell the Spring and Spring Boot configurations apart.
The application sets the spring-boot profile:
@SpringBootApplication
public class SpringBatchApplication {

    public static void main(String[] args) {
        SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
        springApp.setAdditionalProfiles("spring-boot");
        springApp.run(args);
    }
}
5.3. Spring Batch Job Configuration {#53spring-batch-job-配置}
The Batch Job configuration is the same as in the earlier SpringBatchConfig class:
@Configuration
public class SpringBootBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("input/recordWithInvalidData.csv")
    private Resource invalidInputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // ...
}
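Starting with Spring Boot 3.0, the @EnableBatchProcessing annotation is discouraged: when it is present, Spring Boot's own batch auto-configuration backs off. Without the annotation, Boot supplies the JobRepository, JobLauncher and transaction manager beans itself, so they only need to be declared manually when the defaults don't fit. In addition, JobBuilderFactory and StepBuilderFactory are deprecated, and the JobBuilder and StepBuilder classes are recommended instead.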
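6. Conclusion {#6总结}
This article introduced the concepts and usage of Spring Batch through a practical example, and showed how the configuration differs between plain Spring and Spring Boot.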
Ref:https://www.baeldung.com/introduction-to-spring-batch