irpas技术客

消息队列的介绍及配置_乘浪初心_消息队列配置参数

网络投稿 2689

一、消息队列详解

消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候

主要解决:应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件

目前在生产环境,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等

1、 消息队列的两种模型

?1.1、点对点模型

每个消息只有一个接收者(Consumer),一旦被消息,就不再在消息队列中 发送者和接收者间没有依赖性,发送者发送消息后,不管有没有接收者在运行,不会影响下一次发送

1.2、发布订阅模型

每个消息可以有多个订阅者 每个订阅者都可以接收到主题的所有消息

两种模型区别:一份消息是否能被多次消费 如果只有一个订阅者,两个模型基本一样,所以发布订阅模型在功能层面是兼容队列模型的

?2、消息队列实现分布式事务

?一个严格意义的事务实现是ACID4个属性:原子性、一致性、隔离性、持久性原子性:一个事务操作不可分割,要么全部成功,要么全部失败,不能一半成功一半失败一致性:事务执行完成之前的时间点,读到的一定是更新前的数据,之后读到的一定是更新后的数据隔离性:一个事务的执行不能被其他事务干扰(一个事务内部的操作及使用的数据对正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰)持久性:事务一旦提交,后续的其他操作和故障都不会对事务的结果有任何影响

?在分布式系统中,光是要实现数据一致性就已经非常困难了,所以一般只保证达到最终一致性。比较常见的分布式事务实现有

1)、2PC(Two-phase Commit)二阶段提交2)、TCC(Try-Confirm-Cancel)3)、事务消息

3、 消息队列分布式事务实现

?Kafka和RocketMQ都提供了事务相关功能,下面以订单为例看下如何如何实现的

?Kafka和RocketMQ都提供了事务相关功能,核心:半消息半消息:这个半消息不是指消息内容不完整,而是指在事务提交前,对于消费者来说,这个消息是不可见的(sendMessageInTransaction)

(一)、消息队列的应用场景(或作用)

应用场景分为:异步处理、应用解耦、流量削锋和消息通讯等等;其中最主要的的是:异步处理、应用解耦、流量削锋

1、异步处理

场景:在用户注册后,需要发送注册邮件和发送注册信息,传统的做法有两种:串行方式、并行方式

传统模式的缺点:一些非必要的业务逻辑以同步的方式运行,太耗费时间中间件模式的的优点:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

1.1、串行方式

将注册信息写入数据库成功后,发送注册邮件,然后发送注册短信,而所有任务执行完成后,返回信息给客户端;如图

1.2、并行方式

将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操作。而所有任务执行完成后,返回信息给客户端。同串行方式相比,并行方式可以提高执行效率,减少执行时间

假设三个操作均需要50ms的执行时间,排除网络因素,则最终执行完成,串行方式需要150ms,而并行方式需要100ms 因为cpu在单位时间内处理的请求数量是一致的,假设:CPU每1秒吞吐量是100此,则串行方式1秒内可执行的请求量为1000/150,不到7次;并行方式1秒内可执行的请求量为1000/100,为10次 可以看出,传统串行和并行的方式会受到系统性能的局限,那么如何解决这个问题? 我们需要引入消息队列,将不是必须的业务逻辑,异步进行处理,由此改造出来的流程如下图

根据上述的流程,用户的响应时间基本相当于将用户数据写入数据库的时间,发送注册邮件、发送注册短信的消息在写入消息队列后,即可返回执行结果,写入消息队列的时间很快,几乎可以忽略,也有此可以将系统吞吐量提升至20QPS(每秒查询率),比串行方式提升近3倍,比并行方式提升2倍

?QPS:每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准

?因特网上,经常用每秒查询率来衡量域名系统服务器的机器的性能,即为QPS对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力

2、应用解耦

场景:某一个系统A要与其他系统打交道(即调用其中的方法),如果其它系统改变或者新增系统,那么A系统都会改变,这样的话耦合度比较高,比较麻烦

中间件模式: 使用消息队列来解决这个问题

我们A系统将产生的数据发入消息队列中,其它的系统再去消息队列来进行消费,那么其他系统的减少或者新增系统即与A系统关系不大了,这样来实现解耦的功能

3、流量削锋

如场景:商品秒杀业务,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列

1)、可以控制参与活动的人数2)、可以缓解短时间内高流量对应用的巨大压力

处理方式如图:

1)、服务器在接收到用户请求后,首先写入消息队列。这时如果消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面2)、秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理

