irpas技术客

Spring Cloud Stream 消息驱动的配置与使用_Welcome,Everyone.

irpas 4724

Spring Cloud Stream 消息驱动的配置与使用 一、什么是 Stream 消息驱动二、Spring Cloud Stream 消息驱动的设计思想为什么用 Spring Cloud Stream 消息驱动Stream 凭什么可以统一或屏蔽底层差异?Spring Cloud Stream 消息驱动的工作流程 三、快速构建消息驱动生产者、消费者(集群)1、构建消息驱动之生产者2、消息驱动之消费者3、故障1:重复消费问题(重点)Stream 消息分组(Group)解决重复消费问题 4、故障2:消息错过Stream 之消息持久化(消息分组实现)

一、什么是 Stream 消息驱动

官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/index.html

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过 inputs(消息消费者) 或者 outputs (消息提供者)来与 Spring Cloud Stream 中 binder 对象(绑定器)交互。通过配置来 binding(绑定) ,而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。所以,只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

总的一句话:Spring Cloud Stream 是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

同时,Spring Cloud Stream 目前仅支持 RabbitMQ、Kafka。

二、Spring Cloud Stream 消息驱动的设计思想

讲到这里就不得不提到 标准的 MQ即没有引入 Spring Cloud 之前的最原始的消息中间件 ActiveMQ、RabbitMQ等等:

生产者/消费者之间靠消息媒介 Message 传递信息内容。

消息必须走特定的 消息通道 MessageChannel。

消息通道 MessageChannel 里的子接口 Subscribable Channel 来消费消息,由 MessageHandler 负责收发处理。

Stream 的消息通信方式遵循的发布-订阅模式。(Outpu:消息生产者,Input:消息消费者)

为什么用 Spring Cloud Stream 消息驱动

比如我们所用到的 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange 交换机,kafka 有 Topic 主题和 Partitions 分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。

Stream 凭什么可以统一或屏蔽底层差异?

在没有 binder 绑定器这个概念下,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。而通过定义绑定器 binder 作为中间层,就完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

Spring Cloud Stream 消息驱动的工作流程

Source、Sink:可简单的理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出(作为 Source 源),接收消息就是输入(Sink)。

Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,同时可通过 Channel 对队列进行配置。

Binder:用来连接 MQ 消息中间件(RabbitMQ、Kafka)屏蔽差异。

三、快速构建消息驱动生产者、消费者(集群)

先介绍一下常用的API、注解

组成说明MiddleWare中间件,目前只支持 RabbitMQ 和 KafkaBinderBinder是应用与消息中间件之间的封装,目前实现了 Kafka 和 RabbitMQ 的 Binder,通过 Binder 可以很方便的连接中间件,可以动态的改变消息类型(对应 Kafka 的 Topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现。@Input注解标识输入通道,通过该输入通道接收到消息进入应用程序@Output注解标识输出通道,发布的消息将通过该通道离开应用程序@StreamListener监听队列,用于消费者的队列的消息接收@EnableBinder指信道 exchange 和 channel 绑定在一起
1、构建消息驱动之生产者

1)创建一个 cloud-stream-rabbitmq-provider-8801 微服务模块

2)编写 pom.xml 文件

<dependencies> <!-- Stream 整合 RabbitMQ --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>

3)编写 application.yml 配置文件

server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于 binding整合 type: rabbit # 消息组件类型 environment: # 设置 rabbitmq 的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的 Exchange名称定义 content-type: application/json # 设置消息类型,本次为 json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

4)编写主启动类 StreamMQProviderMain8801.class

/** * @Author Herz * @Date 2022/1/20 15:03 */ @SpringBootApplication public class StreamMQProviderMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQProviderMain8801.class, args); } }

5)编写 service 发送消息接口

/** * * 发送消息的接口 * * @Author Herz * @Date 2022/1/20 15:04 */ public interface IMessageProvider { public String send(); }

6)编写 service 的实现类

/** * * 发送消息的接口的实现类,用于和 消息中间件打交道 * * * @Author Herz * @Date 2022/1/20 15:04 */ @EnableBinding(Source.class) // 可以理解为是一个消息的发送管道 消息源 的定义 public class IMessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 消息的发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); // 创建并发送消息 this.output.send(MessageBuilder.withPayload(serial).build()); System.out.println("***serial: "+serial); return serial; } }

