1、概览 {#1概览}
Spring Batch 是一个强大的 Java 批处理框架,因此在数据处理活动和定时任务运行中被广泛选择。根据业务逻辑的复杂程度,作业可以依赖不同的配置值和动态参数。
本文将带你了解如何使用 JobParameter
以及如何从基本的批处理组件中访问它们。
2、Demo 项目 {#2demo-项目}
我们将为药房服务开发一个 Spring Batch。主要业务任务是查找即将过期的药品,根据销售情况计算新价格,并通知消费者即将过期的药品。此外,我们将从内存中的 H2 数据库读取数据,并将所有处理细节写入日志,以简化实现过程。
2.1、依赖 {#21依赖}
要开始演示应用,需要添加 Spring Batch 和 H2 依赖:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.2.0</version>
</dependency>
可以在 Maven Central Repository 中找到最新的 H2 和 Spring Batch 版本。
2.2、准备测试数据 {#22准备测试数据}
首先在 schema-all.sql
中定义 Schema:
DROP TABLE medicine IF EXISTS;
CREATE TABLE medicine (
med_id VARCHAR(36) PRIMARY KEY,
name VARCHAR(30),
type VARCHAR(30),
expiration_date TIMESTAMP,
original_price DECIMAL,
sale_price DECIMAL
);
初始测试数据定义在 data.sql
文件中:
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);
Spring Boot 在应用启动时运行这些文件,并且我们将在测试执行中使用这些测试数据。
2.3、Medicine Domain 类 {#23medicine-domain-类}
对于我们的服务,需要一个简单的 Medicine
实体类:
@AllArgsConstructor
@Data
public class Medicine {
private UUID id;
private String name;
private MedicineCategory type;
private Timestamp expirationDate;
private Double originalPrice;
private Double salePrice;
}
ItemReader
使用 expirationDate
字段计算药品是否即将过期。当药品接近过期时,ItemProcessor
将更新 salePrice
字段。
2.4、Application Properties {#24application-properties}
应用需要在 src/main/resources/application.properties
文件中设置多个属性:
spring.batch.job.enabled=false
batch.medicine.cron=0 */1 * * * *
batch.medicine.alert_type=LOGS
batch.medicine.expiration.default.days=60
batch.medicine.start.sale.default.days=45
batch.medicine.sale=0.1
由于我们只配置一个作业(Job),因此 spring.batch.job.enabled
应设置为 false
,以禁用初始作业执行。默认情况下,Spring 会在上下文启动后以空参运行作业:
[main] INFO o.s.b.a.b.JobLauncherApplicationRunner - Running default command line with: []
batch.medicine.cron
属性定义了计划运行的 cron 表达式。根据定义的情景,我们应该每天运行作业。然而,在本例中,作业每分钟启动一次,以便能够轻松检查处理行为。
InputReader
、InputProcessor
和 InpurWriter
还需要其他属性来执行业务逻辑。
3、任务参数 {#3任务参数}
Spring Batch 包含一个 JobParameters
类,用于存储特定作业(Job)运行时的参数。这一功能在各种情况下都很有用。例如,它允许传递特定运行期间生成的动态变量。此外,它还可以创建一个控制器(Controller),根据客户端提供的参数启动作业。
在我们的场景中,我们将利用该类来保存应用参数和动态运行时参数。
3.1、StepScope 和 JobScope {#31stepscope-和-jobscope}
除了常规 Spring 中众所周知的 Bean Scope 外,Spring Batch 还引入了两个额外的 Scope:StepScope
和 JobScope
。有了这些 Scope,就可以为工作流中的每个步骤(Step)或作业(Job)创建唯一的 Bean。Spring 可确保与特定步骤/作业相关的资源在其整个生命周期内都是独立隔离和管理的。
有了这项功能,我们就可以轻松控制上下文,并在特定运行的读取、处理和写入部分共享所有需要的属性。为了能够注入作业参数,我们需要用 @StepScope
或 @JobScope
对依赖 Bean 进行注解。
3.2、在计划执行中填充作业参数 {#32在计划执行中填充作业参数}
定义 MedExpirationBatchRunner
类,它将通过 cron 表达式启动我们的作业(在我们的例子中是每 1 分钟一次)。我们应该用 @EnableScheduling
对该类进行注解,并定义适当的 @Scheduled
入口方法:
@Component
@EnableScheduling
public class MedExpirationBatchRunner {
...
@Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
public void runJob() {
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
launchJob(now);
}
}
由于我们要手动启动作业,因此应使用 JobLaucher
类,并在 JobLauncher#run()
方法中提供已填充的 JobParameter
。在我们的示例中,我们提供了来自 application.properties
的值以及两个特定于运行的参数(触发作业的日期和 trace id):
public void launchJob(ZonedDateTime triggerZonedDateTime) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
.addString(BatchConstants.ALERT_TYPE, alertType)
.addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
.addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
.addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
.addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString())
.toJobParameters();
jobLauncher.run(medExpirationJob, jobParameters);
} catch (Exception e) {
log.error("Failed to run", e);
}
}
在配置参数之后,我们有几种选项可以在代码中使用这些值。
3.3、在 Bean 定义中读取任务参数 {#33在-bean-定义中读取任务参数}
使用 SpEL ,我们可以从配置类中的 Bean 定义访问作业参数。Spring 会将所有参数合并为 String
到 Object
的常规 Map
:
@Bean
@StepScope
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
...
}
在方法内部,我们将使用 jobParameters
来初始化 MedicineProcessor
的适当字段。
3.4、直接读取服务中的任务参数 {#34直接读取服务中的任务参数}
另一种方法是在 ItemReader
中使用 Setter 注入。我们可以通过 SpEL 表达式,像从其他 Map
一样获取准确的参数值:
@Setter
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> {
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
}
我们只需确保 SpEL 中使用的 key 与参数初始化时使用的 key 相同。
3.5、通过 "BeforeStep" 读取任务参数 {#35通过-beforestep-读取任务参数}
Spring Batch 提供了一个 StepExecutionListener
接口,允许我们监听步骤执行阶段:步骤开始前和步骤完成后。我们可以利用这一功能,在步骤开始前访问属性,并执行任何自定义逻辑。最简单的方法就是使用 @BeforeStep
注解,它与 StepExecutionListener
的 beforeStep()
方法相对应:
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
JobParameters parameters = stepExecution.getJobExecution()
.getJobParameters();
...
log.info("Before step params: {}", parameters);
}
4、Job Configuration {#4job-configuration}
让我们把所有部分结合起来,看看全貌。
Reader、Processor 和 Writer 都需要两个属性:BatchConstants.TRIGGERED_DATE_TIME
和 BatchConstants.TRACE_ID
。
我们将在所有 Step Bean 定义中使用相同的提取逻辑来提取公共参数。
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
.toString()));
}
if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString());
}
}
其他参数都是针对特定组件的,没有通用逻辑。
4.1、配置 ItemReader {#41配置-itemreader}
首先,我们要配置 ExpiresSoonMedicineReader
并丰富公共参数:
@Bean
@StepScope
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {
ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
enrichWithJobParameters(jobParameters, medicineReader);
return medicineReader;
}
仔细看看 Reader
的具体实现。TriggeredDateTime
和 traceId
参数在构建 Bean 时直接注入,而 defaultExpiration
参数则由 Spring 通过 Setter 注入。为了便于演示,我们在 doOpen()
方法中使用了所有这些参数:
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {
private ZonedDateTime triggeredDateTime;
private String traceId;
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
private List<Medicine> expiringMedicineList;
...
@Override
protected void doOpen() {
expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));
log.info(&quot;Trace = {}. Found {} meds that expires soon&quot;, traceId, expiringMedicineList.size());
if (!expiringMedicineList.isEmpty()) {
setMaxItemCount(expiringMedicineList.size());
}
}
@PostConstruct
public void init() {
setName(ClassUtils.getShortName(getClass()));
}
}
ItemReader
不应标记为 @Component
。此外,我们还需要调用 setName()
方法来设置所需的 Reader
名称。
4.2、配置 ItemProcessor 和 ItemWriter {#42配置-itemprocessor-和-itemwriter}
ItemProcessor
和 ItemWriter
遵循与 ItemReader
相同的方法。因此,它们不需要任何特定的配置来访问参数。Bean 定义逻辑通过 enrichWithJobParameters()
方法初始化公共参数。其他仅在单个类中使用且不需要在所有组件中填充的参数,通过 Spring 在相应的类中进行 Setter 注入来进行增强。
我们应该使用 @StepScope
注解标记所有依赖于属性的 Bean。否则,Spring 将只在上下文启动时创建一次 Bean,而不会注入参数值。
4.3、配置完整的流程 {#43配置完整的流程}
我们不需要采取任何特定操作来配置任务参数,只需将所有 Bean 组合起来:
@Bean
public Job medExpirationJob(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
MedicineWriter medicineWriter,
MedicineProcessor medicineProcessor,
ExpiresSoonMedicineReader expiresSoonMedicineReader) {
Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
.reader(expiresSoonMedicineReader)
.processor(medicineProcessor)
.writer(medicineWriter)
.faultTolerant()
.transactionManager(transactionManager)
.build();
return new JobBuilder("medExpirationJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(notifyAboutExpiringMedicine)
.build();
}
5、运行应用 {#5运行应用}
运行一个完整的示例,看看应用是如何使用所有参数的。
我们需要从 SpringBatchExpireMedicationApplication
类启动 Spring Boot 应用。
一旦计划的方法执行,Spring 将记录所有参数。
INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]
首先,ItemReader
会根据 DEFAULT_EXPIRATION
参数写入已找到的药品信息:
INFO c.b.b.job.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon
其次,ItemProcessor
使用 SALE_STARTS_DAYS
和 MEDICINE_SALE
参数来计算新价格:
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb
最后,ItemWriter
会将更新的药物写入同一 trace 内的日志:
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0)
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0)
INFO c.b.b.job.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z
6、总结 {#6总结}
本文介绍了如何在 Spring Batch 中使用任务参数。
ItemReader
、ItemProcessor
和 ItemWriter
可以在 Bean 初始化过程中手动添加参数,也可以由 Spring 通过 @BeforeStep
或 Setter 注入添加参数。
Ref:https://www.baeldung.com/spring-batch-itemreader-access-job-parameters