4、消息通讯

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题

日志采集客户端:负责日志数据采集,定时写受写入Kafka队列Kafka消息队列:负责日志数据的接收,存储和转发日志处理应用:订阅并消费kafka队列中的日志数据

(二)、使用消息队列后的缺点

1、系统可用性降低:系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!2、系统复杂性提高:加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!3、一致性问题:消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了

(三)、常见的消息队列 常见消息队列对比ActiveMQRabbitMQRocketMQkafka开发语言javaerlangjavascala单机吞吐量万级万级十万级十万级时效性ms级us级ms级ms级以内可用性高(主从架构)高(主从架构)非常高(分布式架构)非常高(分布式架构)功能特性成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富MQ功能比较完备,扩展性佳只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

?根据上面的对比得出以下结论

1)、中小型软件公司,建议选RabbitMQ 因为erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要 不考虑kafka的原因是,中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除 不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐

2)、大型软件公司,根据具体使用在rocketMQ和kafka之间二选一 大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景

(四)、消息队列的高可用

rcoketMQ的集群就有多master 模式、多master多slave异步复制模式、多 master多slave同步双写模式,如图:

Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息

kafka

?一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息

rabbitMQ:也有普通集群和镜像集群模式这里就不多说明

(五)、消息的重复消费

造成重复消费的原因:消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者

解决方法:1、例如:你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据2、在例如:你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作3、准备一个第三方介质:来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可

(六)、消费的性传输

每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据

RabbitMQ

1、生产者弄丢数据

从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit()) 然而缺点就是吞吐量下降了。因此,在生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作

2、消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发 那么如何持久化呢,其实也很容易,就下面两步1、将queue的持久化标识durable设置为true,则代表是一个持久的队列2、发送消息的时候将deliveryMode=2 这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

3、消费者丢数据

消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息 至于解决方案,采用手动确认消息即可

二、RabbitMQ安装 1、?安装Erlang

官方下载Erlang:Downloads - Erlang/OTP

[root@servers ~]# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel [root@servers ~]# tar xvf otp_src_24.2-rc1.tar.gz [root@servers ~]# cd otp_src_24.2 [root@servers otp_src_24.2]# ./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac [root@servers otp_src_24.2]# make && make install [root@servers otp_src_24.2]# vim /etc/profile ................ 在最后加入 ........ ERLANG_HOME=/usr/local/erlang PATH=$ERLANG_HOME/bin:$PATH export ERLANG_HOME export PATH 保存 root@servers ~]# source /etc/profile [root@servers ~]# erl #验证是否安装成功 Erlang/OTP 24 [erts-12.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] Eshell V12.0 (abort with ^G) 1> ctrl + C 退出 如果一次没有退出就多按几次 2、安装RadditMQ

下载RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.9.12

?

[root@servers ~]# xz -d rabbitmq-server-generic-unix-3.9.12.tar.xz [root@servers ~]# tar xf rabbitmq-server-generic-unix-3.9.12.tar [root@servers ~]# cp -rf rabbitmq_server-3.9.12 /usr/local/ [root@servers ~]# cd /usr/local/ [root@servers local]# mv rabbitmq_server-3.9.12 rabbitmq [root@servers local]# cd rabbitmq/sbin/ [root@servers sbin]# ./rabbitmq-plugins enable rabbitmq_management #开启管理页面插件 Enabling plugins on node rabbit@C7--13: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@C7--13... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch set 3 plugins. Offline change; changes will take effect at broker restart. 启动与关闭服务 [root@servers sbin]# ./rabbitmq-server #启动,ctrl + c 退出及关闭 [root@servers sbin]# ./rabbitmq-server -detached #在后台启动服务 [root@servers sbin]# ./rabbitmqctl stop #关闭服务 添加管理员账号

?在后台启动后进行添加用户

添加用户格式: ./rabbitmqctl add_user 用户名 密码 [root@servers sbin]# ./rabbitmqctl add_user admin 123.com Adding user "admin" ... Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more. 分配用户标签格式: ./rabbitmqctl add_user_tags 用户名 管理员标签[administrator] [root@servers sbin]# ./rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] ... 访问测试

输入之前设置的用户:admin 和密码:123.com 进行登录

?三、RabbitMQ详解 (一)、AMQP简介

RabbitMQ是实现了高级消息队列协议(AMQP协议)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

AMQP协议:即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然AMQP协议的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

