irpas技术客

JavaEE:SpringCloud-使用Stream向RabbitMQ发送/接收消息_無_爲

网络投稿 472

一、使用Stream向RabbitMQ发送/接收消息:

1.导入stream和rabbitmq依赖包:

<dependencies> ? ? <!-- 导入Actuator依赖包 --> ? ? <dependency> ? ? ? ? <groupId>org.springframework.boot</groupId> ? ? ? ? <artifactId>spring-boot-starter-actuator</artifactId> ? ? </dependency> ? ? <dependency> ? ? ? ? <groupId>org.springframework.cloud</groupId> ? ? ? ? <artifactId>spring-cloud-starter-stream-rabbit</artifactId> ? ? </dependency> </dependencies>

2.application.yml中配置stream和rabbitmq:

(1)配置rmq服务器连接信息:

spring: ? application: ? ? name: stream-service ? rabbitmq: #配置rabbitmq服务器连接信息 ? ? host: 192.168.233.147 ? ? port: 5672 ? ? username: root ? ? password: root ? ... server: ? port: 10004 management: #开启actuator-endpoint ? security: ? ? enabled: false ?#关闭security ? endpoints: ? ? web: ? ? ? exposure: ? ? ? ? include: "*" ? endpoint: ? ? health: ? ? ? show-details: always

(2)配置stream发送/接收普通消息/分组消息/延迟消息:

spring: ? ... ? cloud: ? ? stream: ? ? ? instance-count: 2 ? #(2)参与消息分区的接收者实例总数为2个(消息分区配置) ? ? ? instance-index: 0 ? #(2)接收者实例初始索引为0(消息分区配置) ? ? ? rabbit: ?#(3)延迟消息配置(RabbitMQ要安装延迟队列插件) ? ? ? ? bindings: ? ? ? ? ? delayed-sender: ? ? ? ? ? ? producer: ? ? ? ? ? ? ? delayedExchange: true ?#开启消息延迟 ? ? ? bindings: ?#将发送者与接收者共同绑定到mall_exchange(exchange名称) ? ? ? ? #############################(1)普通消息配置 start############################# ? ? ? ? receiver: ?#消息接收配置,此处名称为Topic.receiver ? ? ? ? ? destination: topic_exchange ? #exchange名称 ? ? ? ? sender: ?#消息发送配置,此处名称为Topic.sender ? ? ? ? ? destination: topic_exchange ? #exchange名称 ? ? ? ? #############################(1)普通消息配置 end############################# ? ? ? ? #############################(2)分组消息配置 start############################# ? ? ? ? group-receiver: ?#分区消息接收配置,此处名称为GroupTopic.group-receiver ? ? ? ? ? destination: group_exchange ? #exchange名称 ? ? ? ? ? group: group1 ? #设置分组名 ? ? ? ? ? consumer: ? ? ? ? ? ? partitioned: true ?#打开消息分区功能 ? ? ? ? group-sender: ?#分区消息发送配置,此处名称为GroupTopic.group-sender ? ? ? ? ? destination: group_exchange ? #exchange名称 ? ? ? ? ? producer: ? ? ? ? ? ? partitionCount: 2 ? #2个分区 ? ? ? ? ? ? partitionKeyExpression: headers['instanceIndex'] ?#规定指定实例索引的接收者才能收到消息,发送时通过MessageBuilder.setHeader(key名, 索引值)设置接收者索引 ? ? ? ? #############################(2)分组消息配置 end############################# ? ? ? ? #############################(3)延迟消息配置 start############################# ? ? ? ? delayed-receiver: ?#分区消息接收配置,此处名称为DelayedTopic.delayed-receiver ? ? ? ? ? destination: delayed_exchange ? #exchange名称 ? ? ? ? delayed-sender: ?#分区消息发送配置,此处名称为DelayedTopic.delayed-sender ? ? ? ? ? destination: delayed_exchange ? #exchange名称 ? ? ? ? #############################(3)延迟消息配置 end#############################

3.创建自定义普通/分组/延迟消息Topic:

(1)普通消息Topic:

