2024-06-04
分类:Java笔记
阅读(142) 评论(0)
![](https://img1.51tbox.com/static/2024-06-03/col/b37515c98575514e7bc3131a12cceb3a/c1304cd2e1ee471eb77bc9eab7242db5.jpg.jpg)
工具类部分内容
-------
```
package com.hwd.campus.common.redis.utils;
import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.service.RedisListSelect;
import com.hwd.campus.common.redis.service.RedisSelect;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * @author
 * @datetime 2023-01-11 09:25:52
 * @description Thin wrapper around the Spring Redis templates. This excerpt exposes the
 * Redis Stream helpers used by the log message queue (group lookup/creation, record
 * append/delete) plus a key-existence check.
 */
@Component
@AllArgsConstructor
public class RedisUtils {
    // Injected through the Lombok-generated all-args constructor; final so they cannot be reassigned.
    private final RedisTemplate<String, Object> redisTemplate;
    private final StringRedisTemplate stringRedisTemplate;

    private static final Long DAY_SECONDS = 60 * 60 * 24L;
    private static final Long SEVEN_DAY_SECONDS = 7 * DAY_SECONDS;

    /**
     * Checks whether a key exists.
     *
     * @param key the Redis key
     * @return {@code true} only when Redis reports that the key exists
     */
    public boolean hasKey(String key) {
        // RedisTemplate#hasKey returns a nullable Boolean; avoid an unboxing NPE.
        return Boolean.TRUE.equals(stringRedisTemplate.hasKey(key));
    }

    /**
     * Returns the consumer groups registered on a stream.
     *
     * @param key the stream key
     * @return information about the consumer groups of the stream
     */
    public StreamInfo.XInfoGroups groups(String key) {
        return stringRedisTemplate.opsForStream().groups(key);
    }

    /**
     * Creates a consumer group on a stream.
     *
     * @param key       the stream key
     * @param groupName the name of the consumer group to create
     */
    public void addGroup(String key, String groupName) {
        stringRedisTemplate.opsForStream().createGroup(key, groupName);
    }

    /**
     * Appends a record to a stream.
     *
     * @param streamKey  the stream key
     * @param msgContext the message payload to store in the record
     */
    public void addStream(String streamKey, Object msgContext) {
        stringRedisTemplate.opsForStream().add(Record.of(msgContext).withStreamKey(streamKey));
    }

    /**
     * Deletes records from a stream.
     *
     * @param key       the stream key
     * @param recordIds the ids of the records to delete
     * @return the value returned by the underlying stream delete operation
     */
    public Long streamDelete(String key, String... recordIds) {
        return stringRedisTemplate.opsForStream().delete(key, recordIds);
    }
}
```
## 此处采用Stream实现消息队列
创建监听器
-----
```
package com.hwd.campus.manage.biz.listener;
import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
/**
* @author
* @datetime 2023-01-14 11:04:28
* @description 消费监听,自动ack
*/
@Slf4j
@Component
public class LogStreamConsumerRunner implements ApplicationRunner, DisposableBean {
@Resource
private RedisConnectionFactory redisConnectionFactory;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private LogStreamConsumer logStreamConsumer;
@Resource
private RedisUtils redisUtils;
private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer;
@Override
public void run(ApplicationArguments args) {
addConsumeGroup(RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY, RedisKeyPrefixConst.OPERATE_LOG_CONSUME_GROUP);
addConsumeGroup(RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY, RedisKeyPrefixConst.LOGIN_LOG_CONSUME_GROUP);
threadPoolTaskExecutor.setMaxPoolSize(100);
// 创建配置对象
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
//执行消息轮询的执行器
.executor(this.threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(Throwable::printStackTrace)
//超时时间,设置为0,表示不超时(超时后会抛出异常)
.pollTimeout(Duration.ZERO)
// 序列化器
.serializer(new StringRedisSerializer())
.targetType(String.class)
.build();
//根据配置对象创建监听容器对象
streamMessageListenerContainer = StreamMessageListenerContainer.create(this.redisConnectionFactory, options);
//使用监听容器对象开始监听消费
receiveAutoAck(RedisKeyPrefixConst.OPERATE_LOG_CONSUME_GROUP, RedisKeyPrefixConst.OPERATE_LOG_CONSUME_NAME, RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY);
receiveAutoAck(RedisKeyPrefixConst.LOGIN_LOG_CONSUME_GROUP, RedisKeyPrefixConst.LOGIN_LOG_CONSUME_NAME, RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY);
//启动监听
streamMessageListenerContainer.start();
}
private void receiveAutoAck(String consumeGroup, String consumeName, String streamKey) {
streamMessageListenerContainer.receiveAutoAck(
Consumer.from(consumeGroup, consumeName),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), this.logStreamConsumer);
}
private void addConsumeGroup(String streamKey, String consumeGroup) {
if (redisUtils.hasKey(streamKey)) {
StreamInfo.XInfoGroups groups = redisUtils.groups(streamKey);
if (groups.isEmpty()) {
redisUtils.addGroup(streamKey, consumeGroup);
}
} else {
redisUtils.addGroup(streamKey, consumeGroup);
}
}
@Override
public void destroy() {
this.streamMessageListenerContainer.stop();
}
}
```
进行消费进行日志增加
----------
```
package com.hwd.campus.manage.biz.listener;
import cn.hutool.json.JSONUtil;
import com.hwd.campus.manage.biz.model.vo.LoginVo;
import com.hwd.campus.manage.biz.service.ILoginLogService;
import com.hwd.campus.manage.biz.service.IOperateLogService;
import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.utils.RedisUtils;
import com.hwd.campus.common.web.filter.model.OperateLogModel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
 * Stream listener that persists log records read from Redis: records from the operate-log
 * stream are stored through {@link IOperateLogService}, records from the login-log stream
 * through {@link ILoginLogService}. After handling, the record is deleted from its stream.
 *
 * @author
 */
@Component
@Slf4j
@AllArgsConstructor
public class LogStreamConsumer implements StreamListener<String, ObjectRecord<String, String>> {
    // Injected through the Lombok-generated all-args constructor.
    private final RedisUtils redisUtils;
    private final IOperateLogService manageOperateLogService;
    private final ILoginLogService manageLoginLogService;

    /**
     * Handles one stream record: logs it, converts the JSON payload to the model matching the
     * source stream, stores it, then deletes the record from the stream.
     *
     * @param message the record delivered by the listener container; its value is the JSON payload
     * @throws NullPointerException if the record carries no stream key
     */
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        log.info("接受到来自redis的消息");
        String recordId = message.getId().getValue();
        log.info("message id {}", recordId);
        String stream = message.getStream();
        log.info("stream {}", stream);
        String payload = message.getValue();
        log.info("value {}", payload);
        if (RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY.equals(stream)) {
            OperateLogModel operateLogModel = JSONUtil.toBean(payload, OperateLogModel.class);
            manageOperateLogService.addOperateLog(operateLogModel);
        } else if (RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY.equals(stream)) {
            LoginVo loginVo = JSONUtil.toBean(payload, LoginVo.class);
            manageLoginLogService.addLoginLog(loginVo);
        }
        // Consumption finished: delete this record from the stream
        redisUtils.streamDelete(Objects.requireNonNull(stream), recordId);
    }
}
```
可通过接口往stream里面set值
------------------
```
// Producer side: convert loginVo to a JSON string and append it to the login-log stream;
// LogStreamConsumer later parses the same payload back with JSONUtil.toBean.
redisUtils.addStream(RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY, JSONUtil.toJsonStr(loginVo));
```
众生皆苦,唯有自渡!