AMQP的三层协议Module Layer位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息Session Layer主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理Transport Layer主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示
(二)、功能 存储转发多个消息发送者,单个消息接收者分布式事务多个消息发送者,多个消息接收者发布订阅多个消息发送者,多个消息接收者基于内容的路由多个消息发送者,多个消息接收者文件传输队列多个消息发送者,多个消息接收者点对点连接单个消息发送者,单个消息接收者
(三)、术语说明 AMQP模型(AMQP Model)一个由关键实体和语义表示的逻辑框架,遵从AMQP规范的服务器必须提供这些实体和语义。为了实现本规范中定义的语义,客户端可以发送命令来控制AMQP服务器连接(Connection)一个网络连接,比如TCP/IP套接字连接会话(Session)端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”信道(Channel)多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质客户端(Client)AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息服务器(Server)接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”端点(Peer)AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)搭档(Partner)当描述两个端点之间的交互过程时,使用术语“搭档”来表示“另一个”端点的简记法。比如我们定义端点A和端点B,当它们进行通信时,端点B是端点A的搭档,端点A是端点B的搭档片段集(Assembly)段的有序集合,形成一个逻辑工作单元段(Segment)帧的有序集合,形成片段集中一个完整子单元帧(Frame)AMQP传输的一个原子单元。一个帧是一个段中的任意分片控制(Control)单向指令,AMQP规范假设这些指令的传输是不可靠的命令(Command)需要确认的指令,AMQP规范规定这些指令的传输是可靠的异常(Exception)在执行一个或者多个命令时可能发生的错误状态类(Class)一批用来描述某种特定功能的AMQP命令或者控制消息头(Header)描述消息数据属性的一种特殊段消息体(Body)包含应用程序数据的一种特殊段。消息体段对于服务器来说完全透明——服务器不能查看或者修改消息体消息内容(Content)包含在消息体段中的的消息数据交换器(Exchange)服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列交换器类型(Exchange Type)基于不同路由语义的交换器类消息队列(Message Queue)一个命名实体,用来保存消息直到发送给消费者绑定器(Binding)消息队列和交换器之间的关联绑定器关键字(Binding Key)绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式路由关键字(Routing Key)一个消息头,交换器可以用这个消息头决定如何路由某条消息持久存储(Durable)一种服务器资源,当服务器重启时,保存的消息数据不会丢失临时存储(Transient)一种服务器资源,当服务器重启时,保存的消息数据会丢失持久化(Persistent)服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失非持久化(Non-Persistent)服务器将消息保存在内存中,当服务器重启时,消息可能丢失消费者(Consumer)一个从消息队列中请求消息的客户端应用程序生产者(Producer)一个向交换器发布消息的客户端应用程序虚拟主机(Virtual Host)一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机 ?
(四)、RabbitMQ的工作流程

消息队列有三个概念: 发消息者、消息队列、收消息者。RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和队列之间, 加入了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转而变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列 消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息

Producer(消息的生产者)向消息队列发布消息的客户端应用程序Channel(信道)多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道Routing Key(路由键)消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节BrokerRabbitMQ Server,服务器实体Binding(绑定)用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表Exchange(交换器|路由器)提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略,分别是fanout, direct, topic, headersBinding Key(绑定键)Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节Queue(消息队列)存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收Consumer(消息的消费者)从消息队列取得消息的客户端应用程序Message(消息)消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等
(五)、Exchange消息调度策略

交换器的功能主要是接收消息并且转发到绑定的队列,交换器不存储消息,在启用ack模式后,交换器找不到队列会返回错误

调度策略是指Exchange在收到生产者发送的消息后依据什么规则把消息转发到一个或多个队列中保存。调度策略与三个因素相关:Exchange Type(Exchange的类型),Binding Key(Exchange和Queue的绑定关系),消息的标记信息(Routing Key和headers)Exchange根据消息的Routing Key和Exchange绑定Queue的Binding Key分配消息。生产者在将消息发送给Exchange的时候,一般会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key需要与Exchange Type及Binding Key联合使用才能最终生效 在Exchange Type与Binding Key固定的情况下(一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定Routing Key来决定消息流向哪里

交换器的四种消息调度策略:fanout, direct, topic, headers

1、Fanout (订阅模式/广播模式)

?交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式 与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的

?2、Direct(路由模式)

?精确匹配:当消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange

?3、Topic (通配符模式)

按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该QueueRouting Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

?4、Headers(键值对模式)

Headers不依赖于Routing Key与Binding Key的匹配规则来转发消息,交换器的路由规则是通过消息头的Headers属性来进行匹配转发的,类似HTTP请求的Headers 在绑定Queue与Exchange时指定一组键值对,键值对的Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或all,代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any) 当消息发送到Exchange时,交换器会取到该消息的headers,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。Headers交换机的优势是匹配的规则不被限定为字符串(String),而是Object类型

