irpas技术客

RabbitMQ实现延迟队列_有趣的灵魂_不世俗的心_rabbitmq延迟队列实现

未知 841

实现方式一:死信队列

AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:

1、Time To Live(TTL)

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信) RabbitMQ针对队列中的消息过期时间有两种方法可以设置。 A: 通过队列属性设置,队列中所有消息都有相同的过期时间。 B: 对消息进行单独设置,每条消息TTL可以不同。

2、Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

队列出现dead letter的情况

1、消息或者队列的TTL过期 2、队列达到最大长度 3、消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false 代码实现 首先加入依赖

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.16</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.78</version> </dependency>

编写配置文件 application.yml

spring: application: name: delay-queue rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin123456 virtual-host: delays1 server: port: 8082

编写队列配置文件

#订单业务队列的名称,交换机名称、路由key、超时时间 delay.bussiness.queue: order_time_out_15s delay.bussiness.excahnge: order_time_out_15s_exchange delay.bussiness.route: order_time_out_15s_776 #死信队列的名称,交换机名称、路由key、超时时间 delay.dead.queue: dead_order_time_out_15s delay.dead.excahnge: dead_order_time_out_15s_exchange delay.dead.route: dead_order_time_out_15s_666 delay.bussiness.order.timeout: 15000 #利用rabbitmq_delayed_message_exchange实现延迟队列的方式 #插件实现订单业务队列的名称,交换机名称、路由key、超时时间 delay.plugins.queue: plugin_order_delay_30 delay.plugins.exchange: plugin_order_exchange_30 delay.plugins.route.key: plugin_order_route_key_30 delay.plugin.timeout: 30000

编写配置读取类

@Configuration @PropertySource("classpath:rabbitmqs.properties") @Data public class OrderDelayConfig { @Value("${delay.bussiness.queue}") private String orderDelayQueueName; @Value("${delay.bussiness.excahnge}") private String orderDelayExchangeName; @Value("${delay.bussiness.route}") private String orderDelayRouteKey; @Value("${delay.dead.queue}") private String orderDeadDelayQueueName; @Value("${delay.dead.excahnge}") private String orderDeadDelayExchangeName; @Value("${delay.dead.route}") private String orderDeadDelayRouteKey; @Value("${delay.bussiness.order.timeout}") private Long timeout; @Value("${delay.plugins.queue}") private String pluginOrderQueueName; @Value("${delay.plugins.exchange}") private String pluginOrderExchangeName; @Value("${delay.plugins.route.key}") private String pluginOrderRouteKey; @Value("${delay.plugin.timeout}") private Long pluginTimeout; }

编写队列、交换机创建、交换机和队列、路由key值绑定的配置类

//rabbitMq内置死信队列信息 private final String dlexchange = "x-dead-letter-exchange"; private final String dlRouteKey = "x-dead-letter-routing-key"; private final String ttl = "x-message-ttl"; @Autowired private OrderDelayConfig orderDelayConfig; //创建死信交换机 @Bean("orderDeadExchange") public DirectExchange deadTopicExchange() { return new DirectExchange(orderDelayConfig.getOrderDeadDelayExchangeName()); } //创建业务交换机 @Bean("orderExchange") public DirectExchange payTopicExchange() { return new DirectExchange(orderDelayConfig.getOrderDelayExchangeName()); } //创建死信队列 @Bean("orderDeadQueue") public Queue deadQueue() { return new Queue(orderDelayConfig.getOrderDeadDelayQueueName()); } /** * 创建订单超时队列 */ @Bean("orderQueue") public Queue payQueue() { Map<String, Object> params = new HashMap<>(); //设置队列的过期时间 params.put(ttl, orderDelayConfig.getTimeout()); //声明当前队列绑定的死信交换机 params.put(dlexchange, orderDelayConfig.getOrderDeadDelayExchangeName()); //声明当前队列的死信路由键 params.put(dlRouteKey, orderDelayConfig.getOrderDeadDelayRouteKey()); return QueueBuilder.durable(orderDelayConfig.getOrderDelayQueueName()).withArguments(params).build(); } //订单数据队列绑定交换机和route的key值 @Bean public Binding delayBindingA(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getOrderDelayRouteKey()); } //死信队列与死信交换机进行绑定 @Bean public Binding BindingErrorQueueAndExchange(@Qualifier("orderDeadQueue") Queue deadQueue, @Qualifier("orderDeadExchange") DirectExchange exchange) { return BindingBuilder.bind(deadQueue).to(exchange).with(orderDelayConfig.getOrderDeadDelayRouteKey()); }

