1、简介 {#1简介}
本文将通过一些实际的案例,带你了解 Spring Integration 的核心概念。
Spring Integration 提供了许多功能强大的组件,可大大增强企业架构内系统和流程的互联性。
2、设置 {#2设置}
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.1.13.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>5.1.13.RELEASE</version>
</dependency>
你可以从 Maven Central 下载最新版本的 Spring Integration Core 和 Spring Integration File Support。
3、消息传递模式 {#3消息传递模式}
这个库中的一个基本模式是消息传递(Messaging)。该模式围绕消息展开,消息是离散的数据负载(Discrete Payloads),通过预定义的通道从源系统或进程传递到一个或多个系统或进程。
从历史上看,这种模式是整合多个不同系统的最灵活方式,它可以:
- 几乎完全解耦集成所涉及的系统
- 允许集成中的参与系统完全不考虑彼此的底层协议、格式或其他实现细节
- 鼓励开发和重复使用参与集成的组件
4、消息集成的实际应用 {#4消息集成的实际应用}
举一个基本例子,将 MPEG 视频文件从指定文件夹复制到另一个配置文件夹:
@Configuration
@EnableIntegration
public class BasicIntegrationConfig{
public String INPUT_DIR = "the_source_dir";
public String OUTPUT_DIR = "the_dest_dir";
public String FILE_PATTERN = "*.mpeg";
@Bean
public MessageChannel fileChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader= new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
@Bean
@ServiceActivator(inputChannel= "fileChannel")
public MessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
}
上面的代码配置了一个 ServiceActivator(服务激活器),一个 MessageChannel(集成通道)和一个 InboundChannelAdapter(入站通道适配器)。
稍后将详细介绍每种组件类型。@EnableIntegration
注解将该类指定为 Spring Integration 配置。
启动 Spring Integration 应用上下文:
public static void main(String... args) {
AbstractApplicationContext context
= new AnnotationConfigApplicationContext(BasicIntegrationConfig.class);
context.registerShutdownHook();
Scanner scanner = new Scanner(System.in);
System.out.print("Please enter q and press <enter> to exit the program: ");
while (true) {
String input = scanner.nextLine();
if("q".equals(input.trim())) {
break;
}
}
System.exit(0);
}
上述 main 方法启动了 Integration Context;它还接受从命令行输入的 q
字符,以退出程序。接下来,让我们更详细地研究一下这些组件。
5、Spring Integration 组件 {#5spring-integration-组件}
5.1、Message {#51message}
org.springframework.integration.Message
接口定义了 Spring Message:Spring Integration Context 中的数据传输单元。
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
它定义了两个关键元素的 get 方法:
- 消息头,本质上是一个Key/Value容器,可用于传输元数据,由
org.springframework.integration.MessageHeaders
类定义 - 消息 Payload,即要传输的有价值的实际数据 - 在本例中,视频文件就是 Payload
5.2、Channel {#52channel}
Spring Integration(以及 EAI)中的 Channel 是集成架构中的基本管道。它是一个系统向另一个系统传递消息的管道。
你可以把它想象成一个字面意义上的管道,集成系统或流程可以通过它向其他系统推送信息(或接收来自其他系统的信息)。
Spring Integration 中的 Channel 有很多种,具体取决于你的需求。它们在很大程度上是可配置的,开箱即用,无需任何自定义代码,但如果你有自定义需求,也有一个强大的框架可供使用。
点对点(P2P)Channel 用于在系统或组件之间建立 1 对 1 的通信线路。一个组件在 Channel 上发布信息,另一个组件就能接收到信息。Channel 的每一端只能有一个组件。
如你所见,配置 Channel 就像返回 DirectChannel
实例一样简单:
@Bean
public MessageChannel fileChannel1() {
return new DirectChannel();
}
@Bean
public MessageChannel fileChannel2() {
return new DirectChannel();
}
@Bean
public MessageChannel fileChannel3() {
return new DirectChannel();
}
如上,定义了三个独立的 Channel,它们都由各自的 getter 方法名称标识。
发布-订阅(Pub-Sub)Channel 用于在系统或组件之间建立一对多的通信线路。这样,就可以向之前创建的所有 3 个 DirectChannel
发布信息。
因此,对本例来说,可以用发布-订阅(Pub-Sub)Channel 来取代 P2P Channel:
@Bean
public MessageChannel pubSubFileChannel() {
return new PublishSubscribeChannel();
}
@Bean
@InboundChannelAdapter(value = "pubSubFileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
现在,已将入站 Channel 适配器(InboundChannelAdapter
)转换为发布到 Pub-Sub Channel;。这样,就可以将从源文件夹读取的文件发送到多个目的地。
5.3、Bridge {#53bridge}
Spring Integration 中的 Bridge(桥接器)用于连接两个消息 Channel 或 Adapter,如果它们因某种原因无法直接连接的话。
在本例中,可以使用 Bridge 将发布-订阅 Channel 连接到三个不同的点对点 Channel(因为点对点 Channel 和发布-订阅 Channel 不能直接连接):
@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel1() {
return new DirectChannel();
}
@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel2() {
return new DirectChannel();
}
@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel3() {
return new DirectChannel();
}
上述 Bean 配置将 PubSubFileChannel
与三个 P2P(点对点)Channel 桥接起来。@BridgeFrom
注解定义了 Bridge,可应用于需要订阅 Pub-Sub(发布订阅)Channel 的任意数量的 Channel。
可以将上述代码理解为:"创建一个从 pubSubFileChannel
到 fileChannel1
、fileChannel2
和 fileChannel3
的 Bridge,这样,来自 pubSubFileChannel
的消息就可以同时传送到这三个 Channel"。
5.4、ServiceActivator {#54serviceactivator}
Service Activator(服务激活器)是任何在给定方法上定义了 @ServiceActivator
注解的 POJO。这使我们能够在收到入站 Channel 的消息时执行 POJO 上的任何方法,并将消息写入出站 Channel。
在本例中,ServiceActivator
从配置的输入 Channel 接收文件,并将其写入配置的文件夹。
5.5、Adapter {#55adapter}
Adapter(适配器)是一种基于企业集成模式的组件,可以 "插入" 系统或数据源。从字面上看,它几乎就像我们从插入墙上插座或电子设备中所熟知的适配器一样。
它允许与数据库、FTP 服务器和消息系统(如 JMS、AMQP 和 Twitter 等社交网络)等 "黑盒" 系统进行可重复使用的连接。连接到这些系统的需求无处不在,这意味着适配器具有很强的可移植性和可重用性(事实上,有一个小型的 适配器目录,任何人都可以免费使用)。
适配器分为 inbound(入站)和 outbound(出站)两大类。
让我们结合示例场景中使用的适配器来看看这些类别:
如你所见,入站适配器(Inbound Adapter)用于从外部系统(这里是文件系统目录)引入信息。
入站适配器配置包括:
@InboundChannelAdapter
注解将 Bean 配置标记为适配器 - 配置适配器将向其发送消息的 Channel(在本例中是 MPEG 文件)和poller
,poller
是一个组件,可帮助适配器以指定的时间间隔轮询配置的文件夹。- 一个标准的 Spring Java 配置类,可返回
FileReadingMessageSource
,即处理文件系统轮询的 Spring Integration 类实现
出站适配器(Outbound Adapter)用于向外发送消息。Spring Integration 支持多种开箱即用的适配器,适用于各种常见用例。
6、总结 {#6总结}
本文通过一个基本的示例来介绍了 Spring Integration,展示了 Spring Integration 库的基于 Java 的配置和可重用性的组件。
Spring Integration 代码既可作为 JavaSE 中的独立项目部署,也可作为 Jakarta EE 环境中更大项目的一部分部署。虽然 Spring Integration 无法与其他以 EAI 为中心的产品和模式(如企业服务总线 ESB)直接竞争,但它是一种可行的轻量级替代方案,可以解决 ESB 所要解决的许多相同问题。
Ref:https://www.baeldung.com/spring-integration