?(六)、RPC(Remote Procedure Call,远程过程调用)

?MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道) 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC

?RabbitMQ中实现RPC的机制是:1、客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14个属性,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)2、服务器端收到消息并处理3、服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性4、客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

(七)、消息确认:Message acknowledgment

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑 如果我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ会立即把这个Message标记为完成,然后从queue中删除了

(八)、消息持久化:Message durability

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务

(九)、分发机制

我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据。因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里。这时,就得使用工作队列了。一个队列有多个消费者同时消费数据工作队列有两种分发数据的方式:轮询分发(Round-robin)和 公平分发(Fair dispatch)

轮询分发:队列给每一个消费者发送数量一样的数据

公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据

1、轮询分发:Round-robin dispatching

如果工作队列中有两个消费者,两个消费者得到的数据量一样的,并不会因为两个消费者处理数据速度不一样使得两个消费者取得不一样数量的数据。但是这种分发方式存在着一些隐患,消费者虽然得到了消息,但是如果消费者没能成功处理业务逻辑,在RabbitMQ中也不存在这条消息。就会出现消息丢失并且业务逻辑没能成功处理的情况

2、公平分发:Fair dispatch

消费者设置每次从队列里取一条数据,并且关闭自动回复机制,每次取完一条数据后,手动回复并继续取下一条数据。与轮询分发不同的是,当每个消费都设置了每次只会从队列取一条数据时,并且关闭自动应答,在每次处理完数据后手动给队列发送确认收到数据。这样队列就会公平给每个消息费者发送数据,消费一条再发第二条,而且可以在管理界面中看到数据是一条条随着消费者消费完从而减少的,并不是一下子全部分发完了。采用公平分发方式就不会出现消息丢失并且业务逻辑没能成功处理的情况

(十)、事务

对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务

(十一)、Confirm机制

使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重

(十二)、Alternate Exchange(代替交换器)

Alternate Exchange是Rabbitmq自己扩展的功能,不是AMQP协议定义的 创建Exchange指定该Exchange的Alternate Exchange,发送消息的时候如果Exchange没有成功把消息路由到队列中去,这就会将此消息路由到Alternate Exchange属性指定的Exchange上了。需要在创建Exchange时添加alternate-exchange属性。如果Alternate Exchange也没能成功把消息路由到队列中去,这个消息就会丢失。可以触发publish confirm机制,表示这个消息没有确认

创建交换器时需要指定如下属性 Map<String,Object> argsMap = new HashMap<>(); argsMap.put(“alternate-exchange”,“Alternate Exchange Name”); (十三)、TTL(生存时间)

RabbitMQ允许您为消息和队列设置TTL(生存时间)。 可以使用可选的队列参数或策略完成(推荐使用后一个选项)。 可以为单个队列,一组队列或单个消息应用消息TTL

设置消息的过期时间 MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration(“30000”); 设置队列中消息的过期时间 在声明一个队列时,可以指定队列中消息的过期时间,需要添加x-message-ttl属性 Map<String, Object> arguments = new HashMap<>(); arguments.put(“x-message-ttl”,30000);

如果同时制定了Message TTL,Queue TTL,则时间短的生效

(十四)、Queue Length Limit(队列长度限制)

可以设置队列中消息数量的限制,如果测试队列中最多只有5个消息,当第六条消息发送过来的时候,会删除最早的那条消息。队列中永远只有5条消息 使用代码声明含有x-max-length和x-max-length-bytes属性的队列Max length(x-max-length) 用来控制队列中消息的数量 如果超出数量,则先到达的消息将会被删除掉

Max length bytes(x-max-length-bytes) 用来控制队列中消息总的大小 如果超过总大小,则最先到达的消息将会被删除,直到总大小不超过x-max-length-byte为止

Map<String, Object> arguments = new HashMap<>(); arguments.put(“x-max-length”,3); #表示队列中最多存放三条消息 Map<String, Object> arguments = new HashMap<>(); arguments.put(“x-max-length-bytes”,10); #队列中消息总的空间大小 (十五)、Dead Letter Exchange(死信交换器)

