51工具盒子


Spring Batch Tutorial

1. Overview {#1概览}

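This article takes an in-depth look at Spring Batch, a framework designed for running batch processing jobs.

The current version, 5.0.0, is built on Spring Framework 6 and requires Java 17.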

2. Workflow Basics {#2工作流基础}

Spring Batch follows the traditional batch architecture, in which a job repository takes care of scheduling and interacting with jobs.

A job can have multiple steps, and each step typically follows the same sequence: read data, process it, then write it out.
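The read, process, write sequence can be modeled in a few lines of plain Java. The sketch below is not Spring Batch code. `runStep` is a hypothetical helper, an `Iterator` stands in for the reader, a `Function` stands in for the processor, and a `Consumer` stands in for the writer. It mirrors the chunk-oriented style Spring Batch uses: a group of items is read and processed, and then the whole group is written in one go.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical plain-Java model of a chunk-oriented step (not the Spring Batch API).
public class ChunkLoopSketch {

    // Reads up to chunkSize items, processes each one, then writes the chunk as a unit.
    static <I, O> void runStep(Iterator<I> reader, Function<I, O> processor,
                               Consumer<List<O>> writer, int chunkSize) {
        while (reader.hasNext()) {
            // 1. Read: collect up to chunkSize input items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process: transform every item of the chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            // 3. Write: hand the whole chunk to the writer at once
            //    (in Spring Batch, the chunk's transaction commits after the write).
            writer.accept(outputs);
        }
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "b", "c", "d", "e");
        List<List<String>> written = new ArrayList<>();
        Function<String, String> upperCase = s -> s.toUpperCase();
        Consumer<List<String>> collect = written::add;
        runStep(lines.iterator(), upperCase, collect, 2);
        System.out.println(written); // [[A, B], [C, D], [E]]
    }
}
```

Five items with a chunk size of 2 produce three writes. The last write receives a partial chunk.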

The framework does most of the heavy lifting here, especially the low-level persistence work for jobs. This article uses H2 as the database behind the job repository.

2.1. Example Application {#21应用示例}

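In this example, we migrate some financial transaction data from CSV to XML.

The input file has a very simple structure.

Each line holds one transaction, made up of a username, a user ID (userid), a transaction date (transaction_date) and an amount (transaction_amount):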

```csv
username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
```
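Before looking at the Spring configuration, it helps to see what turning these lines into fields involves. The following JDK-only sketch skips the header line, splits each line on commas, and trims the space that follows every comma in the sample. `CsvTokenizeSketch` and `tokenize` are hypothetical names. In the real job, FlatFileItemReader, DelimitedLineTokenizer and the FieldSet read methods take care of these steps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// JDK-only sketch of header skipping, comma splitting and trimming (hypothetical names).
public class CsvTokenizeSketch {

    // Drops the first linesToSkip lines, then splits each remaining line into trimmed fields.
    static List<String[]> tokenize(List<String> lines, int linesToSkip) {
        List<String[]> rows = new ArrayList<>();
        for (String line : lines.subList(linesToSkip, lines.size())) {
            String[] fields = line.split(",");
            for (int i = 0; i < fields.length; i++) {
                fields[i] = fields[i].trim(); // the sample has a space after every comma
            }
            rows.add(fields);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> file = List.of(
                "username, userid, transaction_date, transaction_amount",
                "devendra, 1234, 31/10/2015, 10000",
                "john, 2134, 3/12/2015, 12321",
                "robin, 2134, 2/02/2015, 23411");
        for (String[] row : tokenize(file, 1)) {
            System.out.println(Arrays.toString(row)); // e.g. [devendra, 1234, 31/10/2015, 10000]
        }
    }
}
```

Skipping the header matters. Without it, the header would be the first row handed to the mapper, and reading "userid" as a number would fail.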

3. Maven Dependencies {#3maven-依赖}

The project depends on Spring OXM (for the JAXB marshaller that writes the XML), Spring Batch and the H2 database:

```xml
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
    <version>6.0.6</version>
</dependency>
<!-- Jaxb2Marshaller in Spring 6 builds on Jakarta XML Binding, so a JAXB
     implementation (for example org.glassfish.jaxb:jaxb-runtime) is also needed at runtime -->
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.1.214</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>5.0.0</version>
</dependency>
```

4. Spring Batch and Job Configuration {#4spring-batch-和-job-配置}

Below is the basic Spring Batch configuration, together with the job definition for the CSV-to-XML conversion.

Java-based job configuration:

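```java
import com.baeldung.batch.model.Transaction;
import com.baeldung.batch.service.CustomItemProcessor;
import com.baeldung.batch.service.RecordFieldSetMapper;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@Profile("spring")
public class SpringBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    // Spring Batch 5 file writers expect a WritableResource; FileSystemResource is one.
    private final WritableResource outputXml = new FileSystemResource("xml/output.xml");

    @Bean
    public ItemReader<Transaction> itemReader() {
        // Split each line on commas and give the four fields names.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);

        // Map every tokenized line to a Transaction.
        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());

        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
        reader.setResource(inputCsv);
        reader.setLinesToSkip(1); // skip the CSV header line, as the XML configuration does
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
        // Marshal each Transaction with JAXB inside a <transactionRecord> root element.
        StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class<?>[] { Transaction.class });
        return marshaller;
    }

    @Bean
    protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                         ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor,
                         ItemWriter<Transaction> writer) {
        // Chunk-oriented step: 10 items are read, processed and written per transaction.
        return new StepBuilder("step1", jobRepository)
                .<Transaction, Transaction> chunk(10, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
        return new JobBuilder("firstBatchJob", jobRepository)
                .preventRestart()
                .start(step1)
                .build();
    }

    @Bean
    public DataSource dataSource() {
        // Embedded H2 database; the scripts drop and recreate the Spring Batch metadata tables.
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean(name = "transactionManager")
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        // Stores job and step execution metadata in the H2 database.
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
```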

And the XML-based configuration:

```xml
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="input/record.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="username,userid,transactiondate,amount" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.baeldung.batch.service.RecordFieldSetMapper" />
            </property>
        </bean>
    </property>
    <!-- Skip the CSV header line -->
    <property name="linesToSkip" value="1" />
</bean>

<bean id="itemProcessor" class="com.baeldung.batch.service.CustomItemProcessor" />

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.baeldung.batch.model.Transaction</value>
        </list>
    </property>
</bean>

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
                processor="itemProcessor" commit-interval="10" />
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Connect to the H2 database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.h2.Driver" />
    <property name="url" value="jdbc:h2:file:~/repository" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>

<!-- Create the job metadata tables automatically -->
<jdbc:initialize-database data-source="dataSource">
    <jdbc:script location="org/springframework/batch/core/schema-drop-h2.sql" />
    <jdbc:script location="org/springframework/batch/core/schema-h2.sql" />
</jdbc:initialize-database>

<!-- Store the job metadata in the database -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="databaseType" value="h2" />
</bean>

<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<!-- TaskExecutorJobLauncher replaces SimpleJobLauncher, which is deprecated in Spring Batch 5 -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.TaskExecutorJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>
```

That is the whole configuration. Now let's go through it piece by piece.

4.1. Reading Data and Creating Objects with ItemReader {#41使用-itemreader-读取数据和创建对象}

First, we configure itemReader, a FlatFileItemReader that reads the data in record.csv and converts each line into a Transaction object:

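```java
// Spring 6's Jaxb2Marshaller uses the jakarta.xml.bind package (not javax.xml.bind).
import jakarta.xml.bind.annotation.XmlRootElement;
import java.time.LocalDateTime;

@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private LocalDateTime transactionDate;
    private double amount;

    /* getters and setters omitted */

    // Note: JAXB has no built-in mapping for java.time types, so marshalling
    // transactionDate typically needs an XmlAdapter registered with
    // @XmlJavaTypeAdapter on its getter.

    @Override
    public String toString() {
        return "Transaction [username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount
          + "]";
    }
}
```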

To do the conversion, the reader uses a custom field set mapper:

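```java
import com.baeldung.batch.model.Transaction;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
    // Day/month/year with one- or two-digit day and month, e.g. 3/12/2015.
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy");

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        Transaction transaction = new Transaction();
        // Fields are looked up by the names set on the DelimitedLineTokenizer.
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt("userid"));
        transaction.setAmount(fieldSet.readDouble("amount"));
        String dateString = fieldSet.readString("transactiondate");
        transaction.setTransactionDate(LocalDate.parse(dateString, FORMATTER).atStartOfDay());
        return transaction;
    }
}
```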
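The sample dates mix one- and two-digit days and months (31/10/2015, 3/12/2015, 2/02/2015). The JDK-only check below shows that a pattern with single-letter `d` and `M` accepts all of them. `DateParseSketch` is a hypothetical name. The check uses `d/M/yyyy` (a `d/M/yyy` pattern parses four-digit years identically) and then applies the same `atStartOfDay()` conversion as the mapper.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// JDK-only check of the day/month/year pattern against the sample dates (hypothetical names).
public class DateParseSketch {

    // Single-letter d and M accept one- or two-digit values.
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy");

    // Parse a LocalDate, then take midnight of that day.
    static LocalDateTime parse(String value) {
        return LocalDate.parse(value.trim(), FORMATTER).atStartOfDay();
    }

    public static void main(String[] args) {
        for (String s : new String[] {"31/10/2015", "3/12/2015", "2/02/2015"}) {
            System.out.println(s + " -> " + parse(s));
        }
    }
}
```

For the three sample dates it prints 2015-10-31T00:00, 2015-12-03T00:00 and 2015-02-02T00:00.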

4.2. Processing Data with ItemProcessor {#42使用-itemprocessor-处理数据}

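Next, we create a custom item processor, CustomItemProcessor. It does not change anything about the Transaction object.

All it does is pass the original object coming from the reader on to the writer: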

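```java
import com.baeldung.batch.model.Transaction;
import org.springframework.batch.item.ItemProcessor;

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {
    // Pass-through: hands the item read by the reader unchanged to the writer.
    @Override
    public Transaction process(Transaction item) {
        return item;
    }
}
```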
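The pass-through processor never drops anything. For contrast, here is a JDK-only sketch of a filtering processor. In Spring Batch, an `ItemProcessor` that returns `null` tells the framework not to pass that item to the writer. The `Tx` record, the positive-amount rule and the sample values are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of ItemProcessor-style filtering (not the Spring Batch API).
public class FilteringProcessorSketch {

    // Hypothetical stand-in for Transaction.
    record Tx(String username, double amount) {}

    // Hypothetical rule: returning null marks the item as filtered out.
    static Tx process(Tx item) {
        return item.amount() > 0 ? item : null;
    }

    // Applies process() to a chunk and keeps only the non-null results for the writer.
    static List<Tx> applyToChunk(List<Tx> chunk) {
        List<Tx> toWrite = new ArrayList<>();
        for (Tx item : chunk) {
            Tx result = process(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        List<Tx> chunk = List.of(new Tx("u1", 10000), new Tx("u2", 0), new Tx("u3", 23411));
        System.out.println(applyToChunk(chunk).size()); // 2: the zero-amount item is filtered out
    }
}
```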

4.3. Writing Objects to the File System with ItemWriter {#43使用-itemwriter-将对象写入-fs}

Finally, we write the transactions to an XML file located at xml/output.xml:

```xml
<bean id="itemWriter"
  class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>
```
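Because `rootTagName` is `transactionRecord` and the Transaction class is annotated with `@XmlRootElement(name = "transactionRecord")`, the output nests elements with the same name: one root element wraps one element per item. The JDK-only StAX sketch below reproduces that nesting. `XmlShapeSketch`, the `{username, amount}` pairs and the child element names are assumptions for illustration. In the real file, the child elements and their order come from JAXB marshalling of Transaction.

```java
import java.io.StringWriter;
import java.util.List;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// JDK-only sketch of the nesting StaxEventItemWriter produces; not the real writer.
public class XmlShapeSketch {

    // Each item is a hypothetical {username, amount} pair standing in for a Transaction.
    static String write(String rootTagName, String itemTagName, List<String[]> items) {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            xml.writeStartDocument();
            xml.writeStartElement(rootTagName);      // root element, from rootTagName
            for (String[] item : items) {
                xml.writeStartElement(itemTagName);  // one element per item, from @XmlRootElement
                writeLeaf(xml, "username", item[0]);
                writeLeaf(xml, "amount", item[1]);
                xml.writeEndElement();
            }
            xml.writeEndElement();
            xml.writeEndDocument();
            xml.flush();
            xml.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    private static void writeLeaf(XMLStreamWriter xml, String name, String text) throws XMLStreamException {
        xml.writeStartElement(name);
        xml.writeCharacters(text);
        xml.writeEndElement();
    }

    public static void main(String[] args) {
        List<String[]> items = List.of(new String[] {"devendra", "10000"}, new String[] {"john", "12321"});
        System.out.println(write("transactionRecord", "transactionRecord", items));
    }
}
```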

4.4. Configuring the Batch Job {#44配置-batch-job}

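All that is left is to wire these pieces together into a job using the batch:job syntax.

Note the commit-interval. It is the number of transactions that are read and kept in memory before the chunk is handed to itemWriter and committed.

Transactions stay in memory until that count is reached, or until the input runs out. In the Java configuration, the same value is the chunk size passed to chunk(10, transactionManager).

The Java bean and the corresponding XML configuration: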

```java
@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
        ItemReader<Transaction> reader,
        @Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor,
        ItemWriter<Transaction> writer) {
    return new StepBuilder("step1", jobRepository)
            // chunk(10, ...) is the Java counterpart of commit-interval="10"
            .<Transaction, Transaction> chunk(10, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}
```

```xml
<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
```
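The effect of `commit-interval="10"` (or `chunk(10, ...)`) on how items are grouped is simple arithmetic. The small sketch below assumes no items are filtered or skipped. `CommitIntervalSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Arithmetic sketch: how n items split into chunks for a given commit-interval,
// assuming nothing is filtered or skipped.
public class CommitIntervalSketch {

    // Sizes of the chunks handed to the writer, in order.
    static List<Integer> chunkSizes(int itemCount, int commitInterval) {
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = itemCount; remaining > 0; remaining -= commitInterval) {
            sizes.add(Math.min(commitInterval, remaining));
        }
        return sizes;
    }

    // Number of non-empty chunks handed to the writer: ceil(itemCount / commitInterval).
    static int writeCount(int itemCount, int commitInterval) {
        return (itemCount + commitInterval - 1) / commitInterval;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(3, 10));  // [3]
        System.out.println(chunkSizes(25, 10)); // [10, 10, 5]
        System.out.println(writeCount(25, 10)); // 3
    }
}
```

With the three sample rows, the whole file fits into a single chunk, so the writer is called once.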

4.5. Running the Batch Job {#45运行-batch-job}

With everything in place, run the application:

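```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Profile;

@Profile("spring")
public class App {
    public static void main(String[] args) {
        // Plain Spring Java configuration (no Spring Boot).
        // SpringConfig is another configuration class from the original project, not shown here.
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SpringConfig.class);
        context.register(SpringBatchConfig.class);
        context.refresh();

        JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
        Job job = context.getBean("firstBatchJob", Job.class);
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        } finally {
            context.close();
        }
    }
}
```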

Run the application with the spring profile active, that is, with -Dspring.profiles.active=spring.

5. Spring Boot Configuration {#5spring-boot-配置}

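Next, we create a Spring Boot application and convert the previous Spring Batch configuration so that it runs in a Spring Boot environment.

For the most part, it is the same as the previous Spring Batch example.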

5.1. Maven Dependencies {#51maven-依赖}

Add the spring-boot-starter-batch dependency to pom.xml:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
```

We also need a database to store the Spring Batch job metadata. In this example, we again use the in-memory H2 database:

```xml
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.1.214</version>
    <scope>runtime</scope>
</dependency>
```

5.2. Spring Boot Configuration {#52spring-boot-配置}

We use the @Profile annotation to keep the plain Spring and Spring Boot configurations apart.

Activate the spring-boot profile in the application:

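```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchApplication {
    public static void main(String[] args) {
        SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
        // With spring-boot active, SpringBatchConfig (profile "spring") is not loaded.
        springApp.setAdditionalProfiles("spring-boot");
        springApp.run(args);
    }
}
```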

5.3. Spring Batch Job Configuration {#53spring-batch-job-配置}

The batch job configuration is the same as in the earlier SpringBatchConfig class:

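```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;

@Configuration
@Profile("spring-boot")
public class SpringBootBatchConfig {
    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("input/recordWithInvalidData.csv")
    private Resource invalidInputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // ...
}
```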

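Starting with Spring Boot 3.0, the @EnableBatchProcessing annotation is no longer needed and is discouraged. Spring Boot auto-configures the batch infrastructure, and adding the annotation makes that auto-configuration back off. In that case, the JobRepository, JobLauncher and TransactionManager beans have to be declared manually. In addition, JobBuilderFactory and StepBuilderFactory are deprecated in Spring Batch 5; use the JobBuilder and StepBuilder classes instead.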

6. Conclusion {#6总结}

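This article used a practical example to introduce the concepts and usage of Spring Batch, along with the different ways to configure it with plain Spring and with Spring Boot.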


Ref: https://www.baeldung.com/introduction-to-spring-batch

Reproduction without permission is prohibited: 工具盒子 » Spring Batch Tutorial