irpas技术客

RabbitMQ使用方法最详细攻略_Liu_Shihao_rabbitmq使用方法

大大的周 7953

目录 一、简介二、安装部署2.1 yum方式安装2.2 docker方式安装 三、RabbitMQ架构消息投递消费流程(重要): 四、RabbitMQ通讯方式4.0 建立Maven项目4.1 HelloWord4.1.1 创建连接工具类4.1.2 生产消息4.1.3 监听消息 4.2 Work Queue4.2.1 一个生产者4.2.2 两个消费者轮询消费 4.3 Publish/Subscribe(FANOUT类型)4.3.1 生产者(创建FANOUT类型交换机)4.3.2 两个消费者一起消费 4.4 Routing (DIRECT类型)4.4.1 生产消息4.4.2 监听消息 4.5 Topic (Topic主题模式)4.5.1 生产者4.5.2 消费者 4.6 RPC4.6.1 client客户端4.6.2 server服务端 4.7 Headers 五、RabbitMQ整合SpringBoot5.1 构建SpringBoot项目5.2 声明交换机、队列、绑定5.3 发送消息5.4 监听消息 六、RabbitMQ保证消息可靠性6.1 保证消息到达交换机6.2 保证消息路由到队列6.3 保证队列可以持久化消息完整消息生产者代码(确保消息到达队列)总结123实现生产者代码6.4 保证消费者可以正常消费消息6.5 使用SpringBoot实现消息可靠完整SpringBoot确保消息到达队列生产者代码 七、死信队列&延时交换机7.0 构造死信队列7.1 消息被拒绝或者Nack7.2 消息过期7.3 消息超过队列最大长度7.4 延迟交换机(RabbitMQ Plugins)7.5 声明延时交换机7.6 发送延时消息 八、集群高可用

一、简介

本文内容主要包括:

RabbitMQ的安装部署AMQP架构RabbitMQ的通信方式RabbitMQ整合SpringBootRabbitMQ保证消息可靠性RabbitMQ死信队列和延时交换机RabbitMQ集群高可用

源码已上传Github仓库:https://github.com/Liu-Shihao/rabbitmq-java-api

同步Gitee仓库:https://gitee.com/L1692312138/rabbitmq

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,Erlang是一种通用的面向并发的编程语言,所以RabbitMQ的性能特别快。RabbitMQ 支持海量的插件实现一些特殊的功能,例如:延时交换机等。 二、安装部署 2.1 yum方式安装

因为 RabbitMQ 需要 erlang 环境的支持,所以必须先安装 erlang 。

# 安装 erlang 对应的 yum repo curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash # 安装 erlang 环境 yum install erlang-22.3.3-1.el7.x86_64 # 根据提示 再次执行如下命令即可 yum load-transaction /tmp/yum_save_tx.2020-05-14.22-21.n0cwzm.yumtx # 测试 erlang 是否安装成功 erl # 安装 rabbitmq 对应的 yum repo curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash # 安装 rabbitmq yum install rabbitmq-server-3.8.3-1.el7.noarch # 设置开机启动 chkconfig rabbitmq-server on # 启动服务 systemctl start rabbitmq-server.service # 开启WEB可视化管理插件 rabbitmq-plugins enable rabbitmq_management # 添加用户 rabbitmqctl add_user 用户名 密码 rabbitmqctl set_user_tags 用户名 administrator # 访问可视化管理界面 浏览器输入: 你的服务器IP:15672 2.2 docker方式安装

国内 Docker 镜像网站: https://hub.daocloud.io/ 需要安装Docker环境。 准备docker-compose.yml文件

version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:3.8.5 container_name: rabbitmq restart: always volumes: - ./data/:/var/lib/rabbitmq/ ports: - 5672:5672 - 15672:15672 # 在Linux内部执行: curl localhost:5672 出现AMQP 安装成功 # 开启可视化界面 # 进入容器内部 docker exec -it rabbitmq bash # 进入 /opt/rabbitmq ,找到 sbin 和 plugins 文件夹 # 在plugins 目录下会有rabbitmq_managemengt插件,然后在哎sbin目录下执行 ./rabbitmq-plugins enable rabbitmq_managemeng # 访问可视化管理界面 浏览器输入: 你的服务器IP:15672 用户名和密码 都是guest

三、RabbitMQ架构

AMQP协议:

RabbitMQ完整架构:

消息投递消费流程(重要): Publisher (生产者)与 Virtual Host 建立连接 Connection;Publisher(生产者)与 Exchange (交换机)建立通道 Channel;Exchange (交换机)通过 routes (路由) 规则 将消息路由到某一个或者多个Queue(队列)中;Consumer(消费者)与 Virtual Host 建立连接 Connection ;Consumer(消费者)与 Queue(队列)建立通道 Channel;Consumer(消费者)拿到 Queue(队列)的消息进行消费; 四、RabbitMQ通讯方式

官网文档:https://·/getstarted.html

GetStart文档:https://·/getstarted.html