注意!!这里的 MessageChannel 的对象名必须和 yml 配置文件的通道名称相同否则就会报错:expected single matching bean but found 3: output,nullChannel,errorChannel

这两个名称必须相同!!!

7)测试消息生产者是否配置成功:启动主启动类,启动 RabbitMQ,在地址栏输入 http://localhost:15672/并登录

如果在 exchange 里面发现与自己 yml 配置文件中 spring.cloud.stream.binders.output.destination 属性配置的名称一致的交换机名称说明已经配置成功。

同时,多次访问 http://localhost:8801/send/message,可在 RabbitMQ 首页波峰的变化。

2、消息驱动之消费者

1)创建微服务模块 cloud-stream-rabbitmq-consumer-8802

2)编写 pom.xml 文件

<dependencies> <!-- Stream 整合 RabbitMQ --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>

3)编写 application.yml 配置文件

server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于 binding整合 type: rabbit # 消息组件类型 environment: # 设置 rabbitmq 的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称,input 代表消息输入 destination: studyExchange # 表示要使用的 Exchange名称定义 # 一定要和 消息驱动生产者的 配置文件中的名称相同 content-type: application/json # 设置消息类型,本次为 json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

4)编写主启动类 StreamMQConsumerMain8802.class

/** * @Author Herz * @Date 2022/1/20 16:40 */ @SpringBootApplication public class StreamMQConsumerMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQConsumerMain8802.class, args); } }

5)编写 用来接收消息的 service

/** * @Author Herz * @Date 2022/1/20 16:41 */ @Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input( // this.output.send(MessageBuilder.withPayload(serial).build()); 消息驱动生产者的 service 中创建并发送消息 // Message<T> 的泛型就取决于 withPayload 的消息类型。 Message<String> message){ System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort); } }

6)启动主启动类并测试:浏览器输入地址 http://localhost:8801/send/message,刷新 5 次,看控制台结果

消息驱动生产者:

消息驱动消费者:

3、故障1:重复消费问题(重点)

当消费者是集群的时候,面临一个很严重的问题:重复消费问题。

根据 cloud-stream-rabbitmq-consumer-8802 模块再构建一个 cloud-stream-rabbitmq-consumer-8803 消息驱动消费者 模块 构成消费者集群。

为什么说 重复消费 是一个很严重的问题呢???

比如,在以下场景:订单系统为集群部署,都需要从 RabbitMQ 中获取订单消息,如果在商城系统中下的一个订单信息同时被两个服务获取到,就会导致该订单在支付金额时重复扣款,关于钱这方面肯定是很严重的问题!!所以必须要避免这种情况。

那么怎样避免这种情况呢?这个时候 Stream 中的消息分组就起到了作用。因为在 Stream 中处于同一个 Group 中的多个消费者是竞争关系,就能够保证同一条消息只会被其中一个应用消费一次。而不同组是可以全面消费的(即重复消费)。

Stream 消息分组(Group)解决重复消费问题

原理:微服务应用放置于同一个 group 中,就能够保证消息只会被其中一个应用消费一次。

配置实现:只需要在 yml 配置文件中添加如下一行配置即可(group 名相同即可实现同一分组)

group: testA

添加位置如图所示:

两个消费者的 yml 配置文件添加完后重启后,可在 RabbitMQ 中查看:

4、故障2:消息错过 Stream 之消息持久化(消息分组实现)

一方面消息分组不仅可以解决消息的重复消费问题,另一方面也可以实现消息持久化,解决消息错过的问题。

那么,何为消息错过呢???即当消息驱动的消费者中途服务出现故障或者其他的原因暂停服务后,等待可重启后不会接收到这个期间消息驱动的生产者所提供的所有消息,这就是消息错过。(在未配置分组时)

解决办法:给消息驱动的消费者添加分组,这样也就实现了消息的持久化。

即在该微服务的 yml 配置文件中添加如下一行配置信息:

# 分组名可自定义 group: testA

添加的位置如图所示:


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Spring #Cloud #stream #消息驱动的配置与使用 #消息驱动的配置与使用一什么是