51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Redis Stream消息队列 java实战

![](https://img1.51tbox.com/static/2024-06-03/col/b37515c98575514e7bc3131a12cceb3a/c1304cd2e1ee471eb77bc9eab7242db5.jpg.jpg)

工具类部分内容
-------

```java
package com.hwd.campus.common.redis.utils;

import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.service.RedisListSelect;
import com.hwd.campus.common.redis.service.RedisSelect;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Redis helper component (excerpt). Only the stream-related helpers are shown here;
 * the other classes in this article also call {@code hasKey} and {@code streamDelete},
 * which are not part of this excerpt.
 *
 * @author
 * @datetime 2023-01-11 09:25:52
 * @description
 */
@Component
@AllArgsConstructor
public class RedisUtils {
    private RedisTemplate<String, Object> redisTemplate;
    private StringRedisTemplate stringRedisTemplate;
    private static final Long DAY_SECONDS = 60 * 60 * 24L;
    private static final Long SEVEN_DAY_SECONDS = 7 * DAY_SECONDS;

    /**
     * Returns the consumer-group information of a stream.
     *
     * @param key stream key
     * @return the groups reported by the stream operations for {@code key}
     */
    public StreamInfo.XInfoGroups groups(String key) {
        return stringRedisTemplate.opsForStream().groups(key);
    }

    /**
     * Creates a consumer group on a stream.
     *
     * @param key       stream key
     * @param groupName name of the consumer group to create
     */
    public void addGroup(String key, String groupName) {
        stringRedisTemplate.opsForStream().createGroup(key, groupName);
    }

    /**
     * Appends a message to a stream.
     *
     * @param streamKey  stream key
     * @param msgContext message payload
     */
    public void addStream(String streamKey, Object msgContext) {
        stringRedisTemplate.opsForStream().add(Record.of(msgContext).withStreamKey(streamKey));
    }
}
```

## 此处采用Stream实现消息队列

创建监听器
-----

```java
package com.hwd.campus.manage.biz.listener;

import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;

/**
 * Registers the log stream consumers on application startup and stops the
 * listener container when the bean is destroyed.
 *
 * @author
 * @datetime 2023-01-14 11:04:28
 * @description Consumer listener with automatic ack
 */
@Slf4j
@Component
public class LogStreamConsumerRunner implements ApplicationRunner, DisposableBean {
    @Resource
    private RedisConnectionFactory redisConnectionFactory;
    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Resource
    private LogStreamConsumer logStreamConsumer;
    @Resource
    private RedisUtils redisUtils;

    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer;

    /**
     * Ensures both consumer groups exist, builds the listener container,
     * subscribes the operate-log and login-log streams, and starts polling.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        addConsumeGroup(RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY, RedisKeyPrefixConst.OPERATE_LOG_CONSUME_GROUP);
        addConsumeGroup(RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY, RedisKeyPrefixConst.LOGIN_LOG_CONSUME_GROUP);
        threadPoolTaskExecutor.setMaxPoolSize(100);

        // Build the container options
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchSize(10)
                        // Executor that runs the polling tasks
                        .executor(this.threadPoolTaskExecutor)
                        // Handler for errors raised while consuming; routed through the logger
                        // instead of printStackTrace so failures end up in the application log
                        .errorHandler(e -> log.error("Redis stream consumption failed", e))
                        // Poll timeout; zero is intended to mean "never time out"
                        // (a timeout would otherwise surface as an exception)
                        .pollTimeout(Duration.ZERO)
                        // Serializer
                        .serializer(new StringRedisSerializer())
                        .targetType(String.class)
                        .build();

        // Create the listener container from the options
        streamMessageListenerContainer = StreamMessageListenerContainer.create(this.redisConnectionFactory, options);

        // Subscribe both streams with auto-ack
        receiveAutoAck(RedisKeyPrefixConst.OPERATE_LOG_CONSUME_GROUP, RedisKeyPrefixConst.OPERATE_LOG_CONSUME_NAME, RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY);
        receiveAutoAck(RedisKeyPrefixConst.LOGIN_LOG_CONSUME_GROUP, RedisKeyPrefixConst.LOGIN_LOG_CONSUME_NAME, RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY);

        // Start listening
        streamMessageListenerContainer.start();
    }

    /**
     * Subscribes {@link LogStreamConsumer} to a stream as the given consumer of the
     * given group, reading from the group's last consumed offset with auto-ack.
     *
     * @param consumeGroup consumer group name
     * @param consumeName  consumer name inside the group
     * @param streamKey    stream key to read from
     */
    private void receiveAutoAck(String consumeGroup, String consumeName, String streamKey) {
        streamMessageListenerContainer.receiveAutoAck(
                Consumer.from(consumeGroup, consumeName),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), this.logStreamConsumer);
    }

    /**
     * Creates the consumer group on the stream unless a group with that name already exists.
     * <p>
     * The group is looked up by name: checking only whether the stream has <em>any</em> group
     * would skip creation when a different group is already registered on the same stream.
     *
     * @param streamKey    stream key
     * @param consumeGroup consumer group that must exist on the stream
     */
    private void addConsumeGroup(String streamKey, String consumeGroup) {
        if (redisUtils.hasKey(streamKey)) {
            boolean groupExists = redisUtils.groups(streamKey).stream()
                    .anyMatch(group -> consumeGroup.equals(group.groupName()));
            if (groupExists) {
                return;
            }
        }
        redisUtils.addGroup(streamKey, consumeGroup);
    }

    /**
     * Stops the listener container on shutdown.
     */
    @Override
    public void destroy() {
        // The container is only assigned in run(); guard against shutdown before/without startup
        if (this.streamMessageListenerContainer != null) {
            this.streamMessageListenerContainer.stop();
        }
    }
}
```

进行消费进行日志增加
----------

```java
package com.hwd.campus.manage.biz.listener;

import cn.hutool.json.JSONUtil;
import com.hwd.campus.manage.biz.model.vo.LoginVo;
import com.hwd.campus.manage.biz.service.ILoginLogService;
import com.hwd.campus.manage.biz.service.IOperateLogService;
import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.utils.RedisUtils;
import com.hwd.campus.common.web.filter.model.OperateLogModel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * Stream listener that persists operate-log and login-log messages and then
 * deletes the consumed entry from its stream.
 *
 * @author
 */
@Component
@Slf4j
@AllArgsConstructor
public class LogStreamConsumer implements StreamListener<String, ObjectRecord<String, String>> {
    private RedisUtils redisUtils;
    private IOperateLogService manageOperateLogService;
    private ILoginLogService manageLoginLogService;

    /**
     * Handles one stream record: parses the JSON payload according to the stream it
     * came from, stores it through the matching log service, and deletes the record.
     *
     * @param message record received from the stream; its value is a JSON string
     */
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        log.info("接受到来自redis的消息");
        String messageId = message.getId().getValue();
        log.info("message id {}", messageId);
        String stream = message.getStream();
        log.info("stream {}", stream);
        String value = message.getValue();
        log.info("value {}", value);

        if (RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY.equals(stream)) {
            OperateLogModel operateLogModel = JSONUtil.toBean(value, OperateLogModel.class);
            manageOperateLogService.addOperateLog(operateLogModel);
        } else if (RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY.equals(stream)) {
            LoginVo loginVo = JSONUtil.toBean(value, LoginVo.class);
            manageLoginLogService.addLoginLog(loginVo);
        }

        // Delete the message once it has been consumed
        redisUtils.streamDelete(Objects.requireNonNull(stream), messageId);
    }
}
```

可通过接口往stream里面set值
------------------

```java
redisUtils.addStream(RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY, JSONUtil.toJsonStr(loginVo));
```
赞(7)
未经允许不得转载:工具盒子 » Redis Stream消息队列 java实战