4.0 建立Maven项目

引入RabbitMQ客户端和Junit单元测试的依赖:

<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> </dependency> </dependencies> 4.1 HelloWord

一个生产者、一个消费者、默认交换机、创建一个队列,路由Key默认为队列名。 使用默认交换机就是"",空字符串 ,默认路由Key就是队列名。 注意:生产者和消费者建议都声明队列

4.1.1 创建连接工具类 package com.lsh; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author :LiuShihao * @date :Created in 2022/3/14 12:22 下午 * @desc :RabbitMQ 连接工具类 * 获取RabbitMQ连接Connection */ public class RabbitMQConnectionUtil { public static final String RABBIT_HOST = "172.16.98.100"; public static final int RABBIT_PORT = 5672; public static final String RABBIT_USERNAME = "admin"; public static final String RABBIT_PWD = "admin"; public static final String RABBIT_VIRTUAL_HOST = "/"; /** * 获取连接对象 * @return * @throws Exception */ public static Connection getConnection () throws Exception{ //1. 创建Connection工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2. 设置RabbitMQ的连接信息 connectionFactory.setHost(RABBIT_HOST); connectionFactory.setPort(RABBIT_PORT); connectionFactory.setUsername(RABBIT_USERNAME); connectionFactory.setPassword(RABBIT_PWD); connectionFactory.setVirtualHost(RABBIT_VIRTUAL_HOST); //3. 返回连接对象 Connection connection = connectionFactory.newConnection(); return connection; } } 4.1.2 生产消息

构造Queue队列参数: name 队列名 durable 是否持久化(如果我们声明一个持久队列,则为 true(该队列将在服务器重新启动后继续存在)) exclusive 是否声明一个独占队列 只允许一个监听者 autoDelete 如果服务器在不再使用队列时应该删除队列 arguments 用于声明队列的参数

package com.lsh; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; /** * @author :LiuShihao * @date :Created in 2022/3/14 12:29 下午 * @desc :生产消息 */ public class Publisher { public static final String QUEUE_NAME = "hello"; /** * 发送消息 */ @Test public void publisher() throws Exception { // 1.获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建Channel Channel channel = connection.createChannel(); //3.构建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.发送消息 String message = "Hello World!"; //默认交换机 "" ; 默认路由为队列名 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送成功!"); //read方法阻塞,查看WEB可视化界面的客户端连接数 System.in.read(); } } 4.1.3 监听消息 package com.lsh; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/14 2:07 下午 * @desc :监听队列 进行消费 */ public class Consumer { public static final String QUEUE_NAME = "hello"; /** * 发送消息 */ @Test public void publisher() throws Exception { // 1.获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建Channel Channel channel = connection.createChannel(); //3.构建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.监听队列 DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取到消息:" + new String(body, "UTF-8")); } }; // channel.basicConsume(QUEUE_NAME,true,callback); System.out.println("开始监听队列"); System.in.read(); } } 4.2 Work Queue

一个生产者、两个消费者、一个队列、默认交换机、默认路由Key

4.2.1 一个生产者

生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。

package com.lsh.work; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; /** * @author :LiuShihao * @date :Created in 2022/3/14 12:29 下午 * @desc :生产消息 */ public class Publisher { public static final String QUEUE_NAME = "work"; /** * 发送消息 */ @Test public void publisher() throws Exception { // 1.获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建Channel Channel channel = connection.createChannel(); //3.构建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.发送消息 for (int i = 0; i < 10; i++) { //默认交换机 "" ; 默认路由为队列名 String message = "Work Queue :"+i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); } System.out.println("消息发送成功!"); //read方法阻塞,查看WEB可视化界面的客户端连接数 System.in.read(); } }

4.2.2 两个消费者轮询消费

让消费者关闭自动ack,手动确认ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息

package com.lsh.b_work; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/14 2:07 下午 * @desc :设置两个消费者进行监听 * 本来正常两个消费者是通过轮询方式进行消息的消费的 * 如果1号消费者消费需要100毫秒 ; 2号消费者消费需要1000毫秒,这样会影响消息消费的效率 * 如果需要让消费者尽可能的消费多的消息,则需要: * 1.消费者关闭自动ack,开启手动ack确认, * 2.设置消息的流控 * 最终实现消费者可以尽可能去多消费消息 */ public class TwoConsumer { public static final String QUEUE_NAME = "work"; @Test public void consumer01() throws Exception { // 1.获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建Channel final Channel channel = connection.createChannel(); //3.构建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //3.5 设置消息的流控 指定消费者一次拿几个消息 channel.basicQos(1); //4.监听队列 DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者01获取到消息:" + new String(body, "UTF-8")); //手动确认消息 第二个参数false:是否批量操作 channel.basicAck(envelope.getDeliveryTag(),false); } }; //关闭消息自动确认 channel.basicConsume(QUEUE_NAME,false,callback); System.out.println("开始监听队列"); System.in.read(); } @Test public void consumer02() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者02获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,false,callback); System.out.println("开始监听队列"); System.in.read(); } }

4.3 Publish/Subscribe(FANOUT类型)

创建交换机为FANOUT类型,(发布/订阅)(分裂模式) 一个生产者、一个FANOUT类型交换机,两个队列,两个消费者(消息被送到达两个队列) 交换机和队列直接绑定,不需要routingKey。

4.3.1 生产者(创建FANOUT类型交换机) package com.lsh.c_pubsub; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; /** * @author :LiuShihao * @date :Created in 2022/3/14 9:29 下午 * @desc :发布订阅模式 publish/subscribe * 自行构建交换机并绑定指定队列(FANOUT类型) * FANOUT类型交换机Exchange与队列Queue是直接绑定,不需要routingKey */ public class FanOutExchangePublisher { //交换机名称 public static final String EXCHANGE_NAME = "pubsub"; //队列1 public static final String QUEUE_NAME2 = "subscribe02"; //队列2 public static final String QUEUE_NAME1 = "subscribe01"; @Test public void publish() throws Exception{ //1.获得链接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建channel Channel channel = connection.createChannel(); //3.构建交换机 指定交换机类型为FANOUT // 交换机类型:( DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //4.构建队列(队列名,是否持久,是否排他,是否自动删除,参数) channel.queueDeclare(QUEUE_NAME1,false,false,false,null); channel.queueDeclare(QUEUE_NAME2,false,false,false,null); //5.绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定,所以routingKey写和不写都是一样的 channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,""); channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,""); //6.发送消息到交换机 此处的routingKey没有用 channel.basicPublish(EXCHANGE_NAME,"",null,"Publish/Subscribe".getBytes()); System.out.println("消息发送成功!"); } } 4.3.2 两个消费者一起消费 package com.lsh.c_pubsub; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/14 2:07 下午 * @desc :设置两个消费者进行监听 * 本来正常两个消费者是通过轮询方式进行消息的消费的 * 如果1号消费者消费需要100毫秒 ; 2号消费者消费需要1000毫秒,这样会影响消息消费的效率 * 如果需要让消费者尽可能的消费多的消息,则需要: * 1.消费者关闭自动ack,开启手动ack确认, * 2.设置消息的流控 * 最终实现消费者可以尽可能去多消费消息 */ public class TwoConsumer { //队列1 public static final String QUEUE_NAME2 = "subscribe02"; //队列2 public static final String QUEUE_NAME1 = "subscribe01"; @Test public void consumer01() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.basicQos(1); DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者01获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME1,false,callback); System.out.println("开始监听队列"); System.in.read(); } @Test public void consumer02() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); channel.basicQos(1); DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者02获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME2,false,callback); System.out.println("开始监听队列"); System.in.read(); } } 4.4 Routing (DIRECT类型)

创建交换机为DIRECT类型,直接类型

4.4.1 生产消息

在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到 指定的Queue。注意:与FANOUT类型交换机不同(FANOUT类型交换机直接绑定队列,不需要routingKey),但是DIRECT类型交换机必须绑定routingKey才能路由到对应的队列。

package com.lsh.d_routing; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; /** * @author :LiuShihao * @date :Created in 2022/3/14 9:50 下午 * @desc :生产者:在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到 指定的Queue */ public class DirectExchangePublisher { //交换机名称 public static final String EXCHANGE_NAME = "routing"; //队列1 public static final String QUEUE_NAME1 = "routing01"; //队列2 public static final String QUEUE_NAME2 = "routing02"; @Test public void publish() throws Exception{ Connection connection = RabbitMQConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //创建交换机 指定DIRECT直接类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建队列 channel.queueDeclare(QUEUE_NAME1,false,false,false,null); channel.queueDeclare(QUEUE_NAME2,false,false,false,null); //绑定交换机和队列并指定路由KEY channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE"); // 将两个队列和一个交换机绑定 channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK"); channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN"); //发送消息到交换机 //此消息会到达队列1 channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"橙子".getBytes()); //此消息会到达队列2 channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"小黑狗".getBytes()); //此消息没有对应的routingKey,所以会被丢弃 channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔".getBytes()); System.out.println("消息发送成功!"); } }

4.4.2 监听消息

两个消费者同4.3.2一样

package com.lsh.d_direct; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/14 2:07 下午 * @desc : */ public class TwoConsumer { public static final String QUEUE_NAME1 = "routing01"; public static final String QUEUE_NAME2 = "routing02"; @Test public void consumer01() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.basicQos(1); DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者01获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME1,false,callback); System.out.println("开始监听队列"); System.in.read(); } @Test public void consumer02() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); channel.basicQos(1); DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者02获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME2,false,callback); System.out.println("开始监听队列"); System.in.read(); } }

4.5 Topic (Topic主题模式)

4.5.1 生产者

创建Topic类型交换机TOPIC类型可以编写带有特殊意义的routingKey的绑定方式 需要以aaa.bbb.ccc…方式编写routingkey , 其中有两个特殊字符:*(相当于占位符),#(相当通配符)

package com.lsh.e_topic; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; /** * @author :LiuShihao * @date :Created in 2022/3/14 10:01 下午 * @desc :主题模式 创建Topic类型交换机 * TOPIC类型可以编写带有特殊意义的routingKey的绑定方式 * 需要以aaa.bbb.ccc..方式编写routingkey ,其中有两个特殊字符:*(相当于占位符),#(相当通配符) */ public class TopicExchangePublisher { //交换机名称 public static final String EXCHANGE_NAME = "topic"; //队列1 public static final String QUEUE_NAME1 = "TopicQueue01"; //队列2 public static final String QUEUE_NAME2 = "TopicQueue02"; //队列2 public static final String QUEUE_NAME3 = "TopicQueue03"; @Test public void publish() throws Exception{ // 1、获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); // 2.获取通道 Channel channel = connection.createChannel(); //3.创建交换机 指定Topic主题类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //4.创建队列 channel.queueDeclare(QUEUE_NAME1,false,false,false,null); channel.queueDeclare(QUEUE_NAME2,false,false,false,null); channel.queueDeclare(QUEUE_NAME3,false,false,false,null); //5.绑定交换机和队列并指定路由KEY // TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey // 其中有两个特殊字符:*(相当于占位符),#(相当通配符) // 一个队列可以绑定多个路由规则 channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*"); channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME3,EXCHANGE_NAME,"lazy.#"); //6.发送消息到交换机 //此路由Key符合"*.orange.*" 和 "*.*.rabbit" ,所以或到达队列1 和2 channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子".getBytes()); //此路由Key符合 "*.*.rabbit" ,所以或到达队列2 channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes()); //此路由Key符合 "lazy.#" ,所以或到达队列3 channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog",null,"懒狗狗".getBytes()); System.out.println("消息发送成功!"); } } 4.5.2 消费者

消费者代码无更改,略

4.6 RPC

RabbitMQ这种RPC模式一般使用的不多。 因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作 需要让Client发送消息时,携带两个属性:

replyTo告知Server将相应信息放到哪个队列correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息 4.6.1 client客户端 package com.lsh.f_rpc; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; import java.util.UUID; /** * @author :LiuShihao * @date :Created in 2022/3/15 4:14 下午 * @desc :RabbitMQ RPC模式 Client端代码 * client:向 rpc_publisher 队列发送请求消息,并监听 rpc_consumer 响应队列的消息 * server:监听rpc_publisher队列的消息,并向rpc_consumer发送响应消息 */ public class RpcPublisher { //client端发出消息 队列 public static final String QUEUE_PUBLISHER = "rpc_publisher"; //server端发出响应 队列 public static final String QUEUE_CONSUMER = "rpc_consumer"; @Test public void publisher()throws Exception{ // 1、获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); // 2.获取通道 final Channel channel = connection.createChannel(); //3. 构建队列 channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null); channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null); //4. 发布消息 String message = "Hello RPC!"; final String uuid = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties() .builder() .replyTo(QUEUE_CONSUMER) //设置监听队列(即Server端的响应消息) .correlationId(uuid) //设置UUID .build(); channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes()); System.out.println("消息发送成功!"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取唯一标识ID String id = properties.getCorrelationId(); if (id != null && id.equals(uuid)){ //说明是我们发送的请求消息 System.out.println("接收到服务端响应:"+new String(body, "UTF-8")); } //手动ACK确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听响应队列 channel.basicConsume(QUEUE_CONSUMER,false,consumer); System.in.read(); } } 4.6.2 server服务端 package com.lsh.f_rpc; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; import java.util.Date; /** * @author :LiuShihao * @date :Created in 2022/3/15 4:45 下午 * @desc :RabbitMQ RPC模式 Server端代码 * client:向 rpc_publisher 队列发送请求消息,并监听 rpc_consumer 响应队列的消息 * server:监听rpc_publisher队列的消息,并向rpc_consumer发送响应消息 */ public class RpcConsumer { //client端发出消息 队列 public static final String QUEUE_PUBLISHER = "rpc_publisher"; //server端发出响应 队列 public static final String QUEUE_CONSUMER = "rpc_consumer"; @Test public void consumer01() throws Exception { // 1.获得链接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); // 2.获得channel final Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_PUBLISHER, false, false, false, null); channel.queueDeclare(QUEUE_CONSUMER, false, false, false, null); //4. 监听消息 DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("服务端获取到消息:" + new String(body, "UTF-8")); String resp = new Date()+":获取到了client发出的请求,这里是响应的信息"; //获取响应队列 String respQueueName = properties.getReplyTo(); //获取UUID String uuid = properties.getCorrelationId(); AMQP.BasicProperties props = new AMQP.BasicProperties() .builder() .correlationId(uuid) .build(); //将响应信息发送到响应队列 channel.basicPublish("",respQueueName,props,resp.getBytes()); //手动确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_PUBLISHER,false,callback); System.out.println("开始监听队列"); System.in.read(); } } 4.7 Headers

Headers类型交换机,(这个用的不多,与Topic类型相似)

/** * @author :LiuShihao * @date :Created in 2022/3/17 4:14 下午 * @desc :Headers类型交换机 */ public class HeadersPublisher { public static final String EXCHANGE_NAME = "headers-exchange"; public static final String QUEUE_NAME = "headers-queue"; @Test public void publisher() throws Exception { // 1.获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建Channel Channel channel = connection.createChannel(); //3.构建Headers类型交换机,创建队列并绑定 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); Map<String, Object> arguments = new HashMap<>(); //x-match 设置为all:表示所有条件都满足才能路由,any:表示有一个条件满足就可以路由 arguments.put("x-match","all"); // arguments.put("x-match","any"); arguments.put("name","jack"); arguments.put("age","23"); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"",arguments); HashMap<String, Object> headers = new HashMap<>(); headers.put("name","jack"); headers.put("age","23"); //4.发送消息 String msg = "这是Headers类型消息"; AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .headers(headers) .build(); channel.basicPublish(EXCHANGE_NAME,"",properties,msg.getBytes()); System.out.println("消息已发送"); System.in.read(); } } 五、RabbitMQ整合SpringBoot 5.1 构建SpringBoot项目 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> </parent> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

配置application.yml文件:

spring: rabbitmq: host: 172.16.98.100 port: 5672 username: admin password: admin virtual-host: /

5.2 声明交换机、队列、绑定

在SpringBoot项目中,通过Configuration配置类来声明队列和交换机:

Exchange:在SpringBoot项目中,直接通过ExchangeBuilder来构造交换机Queue:声明队列:在SpringBoot中,通过QueueBuilder.durable(队列名)来构造队列Binding:声明绑定:在SpringBoot项目中,通过BindingBuilder.bind(队列).to(交换机).with(路由Key)构造绑定 package com.lsh.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author :LiuShihao * @date :Created in 2022/3/15 11:36 下午 * @desc : */ @Configuration public class RabbitMQConfig { /**Topic类型交换机*/ public static final String TOPIC_EXCHANGE_NAME = "boot-exchange"; /**Queue队列名*/ public static final String QUEUE_NAME = "boot-queue"; /**RoutingKey 路由Key*/ public static final String ROUTING_KEY = "*.black.*"; /** * 声明交换机: 在SpringBoot项目中,直接通过ExchangeBuilder来构造交换机 * @return org.springframework.amqp.core.Exchange */ @Bean public Exchange exchange(){ // => channel.DeclareExchange Exchange exchange = ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build(); return exchange; } /** * 声明队列:在SpringBoot中,通过QueueBuilder.durable(队列名)来构造队列 * @return */ @Bean public Queue queue(){ Queue queue = QueueBuilder.durable(QUEUE_NAME).build(); return queue; } /** * 声明绑定:在SpringBoot项目中,通过BindingBuilder.bind(队列).to(交换机).with(路由Key)构造绑定 * @param exchange 交换机 * @param queue 队列 * @return */ @Bean public Binding binding(Exchange exchange,Queue queue){ Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); return binding; } }

5.3 发送消息

在SpringBoot项目中通过RabbitTemplate对象调用RabbitMQ API

package com.lsh; import com.lsh.config.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @author :LiuShihao * @date :Created in 2022/3/15 11:58 下午 * @desc : * 在SpringBoot项目中,通过rabbitTemplate.convertAndSend()生产消息 */ @SpringBootTest @RunWith(SpringRunner.class) public class SendMQTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void send(){ //交换机、路由Key、消息内容 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,"little.black.rabbit","小黑兔"); System.out.println("消息已发送"); } /** * 生产消息 */ @Test public void sendAndMsgProperties(){ //交换机、路由Key、消息内容、MessageProperties(传递Msg信息:包括CorrelationId、ReplyTo等) rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME, "little.black.rabbit", "小黑兔", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); //设置唯一标识 messageProperties.setCorrelationId("123"); //设置响应队列 // messageProperties.setReplyTo(); return message; } }); System.out.println("消息已发送"); } } 5.4 监听消息

在SpringBoot项目中监听消息,通过@RabbitListener(queues = “队列名”) 注解监听队列

package com.lsh.springboot.listener; import com.lsh.springboot.config.RabbitMQConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/16 2:21 下午 * @desc :在SpringBoot项目中监听消息,通过@RabbitListener(queues = "队列名") 注解监听队列 * 在SpringBoot项目中, * 如果要关闭自动ack需要在application.yml文件中设置 * spring.rabbitmq.listener.simple.acknowledge-mode为manual * */ @Component public class ConsumeListener { /** * * @param msg 队列的消息 * @param channel * @param message 包含消息的各种信息,如msg、DeliveryTag、CorrelationId、ReplyTo等信息 * @throws IOException */ @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public void consumer(String msg, Channel channel, Message message) throws IOException { System.out.println("队列的消息:"+msg); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("唯一标识:"+correlationId); //手动ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }

六、RabbitMQ保证消息可靠性

6.1 保证消息到达交换机

Confirm机制 可以通过Confirm效果保证消息一定送达到Exchange。

//4.开启confirms channel.confirmSelect(); //5.设置confirms的异步回调 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { //消息成功发送到交换机 success System.out.println("消息成功发送到交换机!"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //消息未成功发送到交换机 fail System.out.println("消息未成功发送到交换机!"); } }); 6.2 保证消息路由到队列

Return机制 通过Return机制保证消息到达队列(注意:只有消息没有成功到达队列时才会触发回调函数)

//6.设置Return回调,确认消息是否到达队列,需要在发送消息时,设置mandatory参数为true开启Return机制 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { //只有消息没有到达指定队列时,才会触发此函数(例如RoutingKey不对导致消息没有投递到对应队列,就会触发该回调函数) System.out.println("消息没有到达指定队列!"); } }); //8.发送消息 注意:此处需要设置mandatory参数为true,才能开启Return机制 //参数:交换机、路由Key、mandatory、指定参数、消息 channel.basicPublish("","confirms",true,pop,message.getBytes()); 6.3 保证队列可以持久化消息

设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失 并且队列同样需要设置持久化。

//7.开启消息持久化,如果没有开启消息持久化。如果MQ重启,则消息会丢失 AMQP.BasicProperties pop = new AMQP.BasicProperties() .builder() //设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失 .deliveryMode(2) .build(); # 重启RabbitMQ服务 yum 部署方式 systemctl restart rabbitmq-server.service # Docker docker restart rabbitmq

当我们没有开启队列持久化和消息持久化时,如果队列中有消息未消费,重启RabbitMQ服务,则重启后的RabbitMQ服务的队列中原来的消息被丢失。 而当我们开启了队列持久化和消息持久化之后,重启RabbitMQ服务后,消息不会丢失。

完整消息生产者代码(确保消息到达队列)总结123实现生产者代码 package com.lsh.g_confirms; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/16 2:49 下午 * @desc :开启消息确认机制 * 1. 确保消息到达交换机 confirms机制 * 1.开启confirms: channel.confirmSelect(); * 2.增加异步回调:channel.addConfirmListener(); * 2.确保消息从交换机路由到达队列 return机制 */ public class ConfirmsPublisher { public static final String QUEUE_NAME = "confirms"; @Test public void publisher() throws Exception { // 1.获取连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.构建Channel Channel channel = connection.createChannel(); //3.构建队列 注意此处的durable参数只是控制队列持久化,并不能控制消息持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //4.开启confirms channel.confirmSelect(); //5.设置confirms的异步回调 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { //消息成功发送到交换机 success System.out.println("消息成功发送到交换机!"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //消息未成功发送到交换机 fail System.out.println("消息未成功发送到交换机!"); } }); //6.设置Return回调,确认消息是否到达队列,需要在发送消息时,设置mandatory参数为true开启Return机制 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { //只有消息没有到达指定队列时,才会触发此函数(例如RoutingKey不对导致消息没有投递到对应队列,就会触发该回调函数) System.out.println("消息没有到达指定队列!"); } }); //7.开启消息持久化,如果没有开启消息持久化。如果MQ重启,则消息会丢失 AMQP.BasicProperties pop = new AMQP.BasicProperties() .builder() //设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失 .deliveryMode(2) .build(); String message = "Confirms Messaage!"; //8.发送消息 注意:此处需要设置mandatory参数为true,才能开启Return机制 channel.basicPublish("","confirms",true,pop,message.getBytes()); System.out.println("消息发送成功!"); //read方法阻塞,查看WEB可视化界面的客户端连接数 System.in.read(); } } 6.4 保证消费者可以正常消费消息

通过手动ACK确保业务代码在执行完成后再执行消息确认

package com.lsh.g_confirms; import com.lsh.RabbitMQConnectionUtil; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; /** * @author :LiuShihao * @date :Created in 2022/3/16 3:41 下午 * @desc :监听队列消息,关闭自动ack,手动ack */ public class ConfirmsConsumer { public static final String QUEUE_NAME = "confirms"; @Test public void consumer() throws Exception { //1.获得连接对象 Connection connection = RabbitMQConnectionUtil.getConnection(); //2.获得Channel final Channel channel = connection.createChannel(); //3.声明队列,持久化队列 durable为true channel.queueDeclare(QUEUE_NAME, true, false, false, null); //4.设置CallBack函数 DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("监听到队列消息:" + new String(body, "UTF-8")); //手动ack channel.basicAck(envelope.getDeliveryTag(),false); } }; //5.监听队列,关闭自动ack channel.basicConsume(QUEUE_NAME,false,callback); System.out.println("开始监听队列"); System.in.read(); } } 6.5 使用SpringBoot实现消息可靠 在application.yml配置文件中通过配置spring.rabbitmq.publisher-confirm-type为correlated开启confirms机制 spring: rabbitmq: # publisher-confirms: true # SpringBoot 2.1版本以下 使用 已弃用 publisher-confirm-type: correlated # 开启confirms确认(2.1版本后) 在rabbitTemplate.setConfirmCallback()设置confirms机制的回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("消息已送达交换机!"); }else { System.out.println("消息未到达交换机!"); } } }); application.yml配置spring.rabbitmq.publisher-returns为true开启Return机制。 spring: rabbitmq: publisher-returns: true # 开启Return机制,确保消息成功路由到队列 通过rabbitTemplate.setReturnCallback()方法这是Return机制的回调函数。

注意 :只用SpringBoot项目投递消息时,不需要在设置mandatory参数为true

//注意:低版本使用setReturnCallback()方法;在高版本中该方法被弃用,使用setReturnsCallback()方法 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) throws UnsupportedEncodingException { String msg = new String(message.getBody()); System.out.println("消息未成功投递到队列:"+msg); } }); 设置消息持久化 //在声明队列的时候直接构造持久化队列 @Bean public Queue queue(){ //nonDurable表示不持久化 ;durable表示持久化 Queue queue = QueueBuilder.durable(QUEUE_NAME).build(); return queue; } //发送消息 设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); rabbitTemplate.convertAndSend("", "confirmss", "SpringBoot Confirms Message!", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // MessageDeliveryMode枚举类: // NON_PERSISTENT 表示不持久化 ;PERSISTENT表示持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; } }); 完整SpringBoot确保消息到达队列生产者代码 /** * 通过rabbitTemplate.setConfirmCallback()开启confirms机制 */ @Test public void sendWithConfirms(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("消息已送达交换机!"); }else { System.out.println("消息未到达交换机!"); } } }); //注意:低版本使用setReturnCallback()方法;在高版本中该方法被弃用,使用setReturnsCallback()方法 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { String msg = new String(message.getBody()); System.out.println("消息未成功投递到队列:"+msg); } }); //注意 :只用SpringBoot项目投递消息时,不需要在设置mandatory参数为true //发送消息 设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); rabbitTemplate.convertAndSend("", "confirmss", "SpringBoot Confirms Message!", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // MessageDeliveryMode枚举类: // NON_PERSISTENT 表示不持久化 ;PERSISTENT表示持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; } }); } 七、死信队列&延时交换机

成为死信有三种方式:

消息被拒绝并且禁止重新被投放回队列消息过期(设置消息的TTL或者设置队列的TTL ,两种方式都能使消息过期)队列内消息超过最大队列长度 7.0 构造死信队列

两个队列(普通队列、死信队列)两个交换机(普通交换机、死信交换机)两个路由(普通路由、死信路由)

其实死信队列/交换机/路由和普通的都是一样的,在构造普通队列的时候指定另一个交换机为死信交换机即可。

package com.lsh.springboot.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author :LiuShihao * @date :Created in 2022/3/16 9:08 下午 * @desc :构建死信队列 * 两个普通队列 * 两个交换机 * 两个路由 */ @Configuration public class DeadLetterConfig { /**普通交换机*/ public static final String NORMAL_EXCHANGE = "normal-exchange"; /**普通队列*/ public static final String NORMAL_QUEUE = "normal-queue"; /**普通队列路由*/ public static final String NORMAL_ROUTING_KEY = "normal.#"; /**死信交换机*/ public static final String DEAD_EXCHANGE = "dead-exchange"; /**死信队列*/ public static final String DEAD_QUEUE = "dead-queue"; /**死信队列路由*/ public static final String DEAD_ROUTING_KEY = "dead.#"; @Bean public Exchange normalExchange(){ return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build(); } /** * 在构造普通队列时需要设置死信交换机和死信路由 * deadLetterExchange 设置死信交换机 * deadLetterRoutingKey 设置死信路由 * 设置普通队列的ttl,如果队列中消息过期则会进入死信队列 * maxLength 设置队列最大长度 ,如果队列中的消息达到最大长度,此时再进入队列的消息则会被丢弃或者进入死信队列 * @return */ @Bean public Queue normalQueue(){ return QueueBuilder.durable(NORMAL_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.abd") // .ttl(5000) // 设置队列的TTL(消息存活时间) // .maxLength(1) //设置队列最大长度 .build(); } @Bean public Binding normalBinding(Exchange normalExchange,Queue normalQueue){ return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs(); } @Bean public Exchange deadExchange(){ return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build(); } @Bean public Queue deadQueue(){ return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Binding deadBinding(Exchange deadExchange,Queue deadQueue){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } }

7.1 消息被拒绝或者Nack //1.监听普通队列消息,拒绝或者不ack消息,并且requeue为false禁止重新投递队列,则消息会进入死信队列 @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE) public void consumer1(String msg, Channel channel, Message message) throws IOException { System.out.println("监听到普通队列消息:"+msg); //拒绝消息或者不ack确认消息 //设置拒绝消息,并禁止重新投递队列requeue=false // channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); // requeue如果为true则重新排队,如果为false则被丢弃或者进入死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); }

7.2 消息过期

设置TTL(TimeToLive)有两种方式:

直接设置消息的过期时间(存在问题,如果第一个消息TTL为30秒,第二个消息TTL为3秒,此时第二个消息必须等待第一个消息过期之后才能过期)通过设置队列的过期时间

注意:通过消息过期的方式使消息进入死信队列,消费者不能监听普通队列,需要监听死信队列。

//1.设置消息的TTL @Test public void sendDeadLetterQueueAndSetTTL(){ rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.ttl", "通过设置TTL消息存活时间,使消息进入死信队列", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置消息存活时间 String 类型 单位为毫秒 message.getMessageProperties().setExpiration("5000"); return message; } }); System.out.println("消息已发送"); } //2.设置队列的TTL @Bean public Queue normalQueue(){ return QueueBuilder.durable(NORMAL_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.abd") .ttl(5000)//设置队列的TTL 单位为毫秒 .maxLength(1) .build(); } 7.3 消息超过队列最大长度

在声明队列的时候设置队列的最大长度,则超过这个最大长度后的消息都会被进入死信队列或者被丢弃

@Bean public Queue normalQueue(){ return QueueBuilder.durable(NORMAL_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.abd") .ttl(5000) .maxLength(1) //设置队列最大长度 .build(); } 7.4 延迟交换机(RabbitMQ Plugins)

使用延时交换机的方式对消息延时是最合适的,因为以上的方式都存在问题:

如果给消息设置过期时间,队列消息是按顺序消费的,后面的消息只能等前面那消息处理后才能被处理,如果后面的消息已经过期了但是前面的消息还没有被处理,则后面的消息无法被过期如果给队列设置过期时间,则更不方便,如果消息需要分别延时不同的时间的话,那就只能分别创建多个不同的队列 这种方式是将消息直接存放在队列中等待过期的,时间不准确,并且有弊端。

而直接使用延时交换机来进行延时处理的话,消息是被存放在延时交换机中的,等待到达延时时间后,才会被投递到队列中,从而直接消费。

注意: 由于消息延时在交换机中,未到达队列中,所以如果如果消息设置了Return机制,则由于消息被延时投递,还未到达队列此时会触发Return回调函数, 并且如果此时RabbitMQ服务重启了,存在延时交换机中的消息会被丢失。

延迟交换机属于RabbitMQ的插件了,需要下载插件,开启配置才能实现消息延时 官网插件地址:https://·/community-plugins.html

延时交换机插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

将插件拷贝到/rabbitmq_server-3.8.3/plugins目录下,然后进入sbin目录,执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange。 注意:通过yum下载的RabbitMQ的目录在 /usr/lib/ 注意版本对应。

此处出现的错误信息unknown exchange type 'x-delayed-message'可以看到,是因为RabbitMQ此时还未启动延时交换机的插件,但是却使用了x-delayed-message的属性。所以导致报错。

7.5 声明延时交换机

1、构造arguments参数 指定交换机类型x-delayed-type为topic 2、指定交换机type为x-delayed-message类型

package com.lsh.springboot.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * @author :LiuShihao * @date :Created in 2022/3/17 10:05 上午 * @desc :构造延时交换机 */ @Configuration public class DelayedExchangeConfig { public static final String DELAYED_EXCHANGE_NAME = "boot-delayed-exchange"; public static final String DELAYED_QUEUE_NAME = "boot-delayed-queue"; public static final String DELAYED_ROUTING_KEY = "*.delayed.*"; //普通队列 @Bean public Queue delayedQueue(){ return QueueBuilder.durable(DELAYED_QUEUE_NAME).build(); } /** * 构造延时交换机 * 1、构造arguments参数 指定交换机类型x-delayed-type为topic * 2、指定type为x-delayed-message类型 */ @Bean public Exchange delayedExchange(){ HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type","topic"); CustomExchange customExchange = new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments); return customExchange; } @Bean public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } } 7.6 发送延时消息

通过message.getMessageProperties().setDelay(30000)设置消息延时时间,单位为毫秒。

//向延时交换机投递延时消息,如果如果消息设置了Return机制,则由于消息被延时投递,还未到达队列此时会触发Return回调函数 @Test public void sendDelayedExchange(){ rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKe)->{ System.out.println("消息未投递到队列"); }); rabbitTemplate.convertAndSend(DelayedExchangeConfig.DELAYED_EXCHANGE_NAME, "little.delayed.rabbit", "小延时兔子", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置延时时间 单位为毫秒 message.getMessageProperties().setDelay(30000); return message; } }); System.out.println("消息已发送"); } 八、集群高可用

TODO


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #rabbitmq使用方法 #本文内容主要包括 #1 #RabbitMQ的安装部署2 #AMQP架构3 #RabbitMQ的通信方式4