public interface Topic { //自定义普通消息Topic ? ? String INPUT = "receiver"; ?//自定义名 ? ? String OUTPUT = "sender"; ?//自定义名 ? ? @Input(INPUT) ? ? SubscribableChannel input(); ? ? @Output(OUTPUT) ? ? MessageChannel output(); }

(2)分组消息Topic:

public interface GroupTopic { //分组消息Topic ? ? String INPUT = "group-receiver"; ?//自定义名 ? ? String OUTPUT = "group-sender"; ?//自定义名 ? ? @Input(INPUT) ? ? SubscribableChannel input(); ? ? @Output(OUTPUT) ? ? MessageChannel output(); }

(3)延迟消息Topic:

public interface DelayedTopic { //延迟消息Topic ? ? String INPUT = "delayed-receiver"; ?//自定义名 ? ? String OUTPUT = "delayed-sender"; ?//自定义名 ? ? @Input(INPUT) ? ? SubscribableChannel input(); ? ? @Output(OUTPUT) ? ? MessageChannel output(); }

4.创建消息接收类接收普通/分组/延迟消息:

@EnableBinding(value = {Sink.class, Topic.class, GroupTopic.class, DelayedTopic.class}) ?//接收rmq消息 public class MsgReceive { ? ? @StreamListener(Sink.INPUT) ? //接收普通消息,Sink.INPUT为系统自带 ? ? public void onMsg0(String msg) { ? ? ? ? //此处接受到普通消息 ? ? } ? ? @StreamListener(Topic.INPUT) ? //接收普通消息,Topic.INPUT为自定义 ? ? public void onMsg(String msg) { ? ? ? ? //此处接受到普通消息 ? ? } ? ? @StreamListener(GroupTopic.INPUT) ? //接收分组消息,GroupTopic.INPUT为自定义 ? ? public void onGroupMsg(String msg) { // ?public void onGroupMsg(String msg, @Header("version") String version) { ?//接收带版本号的分组消息,可以根据版本号处理不同的逻辑 ? ? ? ? //此处接受到分组消息 ? ? } ? ? @StreamListener(DelayedTopic.INPUT) ? //接收延迟消息,DelayedTopic.INPUT为自定义 ? ? public void onDelayedMsg(String msg) { ? ? ? ? //此处接受到延迟消息 ? ? } }

5.测试发送消息:

@Controller public class MsgSendController { ? ? @Autowired ? ? private Topic topic; //消息发送 ? ? @Autowired ? ? private GroupTopic groupTopic; //分组消息发送 ? ? @Autowired ? ? private DelayedTopic delayedTopic; //延迟消息发送 ? ? @RequestMapping("/sendMsg") ? ? @ResponseBody ? ? public String sendMsg(String msg) { ? ? ? ? topic.output().send(MessageBuilder.withPayload(msg).build()); ?//发送消息到rmq ? ? ? ? return "发送普通消息"; ? ? } ? ? @RequestMapping("/sendGroupMsg") ? ? @ResponseBody ? ? public String sendGroupMsg(String instanceIndex, String msg) { ? ? ? ? groupTopic.output().send(MessageBuilder.withPayload(msg).setHeader("instanceIndex", instanceIndex).build()); ? ? ? ? //groupTopic.output().send(MessageBuilder.withPayload(msg).setHeader("version", "1.0.0").build()); ? //发送带版本号的消息 ? ? ? ? return "发送分组消息"; ? ? } ? ? @RequestMapping("/sendDelayedMsg") ? ? @ResponseBody ? ? public String sendDelayedMsg(String msg) { ? ? ? ? delayedTopic.output().send(MessageBuilder.withPayload(msg).setHeader("x-delay", 5000).build()); //通过x-delay头字段控制延迟,此处5毫秒 ? ? ? ? return "发送延迟消息"; ? ? } }

二、其他配置:

1.配置异常重试(application.yml中):

(1)单主机上异常重试:

spring: ? cloud: ? ? stream: ? ? ? bindings: ? ? ? ? receiver: ?#receiver为Topic.receiver ? ? ? ? ? consumer: ? ? ? ? ? ? maxAttempts: 3 ?#单机上异常重试:发生异常时,消息重复发送的最大次数

(2)多主机上异常重试:

spring: ? rabbitmq: ? ? listener: ? ? ? direct: ? ? ? ? default-requeue-rejected: true ? #开启多主机上异常重试(方式1):全局异常重试 ? cloud: ? ? stream: ? ? ? bindings: ? ? ? ? receiver: ?#receiver为Topic.receiver ? ? ? ? ? consumer: ? ? ? ? ? ? maxAttempts: 1 ?#多主机上异常重试时,强制单机异常重试为1次(也就是不重试) ? ? ? rabbit: ? ? ? ? bindings: ? ? ? ? ? receiver: ?#receiver为Topic.receiver ? ? ? ? ? ? consumer: ? ? ? ? ? ? ? requeueRejected: true ?#开启多主机上异常重试(方式2):仅对receiver异常重试

2.配置死信队列(application.yml中):

spring: ? cloud: ? ? stream: ? ? ? bindings: ? ? ? ? receiver: ?#receiver为Topic.receiver ? ? ? ? ? consumer: ? ? ? ? ? ? maxAttempts: 3 ?#异常重试次数 ? ? ? rabbit: ? ? ? ? bindings: ? ? ? ? ? receiver: ?#receiver为Topic.receiver ? ? ? ? ? ? consumer: ? ? ? ? ? ? ? autoBindDlq: true ? ? ?#开启死信队列,默认队列名为xxx.dlq

3.异常降级处理:

@EnableBinding(value = {...}) public class MsgReceive { ? ? ... ? ? @ServiceActivator(inputChannel = "group_exchange.group1.errors") ?//配置异常降级回调方法(异常重试超过最大次数后触发),inputChannel命名规则:destination.group.errors ? ? public void callback(Message<?> message) { //message为org.springframework.messaging.Message ? ? ? ? //处理异常降级逻辑 ? ? } }

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #amplt #导入Actuator依赖包 #ampgt