在队列上指定一个Exchange,则在该队列上发生如下情况1、消息被拒绝(basic.reject or basic.nack),且requeue=false2、消息过期而被删除(TTL)3、消息数量超过队列最大限制而被删除4、消息总大小超过队列最大限制而被删除

就会把该消息转发到指定的这个exchange 需要定义了x-dead-letter-exchange属性,同时也可以指定一个可选的x-dead-letter-routing-key,表示默认的routing-key,如果没有指定,则使用消息原来的routeing-key进行转发

当定义队列时指定了x-dead-letter-exchange(x-dead-letter-routing-key视情况而定),并且消费端执行拒绝策略的时候将消息路由到指定的Exchange中去 我们知道还有二种情况会造成消息转发到死信队列 一种是消息过期而被删除,可以使用这个方式使的rabbitmq实现延迟队列的作用。还有一种就是消息数量超过队列最大限制而被删除或者消息总大小超过队列最大限制而被删除

(十六)、priority queue(优先级队列)

声明队列时需要指定x-max-priority属性,并设置一个优先级数值

消息优先级属性 MessageProperties messageProperties = new MessageProperties(); messageProperties.setPriority(priority);

如果设置的优先级小于等于队列设置的x-max-priority属性,优先级有效 如果设置的优先级大于队列设置的x-max-priority属性,则优先级失效

创建优先级队列,需要增加x-max-priority参数,指定一个数字。表示最大的优先级,建议优先级设置为1~10之间 发送消息的时候,需要设置priority属性,最好不要超过上面指定的最大的优先级 如果生产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进行消费1、发送的消息的优先级属性小于设置的队列属性x-max-priority值,则按优先级的高低进行消费,数字越高则优先级越高2、送的消息的优先级属性都大于设置的队列属性x-max-priority值,则设置的优先级失效,按照入队列的顺序进行消费3、费端一直进行监听,而发送端一条条的发送消息,优先级属性也会失效

RabbitMQ不能保证消息的严格的顺序消费

(十七)、延迟队列

延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费 延迟队列多用于需要延迟工作的场景

最常见的是以下两种场景:

?1、消费如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用2、延迟重试如:消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试 我们可以利用RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。实现延迟队列

Time-To-Live ExtensionsRabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用

Dead Letter Exchange 刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:1、消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false2、消息因为设置了TTL而过期3、消息进入了一条已经达到最大长度的队列 如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列

?四、RabbitMQ配置 (一)、RabbitMQ 常用命令 1、Vhost虚拟机

每个RabbitMQ服务器都能创建虚拟主机(virtual host),简称vhost。每个vhost本质上是一个独立的小型RabbitMQ服务器,拥有自己独立的队列、交换器及绑定关系等,并且它拥有自己独立的权限,RabbitMQ默认创建vhost为 “ / ”

创建vhost rabbitmqctl add_vhost {vhost} 查看所有vhost rabbitmqctl list_vhosts 删除指定vhost rabbitmqctl delete_vhost {vhost} 2、用户管理

在RabbitMQ中,用户是访问控制的基本单元,且单个用户可以跨越多个vhost进行授权

创建用户 rabbitmqctl add_user {username} {password} 修改密码 rabbitmqctl change_password {username} {password} 清除密码 rabbitmqctl clear_password {username} 验证用户 rabbitmqctl authenticate_user {username} {password} 删除用户 rabbitmqctl delete_user {username} 用户列表 rabbitmqctl list_users 3、权限管理

?用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作

RabbitMQ中,权限控制是以vhost为单位,创建用户时,将被指定至少一个vhost,默认的vhost是 “ / ”

授予权限 rabbitmqctl set_permissions [-p vhost] {user}{conf}{write}{read} 配置项说明vhost授权用户访问指定的vhostuser用户名conf一个用于匹配用户在哪些资源上拥有可配置权限的正则表达式,例如:".*"表示全部write一个用于匹配用户在哪些资源上拥有可写权限的正则表达式 ,例如:".*"表示全部read一个用于匹配用户在哪些资源上拥有可读权限的正则表达式,例如:".*"表示全部
收回权限 rabbitmqctl clear_permissions [-p vhost] {username} 虚拟主机权限列表 rabbitmqctl list_permissions [-p vhost] 查看指定用户权限 rabbitmqctl list_user_permissions {username} 4、角色分配

rabbitmq的角色有5种类型

