irpas技术客

springboot+redis+sse+vue实现分布式消息发布/通知_多云几多_vue sse

网络投稿 8142

springboot+redis+sse+vue实现分布式消息发布/通知 一、需求说明二、架构选择三、代码实现1. sse集成sse服务类代码客户端链接控制器前端实现(vue)方法调用 2. redis实现订阅/发布监听类redisConfig配置消息发送

一、需求说明

需求是实现web端的小红点通知,因为后端是两台机子做负载,所以需要实现分布式消息订阅发布

这里没有用消息中间件(rabbitmq…)和websoket,因为相对项目来说,这俩个比较重,所以用了相对较轻的redis和sse,都是项目自带的

二、架构选择 redis(分布式发布订阅)sse (SseEmitter) 三、代码实现 1. sse集成 sse服务类代码

这里会话的key值存储可以不用这么复杂,我当时想着连接成功后可以直接将返回的sseEmitter扔到redis里去实现分布式,但是不行,序列化后取出来是发不了消息的,原因可能是存到redis里就相当于直接把连接扔了,哈哈

package com.smartvillage.framework.sse.serve; import cn.hutool.core.collection.CollectionUtil; import com.smartvillage.common.core.redis.RedisCache; import com.smartvillage.common.utils.spring.SpringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.stream.Collectors; /** * @author wangyj * @className SseEmitterServer * @description 消息推送服务类 * @date 22/11/9 */ public class SseEmitterServer { private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class); private static final String KEY_PREFIX = "SseEmitter_"; private static final String ONLINE_SESSION_COUNT = "OnlineSessionCount"; /** * 当前连接数 */ // private static AtomicInteger count = new AtomicInteger(0); /** * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面 */ private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); /** * 创建用户连接并返回 SseEmitter * * @param sessionId 用户ID * @return SseEmitter */ public static SseEmitter connect(String sessionId) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); // 注册回调 sseEmitter.onCompletion(completionCallBack(sessionId)); sseEmitter.onError(errorCallBack(sessionId)); sseEmitter.onTimeout(timeoutCallBack(sessionId)); // SpringUtils.getBean(RedisCache.class).setCacheObject(getCacheKey(sessionId), sseEmitter); // 数量+1 SpringUtils.getBean(RedisCache.class).incr(ONLINE_SESSION_COUNT,1); sseEmitterMap.put(getCacheKey(sessionId),sseEmitter); log.info("创建新的sse连接,当前会话:{}", sessionId); return sseEmitter; } /** * 给指定用户发送信息 -- 单播 */ public static void sendMsg(String userId, String message) { sendMessage(getCacheKey(userId),message); } /** * 给指定用户发送信息 */ public static void sendMessage(String cacheKey, String message) { if (sseEmitterMap.containsKey(cacheKey)) { // if (SpringUtils.getBean(RedisCache.class).hasKey(cacheKey)) { try { // SseEmitter sseEmitter = SpringUtils.getBean(RedisCache.class).getCacheObject(cacheKey); SseEmitter sseEmitter = sseEmitterMap.get(cacheKey); sseEmitter.send(message,MediaType.APPLICATION_JSON); log.info("用户[{}]推送成功:{}", cacheKey, message); } catch (IOException e) { log.error("用户[{}]推送异常:{}", cacheKey, e.getMessage()); removeUser(cacheKey); } } } /** * 向多人发布消息 -- 组播 * * @param groupId 开头标识 * @param message 消息内容 */ public static void groupSendMessage(String groupId, String message) { // Set<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + groupId + "*"); Set<String> keys = sseEmitterMap.keySet().stream().filter(k -> k.startsWith(KEY_PREFIX + groupId)).collect(Collectors.toSet()); if(CollectionUtil.isNotEmpty(keys)){ batchSendMessage(message,keys); } } /** * 群发所有人 -- 广播 */ public static void batchSendMessage(String message) { // Set<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + "*"); Set<String> keys = sseEmitterMap.keySet(); if(CollectionUtil.isNotEmpty(keys)){ batchSendMessage(message,keys); } } /** * 群发消息 */ public static void batchSendMessage(String message, Set<String> keys) { keys.forEach(key -> sendMessage(key, message)); } /** * 移除用户连接 */ public static void removeUser(String cacheKey) { // SpringUtils.getBean(RedisCache.class).deleteObject(cacheKey); sseEmitterMap.remove(cacheKey); // 数量-1 SpringUtils.getBean(RedisCache.class).decr(ONLINE_SESSION_COUNT,1); log.info("移除用户:{}", cacheKey); } /** * 获取当前连接信息 */ public static List<String> getIds() { Collection<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX); return keys.stream().map(k -> k.replace(KEY_PREFIX, "")).collect(Collectors.toList()); } /** * 获取当前连接数量 */ public static int getUserCount() { return SpringUtils.getBean(RedisCache.class).getCacheObject(ONLINE_SESSION_COUNT); } private static Runnable completionCallBack(String userId) { return () -> { log.info("结束连接:{}", userId); removeUser(getCacheKey(userId)); }; } private static Runnable timeoutCallBack(String userId) { return () -> { log.info("连接超时:{}", userId); removeUser(getCacheKey(userId)); }; } private static Consumer<Throwable> errorCallBack(String userId) { return throwable -> { log.info("连接异常:{}", userId); removeUser(getCacheKey(userId)); }; } /** * 设置cache key * * @param configKey 参数键 * @return 缓存键key */ public static String getCacheKey(String configKey){ return KEY_PREFIX + configKey; } } 客户端链接控制器 /** * @author wangyj * @className AiWarningSseController * @description 警告消息订阅 * @date 22/11/10 */ @RestController @RequestMapping("/test") public class SseController { @Autowired RedisCache redisCache; /** * 客户端链接 * @return */ @GetMapping("/connect") public SseEmitter connect() { return SseEmitterServer.connect("test-key"); } /** * 消息推送 * @return */ @PostMapping("/post") public AjaxResult postMessage(String msg) { // ... 业务逻辑 // 推送消息 SseEmitterServer.sendMsg("test-key", msg)) return AjaxResult.success("推送成功"); } /** * 链接关闭 * @return */ @GetMapping("/close") public AjaxResult close() { SseEmitterServer.removeUser("test-key"); return AjaxResult.success(); } } 前端实现(vue)

