irpas技术客

Spring boot 配置整合RabbitMq_愤怒了Kirk_springboot整合rabbitmq配置

网络投稿 6599

Spring boot 配置整合RabbitMq

这篇文章是在使用rabbit时,自己创建的demo,一些简单的介绍 内含消息推送实例,消费实例,Direct、Fanout的使用;

首先是为什么使用RabbitMq,说下Rabbit的优缺点:

优点:

应用异步 将需要同步处理的并且耗时的操作由消息队列来进行一步操作,提高了程序的响应时间 应用解耦 系统的耦合度越高,容错率就越低,可维护行越低,Mq可以使应用间解耦,提升容错率和可维护性 流量削峰 根据系统的存储速度来定制每秒可接受数量,高并发下,数据堆积在Mq中,并发结束后,继续处理积压数据,缓解服务器压力,使得存储数据库不会崩溃;

缺点:

系统可用性降低 系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。 系统复杂度提高 引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序? 一致性问题 A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。

本人用的苹果系统,安装也不用多说,肯定是brew,简单,都懂。

brew install rabbitmq 以服务方式启动,启动后终端可以关闭,不影响服务运行 brew services start rabbitmq 进入安装目录 /usr/local/Cellar/rabbitmq/3.9.5/ 关闭rabbitmq服务:./rabbitmqctl stop 查看rabbitmq运行状态:./rabbitmqctl status 安装完成后: 在浏览器访问http://localhost:15672/ 用户名:guest 密码:guest

第一步,安装完成后, 开始创建Spring boot 项目,引入Rabbit的jar

<!-- 引入RabbitMq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

第二步,配置Rabbit,我这里选用的是创建个yml,在里面添加

spring: rabbitmq: #ip host: 127.0.0.1 #端口号 port: 5672 #虚拟host 可以不设置,使用server默认host,随你喜欢命名 virtual-host: kirkHost #账号 username: guest #密码 password: guest

第三步,配置完成后,就可以使用Rabbit了,来配置一个Rabbit的Confit配置类,里面设置一些Rabbit的交换机,队列信息,创建RabbitMqConfiguration.class

首先是 fanout 扇形交换机 package com.kirk.demo.RabbitMQ.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Rabbit 的配置类 */ @Configuration public class RabbitMqConfiguration { /** * 1:声明注册 fanout 模式的交换机 * @return FanoutExchange */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_test_exchange",true,false); } /** * 2:声明队列 test.fanout.queue * * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * return new Queue("TestDirectQueue",true,true,false); * * 一般设置一下队列的持久化就好,其余两个就是默认false * * @return Queue */ @Bean public Queue testQueue () { return new Queue("test.fanout.queue",true); } /** * 3:完成绑定关系(队列和交换机完成绑定关系) * @return Binding */ @Bean public Binding testBinding() { return BindingBuilder.bind(testQueue()).to(fanoutExchange()); } } 看下消息推送,生产者代码 package com.kirk.demo.RabbitMQ.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitTestService { private final static Logger log= LoggerFactory.getLogger(RabbitTestService.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 模拟消息发送 */ public void sendMessage() { String messageId = String.valueOf(UUID.randomUUID()); log.info("消息id生成成功:"+messageId); //exchange:交换机名称 //routingKey:路由键,fanout扇形模式下不需要 //Object: 传入数据 rabbitTemplate.convertAndSend("fanout_test_exchange",null,messageId); } } 看下消息接收者,消费者代码 package com.kirk.demo.RabbitMQ.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * 消费者 * * @author Kirk */ @Service public class FanoutQueueService { private final static Logger log = LoggerFactory.getLogger(FanoutQueueService.class); /** * 消费者信息 * <p> * queues:队列名称 * concurrency:消费开启线程数(根据需求自己配置即可,单线程去掉) * * @param messageId 数据(对应传入队列时的消息类型) */ @RabbitListener(queues = {"test.fanout.queue"}, concurrency = "5") public void receiveMessage(String messageId) { log.info("test fanout--接收信息:->" + messageId); try { Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } log.info("test fanout--消费完成:->" + messageId); }

fanout的队列模式已经都配置好了,那么简单说下fanout的扇形模式,为什么叫扇形模式,

↙ 队列1 消息 → 扇形交换机 ← 队列2 ↖ 队列3

如果生成者发布了一条消息到指定交换机之后, 那么和交换机绑定的3个队列都会消费这条信息; fanout的具体使用场景就很多了,例如:

发送消息,多种不同模式通知(短信,邮箱,微信通知等)电商发送优惠券(同时发送不同种类的优惠券,满减、折扣、兑礼等)一条信息分开处理
再看下 direct 直连交换机 package com.kirk.demo.RabbitMQ.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Rabbit 的配置类 */ @Configuration public class RabbitMqConfiguration { /** * 1.声明注册 direct 模式的交换机 * @return DirectExchange */ @Bean public DirectExchange directExchange() { return new DirectExchange("direct_test_exchange",true,false); } /** * 2.声明队列 * * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * return new Queue("TestDirectQueue",true,true,false); * * 一般设置一下队列的持久化就好,其余两个就是默认false * * @return Queue */ @Bean public Queue directTestQueue() { return new Queue("test.direct.queue",true); } /** * 3.绑定交换机 * @return Queue */ @Bean public Binding directTestBinding() { return BindingBuilder.bind(directTestQueue()).to(directExchange()).with("test"); } } 看下消息推送,生产者代码 package com.kirk.demo.RabbitMQ.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitTestService { private final static Logger log= LoggerFactory.getLogger(RabbitTestService.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 模拟消息发送 */ public void sendMessage() { String messageId = String.valueOf(UUID.randomUUID()); log.info("消息id生成成功:"+messageId); //exchange:交换机名称 //routingKey:路由键,直连模式下可以指定对应Key,队列绑定交换机是添加的"test" //Object: 传入数据 rabbitTemplate.convertAndSend("direct_test_exchange","test",messageId); } } 看下消息接收者,消费者代码 package com.kirk.demo.RabbitMQ.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * 消费者 * * @author Kirk */ @Service public class FanoutQueueService { private final static Logger log = LoggerFactory.getLogger(FanoutQueueService.class); /** * 消费者信息 * <p> * queues:队列名称 * concurrency:消费开启线程数(根据需求自己配置即可,单线程去掉) * * @param messageId 数据(对应传入队列时的消息类型) */ @RabbitListener(queues = {"test.direct.queue"}, concurrency = "5") public void receiveMessage(String messageId) { log.info("test direct--接收信息:->" + messageId); try { Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } log.info("test direct--消费完成:->" + messageId); }

Direct 直连模式也配置好了,下面说下直连模式概念

直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。

这样当一个交换机绑定多个队列时,就会被送到对应的队列去处理。


关于Topic 主题交换机

这两种模式没有用到,但是也简单的了解下吧:

Topic Exchange 主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。 简单地介绍下规则: *(星号) 用来表示一个单词 (必须出现的) #(井号) 用来表示任意数量(零个或多个)单词 通配的绑定键是跟队列进行绑定的,举个小例子 队列Q1 绑定键为 *.TT.* 队列Q2 绑定键为 TT.# 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到; 如果一条消息携带的路由键为 TT.AA .BB,那么队列Q2将会收到;

主题交换机是非常强大的,为啥这么膨胀? 当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。 所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

由于没有用到,所有没有写Demo

以上就是关于RabbitMq的简单配置

部分参考地址


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #适合新手的看的Spring #Boot #配置整合 #rabbitmq #案例详细介绍了Direct # #Fanout