rabbitmqctl set_user_tags {username} {tag…} User为用户名 Tag为角色名(对应的administrator,monitoring,policymaker,management,或其他自定义名称) 也可以给同一用户设置多个角色,例: rabbitmqctl??set_user_tags??hncscwc??monitoring??policymaker 配置项说明

none

其他

无任何角色,新创建的用户默认角色为none

management

普通管理者

可以访问web管理页面,无法看到节点信息,也无法对策略进行管理

policymaker

策略制定者

包含management的所有权限,并可以管理策略和参数,但无法查看节点的相关信息

monitoring

监控者

包含management的所有权限,并可以看到所有连接(启用management plugin的情况下)、信道及节点相关信息(进程数,内存使用情况,磁盘使用情况等)

administartor

超级管理员

包含minitoring的所有权限,并可以管理用户、虚拟主机、权限、策略、参数
5、Web端管理

RabbitMQ Management 插件可以提供Web界面来管理RabbitMQ中的虚拟主机、用户、角色、队列、交换器、绑定关系、策略、参数等,也可用于监控RabbitMQ服务的状态及一些统计信息

启动插件 rabbitmq-plugins enable rabbitmq_management 关闭插件 rabbitmq-plugins disable rabbitmq_management 插件列表:其中标记为[E*]为显示启动,其中标记为[e*]为隐式启动,开启此功能后需要重启服务才可以正式生效 rabbitmq-plugins list 6、RabbitMQ 管理 [root@servers sbin]# ./rabbitmq-server #启动,ctrl + c 退出及关闭 [root@servers sbin]# ./rabbitmq-server -detached #在后台启动服务 [root@servers sbin]# ./rabbitmqctl stop #关闭服务 [root@C7--13 sbin]# ./rabbitmq-server status #查看状态 7、查看队列列表 rabbitmqctl list_queues[-p vhost][queueinfoitem…] 返回列说明name队列名称durable队列是否持久化auto_delete队列是否自动删除arguments队列参数policy应用到队列上的策略名称pid队列关联的进程IDowner_pid处理排他队列连接的进程IDexclusive队列是否排他
8、查看交换器列表 rabbitmqctl list_exchanges [-p vhost][exchangeinfoitem…] 返回列说明name交换器名称type交换器类型durable交换器是否持久化auto_delete交换器是否自动删除internal是否是内置交换器arguments交换器的参数policy交换器的策略
9、查看绑定关系的列表 rabbitmqctl list_bindings [-p] [bindinginfoitem…] 返回列说明source_name消息来源的名称source_kind消息来源的类别destination_name消息目的地的名称destination_kind消息目的地的种类?routing_key绑定的路由键arguments绑定的参数?
10、查看连接信息列表 rabbitmqctl list_connections [connectioninfoitem …] 返回列说明pid与连接相关的进程IDname连接名称port服务器端口host服务器主机名peer_port服务器对端端口。当一个客户端与服务器连接时,这个客户端的端口就是peer_portpeer_host服务器对端主机名称,或IPssl是否启用SSLstate连接状态,包括starting\tning\opening\running\flow\blocking\blocked\closing\closedchannels连接中的信道个数protocol使用的AMQP协议版本user与连接相关的用户名vhost与连接相关的vhost名称timeout连接超时时长,单位秒
11、查看信道列表 rabbitmqctl list_channels [channelinfoitem…] 返回列说明pid与连接相关的进程IDconnection信道所属连接的进程IDname信道名称number信道的序号user与连接相关的用户名vhost与连接相关的vhost名称transactional信道是否处于事务模式confirm信道是否处于 publisher confirm模式consumer_count信道中的消费者个数messages_unacknowledged已投递但是还未被ack的消息个数messages_uncommitted已接收但是还未提交事务的消息个数acks_uncommitted已ack收到但是还未提交事务的消息个数messages_unconfirmed已发送但是还未确认的消息个数perfetch_count消费者的Qos个数限制,0表示无上限global_prefetch_count整个信道的Qos个数限制
12、查看消费者列表 rabbitmqctl list_consumers [-p vhost] 返回列说明arguments参数channel_pid信道进程idconsumer_tag消费者标记prefetch_count消费者的Qos个数限制,0表示无上限queue_name队列名称
(二)、web界面基本操作 1、添加用户

2、创建虚拟主机 ?3、用户与虚拟主机绑定


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #消息队列配置参数 #1 #消息队列的两种模型