irpas技术客

springCloud使用stream配置rabbitMq实现延时消息_奔跑的菜鸡

网络投稿 1550

先安装rabbitMq延时插件

参考我另一篇文章 https://blog.csdn.net/weixin_43944305/article/details/120828003

上依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

消息通道

package com.fchan.springcloudstream.service; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface MyMessageChannel { String out = "out"; String in = "in"; @Output(out) MessageChannel out(); @Input(in) SubscribableChannel in(); } // 发送延迟消息 @PostMapping("/delayed") public String sendDelayedMessage(@RequestParam("body") String body, @RequestParam("seconds") Integer seconds) { Map<String,Object> message = new HashMap<>(); message.put("body", body); myMessageChannel.out().send( MessageBuilder.withPayload(message) .setHeader("x-delay", seconds * 1000) .build() ); log.info("发送延迟消息成功"); return "SUCCESS"; }

延时消息接收

package com.fchan.springcloudstream.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import java.util.Map; @Component @EnableBinding({MyMessageChannel.class}) public class MyConsumer { Logger log = LoggerFactory.getLogger(MyConsumer.class); @StreamListener(MyMessageChannel.in) public void input(Message<Map<String,Object>> message){ log.info("收到消息:{}", message.getPayload()); } }

yml配置

spring: rabbitmq: host: 110.40.181.73 port: 35672 username: root password: 10086 virtual-host: /fchan cloud: stream: rabbit: bindings: # 消费者开启延时队列支持 in: consumer: delayed-exchange: true # 生产者开启延时队列支持 out: producer: delayed-exchange: true bindings: in: # 指定消息所属exchange destination: test # 指定消费者分组,在多实例的时候必需指定,防止重复消费 group: myIn out: destination: test

在启动项目后登陆rabbitmq管理页面可以看到exchange创建成功 localhost:8080/delayed?body=1231&seconds=5 测试成功


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。