irpas技术客

chapter22:Spring Cloud Stream的简单使用_咬鱼的胖橘猫

irpas 1781

目录 一、概念二、编码API以及常用注解三、消息的生产方3.1 依赖3.2 配置文件3.3 业务类3.4 结果 四、消息的消费方4.1 依赖4.2 配置文件4.3 业务类 五、消息分组5.1 情景5.2 消息分组80028003

一、概念 统一的消息模型,屏蔽差异性,方便去处理多种消息中间件示例图: 二、编码API以及常用注解

三、消息的生产方 3.1 依赖 <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> </dependencies> 3.2 配置文件 server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 output: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: send-8001.com # 访问路径可以显示IP地址 prefer-ip-address: true # Eureka客户端向服务端发送心跳的间隔时间,单位为s(默认30s),这里设置为1s lease-renewal-interval-in-seconds: 1 # Eureka服务端在收到最后一次心跳后等待时间上限,单位为s(默认90s),超时将剔除服务 lease-expiration-duration-in-seconds: 2 3.3 业务类 @EnableBinding(Source.class) public class IMessageProviderImpl implements IMessageProvider { /** * 发送消息通道 @Resource(name = "output") * 注意一下 各位messageChannel 变量名切记莫写成 outPut 不然会报错 */ @Resource(name = "output") private MessageChannel messageChannel; @Override public String sendMessage() { // 因为构建了message对象 把它传给管道Source,然后再给MQ的,不是用返回值 String serial = UUID.randomUUID().toString(); messageChannel.send(MessageBuilder.withPayload(serial).build()); System.out.println("********serial:" + serial); return null; } } 3.4 结果

四、消息的消费方 4.1 依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> 4.2 配置文件 server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 input: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit # 消息分组:防止重复消费 group: GroupA eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: consumer-8002.com 4.3 业务类 @Component @EnableBinding(Sink.class) public class ReceiverMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者1号,-------------->收到的消息是:" + message.getPayload() + "\t port:" + serverPort); } }

五、消息分组 5.1 情景

就会造成如下结果:

5.2 消息分组 不同组:重复消费相同组:竞争消息 8002

# 消息分组:防止重复消费 group: GroupA

server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 input: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit # 消息分组:防止重复消费 group: GroupA eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: consumer-8002.com 8003

# 消息分组:防止重复消费 group: GroupA

server: port: 8803 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 input: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit # 消息分组:防止重复消费 group: GroupA eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: consumer-8003.com

生产者发送3次请求,8002 以及 8003 共同去共同竞争

生产者发送了两条消息给A组 此刻,消费者1和消费者2宕机了 过会,A组的消费者1和消费2重启;这两条消息会被A组成员竞争.

chapter21:Bus消息总线实现动态刷新chapter23:Spring Cloud Sleuth 和 Zipkin 的简单使用


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #chapter22Spring #Cloud #Stream的简单使用 #Spring