irpas技术客

【完美解决】SpringBoot整合rabbitmq “Failed to declare queue(s)“_商俊帅

irpas 6031

文章目录 一、复现问题二、问题解决—在生产者代码创建队列1.RabbitAdmin是什么?2.RabbitAdmin如何自动创建队列? 三、源码分析—知其然更要知其所以然1.重点看下RabbitAdmin的initiallize()方法2.什么时候调用的initiallize()方法?

一、复现问题

在测试rabbitmq的时候,启动生产者没有问题,启动消费者后突然发现了如下的问题

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[email.queue] at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:584) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:571) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1338) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60] Caused by: java.io.IOException: null at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52) ~[amqp-client-5.7.3.jar:5.7.3] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60] at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190) ~[spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] at com.sun.proxy.$Proxy70.queueDeclarePassive(Unknown Source) ~[na:na] at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:679) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE] ... 5 common frames omitted Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'email.queue' in vhost '/', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.7.3.jar:5.7.3] ... 14 common frames omitted Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'email.queue' in vhost '/', class-id=50, method-id=10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.7.3.jar:5.7.3] at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599) ~[amqp-client-5.7.3.jar:5.7.3] ... 1 common frames omitted

错误描述的信息比较长,我们可以从中筛选出重要的信息,比如caused by后面的信息,下面把caused by的信息摘出来

Failed to declare queue(s) reply-code=404, reply-text=NOT_FOUND - no queue 'email.queue' in vhost '/'

从中可以得出,在rabbitmq中没有创建此队列,打开rabbitmq的管理控制台,这时显示rabbitmq没有队列,所以消费者去监听队列时显示404找不到队列的错误 那就证明了一个问题,生产者没有创建队列成功,或者说就是没有创建队列。看了网上的几篇帖子说的是在rabbitmq的管理控制台手动创建,这怎么能忍的了呢!能让电脑干的事儿就不能让人干。接下来就分享如何在代码中创建队列!

二、问题解决—在生产者代码创建队列

下面请出我们今天的主角—RabbitAdmin;

1.RabbitAdmin是什么?

RabbitAdmin是Spring AMQP封装的一个对象。 在使用RabbitAdmin的时候我们只是配置一个RabbitAdmin对象交给Spring容器做管理。

2.RabbitAdmin如何自动创建队列?

1.配置RabbitAdmin实例

//配置连接工厂 @Bean public CachingConnectionFactory cachingConnectionFactory(){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(rabbitmqHost); return cachingConnectionFactory; } @Bean public RabbitAdmin rabbitAdmin(){ //需要传入 RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory()); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }

2.使用RabbitAdmin实例显式声明队列(主要是这一步)

@Autowired private RabbitAdmin rabbitAdmin; //声明邮件队列 @Bean public Queue emailQueue(){ Queue queue = new Queue(MAIL_QUEUE_NAME, true); //显式声明邮件队列 rabbitAdmin.declareQueue(queue); return queue; }

两个步骤搞定!此时再查看RabbitMQ管理控制台已经创建好了队列

三、源码分析—知其然更要知其所以然

那么RabbitAdmin自动创建队列原理是什么?

1.重点看下RabbitAdmin的initiallize()方法

下面的代码块就是从spring上下文环境(ApplicationContext也叫IOC容器)中拿到咱们配置Exchange、Queue、Binding的Bean实例

Collection<Exchange> contextExchanges = new LinkedList<Exchange>( this.applicationContext.getBeansOfType(Exchange.class).values()); Collection<Queue> contextQueues = new LinkedList<Queue>( this.applicationContext.getBeansOfType(Queue.class).values()); Collection<Binding> contextBindings = new LinkedList<Binding>( this.applicationContext.getBeansOfType(Binding.class).values()); Collection<DeclarableCustomizer> customizers = this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

拿到之后就开始创建了,就是下面的declareXXX方法

this.rabbitTemplate.execute(channel -> { declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; }); 2.什么时候调用的initiallize()方法?

我们观察到RabbitAdmin这个类实现了InitializingBean接口,该接口是Spring创建bean实例初始化过程后的回调函数,也就是预留给开发者的扩展点。该接口提供的方法是afterPropertiesSet(),此方法中调用了initiallize()方法。那么就可以知道在spring创建bean实例时就已经创建好了队列


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #failed #To #declare #queues #quotFailed