1、简介 {#1简介}
在本教程中,我们将介绍如何使用 PostgreSQL 的 LISTEN
命令来在 Spring Boot 应用中实现简单的 MQ。
2、PostgreSQL 的 LISTEN/NOTIFY 机制简介 {#2postgresql-的-listennotify-机制简介}
简单地说,这些命令允许连接的客户端通过普通的 PostgreSQL 连接交换信息。客户端使用 NOTIFY
命令向 channel 发送通知以及可选的 string payload。
channel 可以是任何有效的 SQL 标识符,其工作原理与传统 MQ 系统中的 topic 类似。这意味着 payload 将发送给该特定 channel 的所有活动的监听器(listener)。如果没有 payload,监听者只会收到一个空通知。
要开始接收通知,客户端需要使用 LISTEN
命令,该命令将 channel 名称作为唯一参数。该命令会立即返回,因此客户端可以使用同一连接继续执行其他任务。
- channel 名称在数据库中是唯一的。
- 客户端使用 LISTEN/NOTIFY 时无需特殊授权。
- 在事务中使用 NOTIFY 时,客户端只有在事务成功完成时才会收到通知。
此外,如果在一个事务中使用相同的 payload 向同一 channel 发送多个 NOTIFY 命令,客户端将只收到一个通知。
3、使用 PostgreSQL 作为 Message Broker 的场景 {#3使用-postgresql-作为-message-broker-的场景}
鉴于 PostgreSQL 通知的特性,我们不禁要问,什么时候使用它而不是 RabbitMQ 或类似的成熟 message broker。这需要权衡利弊。一般来说,选择后者意味着:
- 更复杂 - message broker 是另一个必须监控、升级的组件。
- 处理分布式事务中的失败模式(failure mode)。
- 假设我们使用 PostgreSQL 作为主数据库,这些功能已经预制。
- 无分布式事务。
- 这是一种专有机制,只能用于 PostgreSQL。
- 不直接支持持久化订阅者。在客户端开始监听消息之前发送的通知将丢失。
- "模块化单体"式应用中的通知总线。
- 分布式缓存失效。
- 轻量级消息代理,使用普通数据库表作为队列。
- 事件源架构。
4、在 Spring Boot 应用中使用 LISTEN/NOTIFY {#4在-spring-boot-应用中使用-listennotify}
既然我们已经对 LISTEN/NOTIFY 机制有了基本的了解,下面就让我们继续使用它构建一个简单的 Spring Boot 测试应用。我们将创建一个简单的 API,允许我们提交买入/卖出订单。payload 包括我们愿意买入或卖出的仪器id、价格和数量。我们还将添加一个 API,允许我们根据订单的 id 查询订单。
这时,通知机制就派上用场了:我们将在每次插入时发送 NOTIFY,客户端将使用 LISTEN 将订单预加载到各自的本地缓存中。
4.1、项目依赖 {#41项目依赖}
我们的示例应用需要 Spring Boot 应用的常规依赖项以及 PostgreSQL 驱动:
spring-boot-starter-web 、spring-boot-starter-data-jdbc 和 postgresql 的最新版本可从 Maven Central 获取。
4.2、Notification Service {#42notification-service}
由于通知机制是 PostgreSQL 特有的,我们将把它的一般行为封装在一个类中: NotifierService
有两个职责。首先,它提供了发送订单相关通知的 facade(外观设计模式):
public class NotifierService {
private static final String ORDERS_CHANNEL = "orders";
private final JdbcTemplate tpl;
public void notifyOrderCreated(Order order) {
tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
// ... other methods omitted
其次,它有一个用于 Runnable
实例的工厂方法,应用可使用该方法接收通知。该工厂需要一个 PGNotification
对象的消费者,其中包含检索与通知相关的 channel 和 payload 的方法:
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return () -> {
tpl.execute((Connection c) -> {
c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if ( nts == null || nts.length == 0 ) {
for( PGNotification nt : nts) {
return 0;
在这里,为了简单起见,我们选择交付原始的 PGNotification
。在现实世界中,我们通常要处理多个 domain entity,我们可以使用泛型或类似技术来继承该类,以避免代码重复。
关于创建的 Runnable
- 与数据库相关的逻辑使用
方法。这可确保正确的连接处理/清理,并简化错误处理。 - 回调一直运行,直到当前线程被中断或运行时出错导致它返回。
请注意使用的是 PGConnection
,而不是标准的 JDBC Connection
。我们需要它来直接访问 getNotifications()
有两个重载方法。在不带参数的情况下调用时,它会轮询并返回任何待处理的通知。如果没有,则返回 null
。第二个变量接受一个 integer,表示等待通知的最长时间,超时返回 null
。如果我们传递 0
在应用初始化过程中,我们在 @Configuration
类中使用了 CommandLineRunner
Bean,该 Bean 实际上将生成一个新的线程以开始接收通知:
public class ListenerConfiguration {
CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
return (args) -> {
Runnable listener = notifier.createNotificationHandler(handler);
Thread t = new Thread(listener, "order-listener");
4.3、处理连接 {#43处理连接}
使用同一连接处理通知和常规查询虽然技术上可行,但并不方便。这必须在控制流中分散调用 getNotification()
标准做法是运行一个或多个专用线程来处理通知。每个线程都有自己的连接,并一直保持打开状态。如果这些连接是由 Hikari 或 DBCP 等连接池创建的,这可能会造成问题。
为了避免这些问题,我们的示例创建了一个专用的 DriverDataSource
,反过来,我们用它来创建 NotifierService
所需的 JdbcTemplate
public class NotifierConfiguration {
NotifierService notifier(DataSourceProperties props) {
DriverDataSource ds = new DriverDataSource(
new Properties(),
JdbcTemplate tpl = new JdbcTemplate(ds);
return new NotifierService(tpl);
请注意,我们共享了用于创建 Spring 管理的 main DataSource
的相同连接属性。不过,我们没有将此专用 DataSource
作为 bean 公开,因为这样会禁用 Spring Boot 的自动配置功能。
4.4、处理通知 {#44处理通知}
缓存逻辑的最后一部分是 NotificationHandler
类,它实现了 Consumer<Notification>
接口。该类的作用是处理单个通知,并用 Order
实例填充到已配置的 Cache
public class NotificationHandler implements Consumer<PGNotification> {
private final OrdersService orders;
public void accept(PGNotification t) {
Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
// ... log messages omitted
该实现使用 getName()
和 getParameter()
从通知中获取 channel 名称和 order id。在此,我们可以假设通知始终是预期的通知。
实际逻辑非常简单:我们使用 OrderRepository
从数据库中获取 Order
public class OrdersService {
private final OrdersRepository repo;
// ... other private fields omitted
@Transactional(readOnly = true)
public Optional<Order> findById(Long id) {
Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
if (!o.isEmpty()) {
log.info("findById: cache hit, id={}",id);
return o;
log.info("findById: cache miss, id={}",id);
o = repo.findById(id);
if ( o.isEmpty()) {
return o;
ordersCache.put(id, o.get());
return o;
5、测试 {#5测试}
要查看通知机制的运行情况,最好的办法是启动两个或多个测试应用实例,每个实例都配置为监听不同的端口。我们还需要两个实例都能连接的 PostgreSQL
实例。请参考 application.properties 文件,并根据 PostgreSQL 实例的连接详情对其进行修改。
接下来,为了启动测试环境,我们将打开两个 shell 并使用 Maven 运行应用。项目的 pom.xml
包含一个额外的配置文件 instance1
# 第一个 shell
$ mvn spring-boot:run
... many messages (omitted)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 2.615 seconds (JVM running for 2.944)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
第二个 shell
... many messages (omitted)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 1.984 seconds (JVM running for 2.274)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
过一段时间后,我们应该在每个日志中看到一条消息,通知我们应用已准备好接收请求。现在,让我们在另一个 shell 上使用 curl
创建第一个 Order:
$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
在 8080 端口运行的应用实例将打印一些信息。我们还将看到 8081
[ order-listener] c.b.m.p.service.NotificationHandler : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)
这证明该机制按预期运行。最后,我们可以再次使用 curl
来查询在 instance1
curl http://localhost:8081/orders/30
[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService : findById: cache hit, id=30
6、总结 {#6总结}
在本文中,我们介绍了 PostgreSQL 的 NOTIFY/LISTEN 机制,以及如何使用它来实现一个无需额外组件的轻量级 message broker。