在应用中把 Redis 当成消息队列来使用已经屡见不鲜了。我想主要原因是当代应用十有八九都会用到 Redis,因此不用再引入其他消息队列系统。而且 Redis 提供了好几种实现消息队列的方法,用起来也简单。
使用 Redis 实现消息队列的几种方式 {#使用-redis-实现消息队列的几种方式}
Redis 提供了多种方式来实现消息队列。
Pub/Sub {#pubsub}
订阅发布模式,发布者把消息发布到某个 Channel,该 Channel 的所有订阅者都会收到消息。但是这种方式最大的问题是 发布出去的消息,如果没有被监听消费,或者消费过程中宕机,那么消息就会永远丢失。适合用于临时通知之类的场景,对于需要保证数据不丢失的场景不能使用这种方式。
List {#list}
List 是 Redis 提供的一种数据类型,底层是链表,可以用来实现队列、栈。
Stream {#stream}
Stream 是一个由 Redis 5 引入的,功能完善的消息队列。想必也是 Redis 官方团队看到太多人拿 Redis 当消息队列使,于是干脆就在 Redis 上设计出一个类似于 Kafka 的消息队列。
Steam 支持消费组消费,一条消息只能被消费组中的其中一个消费者消费。支持 消息确认 、支持 回溯消费 还支持把未 ACK(确认)的消息转移给其他消费者进行重新消费,在进行转移的时候还会累计消息的转移次数,当次数达到一定阈值还没消费成功,就可以放入死信队列。
这也是 Redis 种最复杂的一种数据类型。如果你真的到了需要使用 Redis Steam 作为消息队列的地步,那不如直接使用 RabbitMQ 等更加成熟且稳定的消息队列系统。
使用 List 实现可靠的消息队列 {#使用-list-实现可靠的消息队列}
目前来说,这是用得最多的一种方式,适用于大多数简单的消息队列应用场景。List 类型有很多指令,但是作为消息队列来说用到的只有几个个:
LPUSH key element [element ...]
把元素插入到 List 的首部,如果 List 不存在,会自动创建。
BRPOPLPUSH source destination timeout
移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。
当 source List 中没有元素时,Redis 会阻塞连接,直到有其他客户端向其推送元素或超时。超时时间(秒)为 0 表示永远不超时。
注意,这个命令是 原子性 的,也就是说只要客户端获取到了返回的元素,那么这个元素一定就会在 destination List 有备份。这是实现可靠消息队列的关键!
RPOPLPUSH source destination
同上,它是 BRPOPLPUSH
命令的 非阻塞 版,如果 List 中没有元素就会立即返回 null
。
LREM key count element
从 List 中删除元素,count 的值不同,删除的方式也不同:
- count > 0:从头到尾开始搜索,删除与 element 相等的元素,最多删除 count 个。
- count < 0:从尾到头开始搜索,删除与 element 相等的元素,最多删除 count (绝对值)个。
- count = 0:删除所有与元素相等的元素。
BLMOVE
和LMOVE
命令
LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>
BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout
从 Redis 6.2.0 开始,
BRPOPLPUSH
和RPOPLPUSH
命令就被声明为废弃了,取而代之的是语义更加明确的BLMOVE
和LMOVE
命令。
BLMOVE
和LMOVE
可以通过参数指定元素出队列(source)的方向,和入队列(destination)的方向,除此以外并无其他区别。
实现思路 {#实现思路}
了解了上述几个命令后,一个简单易用且可靠的消息队列就呼之欲出了。
- 生产者使用
LPUSH
命令往消息队列生产消息 - 消费者使用
BRPOPLPUSH
命令从队列消费消息,并且还会在获取并返回消息的时候把该消息推送到另一个消息队列,也就是 Pending 队列,这个队列中存储的就是未被消费者 ACK 的消息 - 消费者成功消费完毕后,使用
LREM
命令从 Pending 队列中删除这条消息,整个消费过程结束 - 如果消费者在消费过程中出现异常、宕机,那么需要在恢复后从 Pending 队列中获取到这条消息,再进行重新消费,从而保证了消息队列的可靠性,不会丢失消息(可能存在重复消费,需要做好幂等处理)
在 Spring Boot 中实现 {#在-spring-boot-中实现}
首先,创建 Spring Boot 项目,并整合 Redis。关于如何在 Spring Boot 中整合使用 Redis,请参阅 这篇文章。
创建一个 OrderConsumer
Bean 模拟从队列中消费订单 ID。
package cn.springdoc.demo.consumer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer implements ApplicationRunner, Runnable {
static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);
// 消息队列
final String queue = "queue_orders";
// pending 队列,即待确认消息的队列
final String pendingQueue = "pending_queue_orders";
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
// 应用启动后,创建新的线程来执行消费任务
Thread thread = new Thread(this);
thread.setName("order-consumer-thread");
thread.start();
}
@Override
public void run() {
while (true) {
try {
// 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的
// 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null
String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS);
if (item == null) {
log.info("等待消息 ...");
continue ;
}
try {
// 2:解析为 Long
Long orderId = Long.parseLong(item);
// 模拟消息消费
log.info("消费消息: {}", orderId);
} catch (Exception e) {
log.error("消费异常:{}", e.getMessage());
continue;
}
// 3:消费成功,从 pending 队列删除记录,相当于确认消费
stringRedisTemplate.opsForList().remove(pendingQueue, 0, item);
} catch (Exception e) {
log.error("队列监听异常:{}", e.getMessage());
break;
}
}
log.info("退出消费");
}
}
OrderConsumer
实现了 ApplicationRunner
接口,在应用就绪后创建新的消费线程进行消费。
stringRedisTemplate.opsForList().rightPopAndLeftPush
方法从 queue
队列消费一条消息,同时把消息添加到 pendingQueue
队列。该方法底层调用的正是 brpoplpush
命令,最多阻塞 5 秒,超时后返回 null
。
得到消息后解析为 Long
类型,模拟消费,即输出到日志。如果消费成功,则调用 stringRedisTemplate.opsForList().remove
方法(底层正是 LREM
命令)从 pendingQueue
队列中删除消息。如果消费失败,失败的消息会在 pendingQueue
队列中继续存在,不会丢失,可以重新投递消费或者是人工处理。
测试 {#测试}
启动应用后,通过 Redis 客户端往 queue_orders
队列推送消息:
> lpush queue_orders 10000
"1"
> lpush queue_orders 10010
"1"
> lpush queue_orders 10011
"1"
> lpush queue_orders Nan
"1"
往 queue_orders
队列推送了四条订单的 ID。注意最后一条消息值是 Nan
,这会导致 Long.parseLong
异常从而导致消费失败。
服务端输出日志如下:
[ main] cn.springdoc.demo.DemoApplication : Started DemoApplication in 3.769 seconds (process running for 4.18)
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费消息: 10000
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费消息: 10010
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费消息: 10011
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费异常:For input string: "Nan"
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
符合预期,前面三条消息都成功消费,最后一条消息消费失败。按照设计,这条消费失败的消息应该在 Pending 队列 pending_queue_orders
中存在。且应该只有这一条消息,因为其他三条消息都消费成功。
查看 pending_queue_orders
队列中的所有元素:
> lrange pending_queue_orders 0 -1
1) "Nan"
一切 OK,该队列中只有 Nan
这条消息,正是消费失败的那条消息。
此时,你如果想查看一下 Redis 中的所有 key,你会发现只有 pending_queue_orders
队列存在:
> keys *
1) "pending_queue_orders"
queue_orders
队列呢?这是 Redis List
的一个特性,当从 List
中弹出最后一个元素后,Redis 就会删除这个 List
。queue_orders
中的元素都被弹出了,所以它被删除了。当再次尝试往 queue_orders
中压入消息时,它会自动创建。也就是说 我们不需要手动预先创建队列, Redis 会自己创建,也会在合适的时间删除,而这一切都是线程安全的。
由于这是线程安全的,所以队列中的 一条消息只能被一个消费者(客户端)进行消费,这非常适合在分布式或者是集群模式下使用,不必担心同一条消息被多个消费者消费到。
注意,Pending 队列中的消息可能存在重复消费的可能。例如,消费者成功消费消息后,在调用
remove
方法从 Pending 队列中删除消息时失败,那么 Pending 队列中的这条删除失败的消息其实已经是被成功消费了的,需要在业务中考虑到!
使用 BLMOVE 和 LMOVE 命令 {#使用-blmove-和-lmove-命令}
上文说过,从 Redis 6.2.0 开始 BRPOPLPUSH
和 RPOPLPUSH
命令就被声明为废弃了,后续版本中推荐使用 BLMOVE
和 LMOVE
命令。
目前 StringRedisTemplate
(Spring Boot 3.2.2 )并未直接提供与 BLMOVE
和 LMOVE
命令对应的 API 方法,但是可以获取到底层连接对象来调用 BLMOVE
和 LMOVE
命令。
String item = this.stringRedisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
// 调用 bLMove 命令
byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5);
return ret == null ? null : new String(ret);
}
});
Redis 的持久化方式 {#redis-的持久化方式}
Redis 是一个内存数据库,为了保证数据的安全不丢失,它提供了两种数据备份(持久化)方式,即 RDB 和 AOF。
- RDB:生成某一时刻的数据快照,通过子进程进行备份,数据可能不完整(取决于备份周期)。
- AOF:通过记录执行的指令到文件来实现数据备份,相对完整性较高,但是会记录每一条执行命令,性能会有一定影响。
这就需要根据你的业务场景来选择合适的持久化方式,也可以同时配合使用 RDB 和 AOF 两种方式,兼顾性能和数据安全。
总结 {#总结}
本文介绍了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH
/BLMOVE
命令来实现一个线程安全且可靠的消息队列。