这里使用了组件:vue-sse(自行安装哈)

方法调用 mounted() { // 组件挂载时订阅 this.subscribeWarnMsg(); }, beforeDestroy() { // 组件销毁时记得关链接释放资源 this.closeWarningMessage(); }, methods: { //... // 消息订阅 subscribeWarnMsg() { this.$sse .create({ // format: "json", // 注掉就能接受消息 polyfill: true, forcePolyfill: true, url: process.env.VUE_APP_BASE_API + "/test/connect", withCredentials: true, polyfillOptions: { // 超时时间,调长点,要不频繁重连 heartbeatTimeout: 10 * 60 * 1000, // 携带认证token headers: { Authorization: 'Bearer ' + getToken(), }, }, }) .on("message", (msg) => { console.log(msg) }) .on("error", (err) => console.error("Failed to parse or lost connection:", err) ) .connect() .catch((err) => console.error("Failed make initial connection:", err)); }, // 关闭订阅 closeMessage() { return request({ url: '/test/close', method: 'get', } ) }

至此sse封装完成!单节点的项目就可以正常用了~

2. redis实现订阅/发布 监听类 /** * @author wangyj * @className TestListener * @description redis listener * @date 22/11/17 */ @Component public class TestListener{ private static final Logger log = LoggerFactory.getLogger(TestListener.class); public void onMessage(String msg) { log.info(msg); JSONObject parseObject = JSON.parseObject(msg); Long deptId = parseObject.getLong("deptId"); // 组播 SseEmitterServer.groupSendMessage("deptId:" + deptId, msg); // 单播 SseEmitterServer.sendMsg("test-key", msg)); } } redisConfig配置 /** * redis配置 */ @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport{ // ... 其他序列化等配置 @Bean // 这里要注入我们刚才写的监听者类 public MessageListenerAdapter TestListenerAdapter(TestListener receiver) { // 这个"onMessage"要和监听者类里的方法名对应,因为是反射注入的,默认是"handleMessage"?可以看下源码 return new MessageListenerAdapter(receiver,"onMessage"); } /*@Bean public MessageListenerAdapter listenerAdapter1(TestListener1 receiver) { return new MessageListenerAdapter(receiver,"onMessage"); } @Bean public MessageListenerAdapter listenerAdapter2(TestListener2 receiver) { return new MessageListenerAdapter(receiver,"onMessage"); }*/ /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, // 这个玩意可以后面跟多个哈,名字匹配自动注入的,MessageListenerAdapter aiWarningListenerAdapter,MessageListenerAdapter listenerAdapter1,MessageListenerAdapter listenerAdapter2,当然,要有对应名字的bean,看上面注释掉的代码 MessageListenerAdapter testListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat的通道 // container.addMessageListener(listenerAdapter1, new PatternTopic("chat")); container.addMessageListener(aiWarningListenerAdapter, new PatternTopic(RedisChannel.AI_WARNING)); return container; } } 消息发送 redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);

结合上文SseController 里面消息推送代码:

public class SseController { @Autowired RedisCache redisCache; /** * 客户端链接 * @return */ @GetMapping("/connect") public SseEmitter connect() { return SseEmitterServer.connect("test-key"); } /** * 消息推送 * @return */ @PostMapping("/post") public AjaxResult postMessage(String msg) { // ... 业务逻辑 // 推送消息 //SseEmitterServer.sendMsg("test-key", msg)); // 先推到redis redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog); return AjaxResult.success("推送成功"); }

redisCache

@Component public class RedisCache { @Autowired public RedisTemplate redisTemplate; // ...其他方法 /** * 消息推送 * @param channel * @param message */ public void convertAndSend(String channel,Object message){ redisTemplate.convertAndSend(channel,message); } }

完活~

还有一种监听者配置方法,参考:

@Component public class TestListener implements MessageListener{ private static final Logger log = LoggerFactory.getLogger(TestListener.class); @Override public void onMessage(Message message, byte[] pattern) { // 订阅的频道名称 String channel = new String(message.getChannel()); // 消息体 String msg = new String(message.getBody()); } }

redisConfig

@Bean RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, TestListener testListener ) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); //订阅topic - subscribe container.addMessageListener(testListener ,new ChannelTopic("testChannel")); return container; }

注意:

多个实例在消费时,要注意消费时加锁,避免重复消费的情况nginx超时时长nginx iphashnginx配置 client_max_body_size 300m; #设置nginx能处理的最大请求主体大小。 client_body_buffer_size 128k; #请求主体的缓冲区大小。 proxy_connect_timeout 600; proxy_read_timeout 600; proxy_send_timeout 600; proxy_buffer_size 64k; proxy_buffers 4 32k; proxy_busy_buffers_size 64k; proxy_temp_file_write_size 64k; location /apis { rewrite ^.+apis/?(.*)$ /$1 break; include uwsgi_params; proxy_pass http://192.168.5.127:8088/; # 关键参数 proxy_buffering off; }

注意:

要配置代理超时时间不配置proxy_buffering off的话,会出现请求发出后,接口收到直接返回,无法保持长连接。 参考网上说明:proxy_buffering这个参数用来控制是否打开后端响应内容的缓冲区,如果这个设置为off,那么proxy_buffers和proxy_busy_buffers_size这两个指令将会失效

如有问题请不吝指正~


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Vue #SSE