以上已经完成了配置工作,下面需要完成业务代码实现 1、新建订单实体类

@Data public class Order { /** * 订单编号 */ private String orderNo; /** * 价格(元) */ private BigDecimal price; /** * 商品数量 */ private int prodductNum; /** * 总金额 */ private BigDecimal totalAmount; /** * 创建时间 */ private Date createTime; }

定义消息发送和消息消费

@Component @EnableScheduling @Slf4j public class OrderDelayQueue { private RabbitTemplate rabbitTemplate; private OrderDelayConfig orderDelayConfig; private final static String orderQueueName = "dead_order_time_out_15s"; public OrderDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) { this.rabbitTemplate = rabbitTemplate; this.orderDelayConfig = orderDelayConfig; } @Scheduled(cron = "0/30 * * * * ?") public void sendOrderMsg() { Order order; for (int i = 0; i < 3; i++) { // Thread.sleep(1000); order = new Order(); order.setOrderNo(new Snowflake().nextIdStr()); order.setCreateTime(new Date()); rabbitSendMsg(JSON.toJSONString(order)); } } /** * 发送消息 * * @param msg */ public void rabbitSendMsg(String msg) { rabbitTemplate.convertAndSend(orderDelayConfig.getOrderDelayExchangeName(), orderDelayConfig.getOrderDelayRouteKey(), msg); } //消费死信队列的消息 @RabbitListener(queues = orderQueueName) public void infoConsumption(String data) throws Exception { //此处编写执行订单超时状态的逻辑代码 final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"); final Order order = JSONObject.parseObject(data, Order.class); final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000; log.info(order.getOrderNo() + "死信队列========:订单已经超时了" + "失效时间" + diff + "秒"); } }

到此完成全部代码,启动项目执行即可完成死信队列实现延迟队列功能。

方式2:利用rabbitmq_delayed_message_exchange实现延迟队列 安装插件

1、下载延时消息插件:https://·/community-plugins.html 2、将下载的文件放在rabbitmq的安装路径plugins文件中

安装插件 打开rabbitmq的命令界面 执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 入上图所示表示安装成功

代码实现

配置信息,在第一种方式中已配置

1、队列、交换机、路由key值创建,绑定 //插件实现创建业务交换机 @Bean("pluginOrderExchange") public CustomExchange pluginExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(orderDelayConfig.getPluginOrderExchangeName(), "x-delayed-message", true, false, args); } /** * 插件实现创建订单超时队列 */ @Bean("pluginOrderQueue") public Queue pluginQueue() { return new Queue(orderDelayConfig.getPluginOrderQueueName()); } /** * 插件实现绑定交换机,和指定的key值 * @param queue * @param exchange * @return */ @Bean public Binding delayBindingPlugin(@Qualifier("pluginOrderQueue") Queue queue, @Qualifier("pluginOrderExchange") CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getPluginOrderRouteKey()).noargs(); } 2、消息发送,消息消费 @Component @EnableScheduling @Slf4j public class OrderPluginDelayQueue { private RabbitTemplate rabbitTemplate; private OrderDelayConfig orderDelayConfig; private final static String orderQueueName = "plugin_order_delay_30"; public OrderPluginDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) { this.rabbitTemplate = rabbitTemplate; this.orderDelayConfig = orderDelayConfig; } @Scheduled(cron = "0/30 * * * * ?") public void sendOrderMsg() { Order order; for (int i = 0; i < 3; i++) { // Thread.sleep(1000); order = new Order(); order.setOrderNo(new Snowflake().nextIdStr()); order.setCreateTime(new Date()); rabbitSendMsg(JSON.toJSONString(order),orderDelayConfig.getPluginTimeout().intValue()); } } /** * 发送消息 * * @param msg */ public void rabbitSendMsg(String msg,int delayTime) { rabbitTemplate.convertAndSend(orderDelayConfig.getPluginOrderExchangeName(), orderDelayConfig.getPluginOrderRouteKey(), msg,s->{ s.getMessageProperties().setDelay(delayTime); return s; }); } //消费死信队列的消息 @RabbitListener(queues = orderQueueName) public void infoConsumption(String data) throws Exception { //此处编写执行订单超时状态的逻辑代码 final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"); final Order order = JSONObject.parseObject(data, Order.class); final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000; log.info(order.getOrderNo() + "插件实现=============订单已经超时了" + "失效时间" + diff + "秒"); } }

以上完成第二种方式代码 实现结果截图


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #rabbitmq延迟队列实现 #To #或者 #针对Message设置