irpas技术客

SpringCloud第09讲:消息队列RocketMQ_秦毅翔的专栏_springcloud 消息队列

未知 6299

????????消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

一、MQ使用场景 异步处理流量削峰填谷(比如:秒杀)解耦微服务 二、常见MQ产品对比

参考:https://·/article/290040

三、搭建RocketMQ

参考:https://·/article/290089

第1步:解压

第2步:在mq当前目录打开终端输入命令

??????? nohup sh bin/mqnamesrv &

第3步:测试是否启动成功,命令:

??????? tail -f ~/logs/rocketmqlogs/namesrv.log

第4步:启动Broker,命令

??????? nohup sh bin/mqbroker -n localhost:9876 &

第5步:测试Broker是否启动成功,命令

??????? tail -f ~/logs/rocketmqlogs/broker.log

?????? 或者直接打开目录中自动生成的nohup.out文件看日志结果

停止:以此执行以下两条命令

# 命令

sh bin/mqshutdown broker

# 输出如下信息说明停止成功

The mqbroker(36695) is running...

Send shutdown request to mqbroker(36695) OK

# 命令

sh bin/mqshutdown namesrv

# 输出如下信息说明停止成功

The mqnamesrv(36664) is running...

Send shutdown request to mqnamesrv(36664) OK

?四、搭建RocketMQ控制台

参考:

https://·/article/290092

下载课程提供的懒人包:rocketmq-console-ng-1.0.1.jar

在命令行中启动jar

java -jar rocketmq-console-ng-1.0.1.jar

访问网页:

http://localhost:17890/

控制台使用说明参考:https://github.com/eacdy/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

五、RocketMQ实现分布式事务

半消息(Half(Prepare)Message):暂时无法消费的消息。生产者将消息发送到了MQServer,但这个消息会被标记为“咱不能投递”状态,先储存起来;消费者不会去消费这条消息。消息回查(Message Status Check):网络断开或生产者重启可能导致丢失事务消息的第二次确认。当MQServer发现消息长时间出于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)。

消息的三种状态

Commit:提交事务消息,消费者可以消费此消息Rollback:回滚事务消息,broker会删除该消息,消费者不能消费UNKNOWN:broker需要回查确认该消息的状态 ?六、SpringCloudStream

SpringCloudStream是一款用于构建消息驱动的微服务框架

Spring Cloud Stream编程模型

Destination Binder(目标绑定器):与消息中间件通信的组件Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和上产,由binder创建Message:消息 ?6.1、用SpringCloudStream编写生产者 6.1.1、pom.xml中添加依赖 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> ?6.1.2、BootApplication类添加注解 import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding(Source.class) public class ContentCenterApplication { ... } 6.1.3、在application.yml中添加属性 spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: #用来指定topic destination: stream-test-topic 6.1.4、在TestController中添加GetMapping @Autowired private Source source; @GetMapping("/test-steam") public String testStream(){ this.source.output() .send( MessageBuilder .withPayload("消息体") .build() ); return "success"; } 6.1.5、运行效果

?发现消息已经被投递到RokectMQ

?修改application.yml配置属性,避免显示SPringCloudStream发送心跳包的日志

#设置日志输出级别 logging: level: com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug #避免显示SpringCloudStream发送心跳包的日志 com.alibaba.nacos: error ?6.2、SpringCloudStream编写消费者 6.2.1、在pom.xml中添加依赖 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> 6.2.2、在BootApplication中添加注解 import org.springframework.cloud.stream.messaging.Sink; @EnableBinding(Sink.class) public class UserCenterApplication { ... } 6.2.3、在application.yml中添加属性 spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: #用来指定topic,要和content-center微服务的topic匹配 destination: stream-test-topic #一定要设置,必填项,如果用其他MQ,该属性可以不设置 group: binder-group ?6.2.4、添加测试Service package personal.qin.usercenter.rocketmq;//package com.itmuch.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; import org.springframework.stereotype.Service; @Service @Slf4j public class TestStreamConsumer { @StreamListener(Sink.INPUT) public void receive(String messageBody){ log.info("通过stream收到了消息:messageBody={}", messageBody); // throw IllegalArgumentException("抛异常") } /** * 全局异常处理 * * @param message 发生异常的消息 */ @StreamListener("errorChannel") public void error(Message<?> message) { ErrorMessage errorMessage = (ErrorMessage) message; log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage); } } 6.2.5、测试运行效果 ,可以正常消费消息

七、消息过滤

参考:https://·/article/290424

八、SpringCloudStream异常处理

参考:https://·/article/290435

九、SpringCloudStream+RocketMQ实现生产者分布式事务(事务消息) 9.1、修改application.yml spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 #添加事务控制时使用 bindings: output: producer: #事务消息 transactional: true #要和@RocketMQTransactionListener的txProducerGroup的值一致 group: tx-add-bonus-group 9.2、新建添加积分事务监听类 package com.itmuch.contentcenter.rocketmq; import com.alibaba.fastjson.JSON; import com.itmuch.contentcenter.dao.messaging.RocketmqTransactionLogMapper; import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO; import com.itmuch.contentcenter.domain.entity.messaging.RocketmqTransactionLog; import com.itmuch.contentcenter.service.content.ShareService; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusTransactionListener implements RocketMQLocalTransactionListener { private final ShareService shareService; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); Integer shareId = Integer.valueOf((String) headers.get("share_id")); String dtoString = (String) headers.get("dto"); ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class); try { this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); // select * from xxx where transaction_id = xxx RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne( RocketmqTransactionLog.builder() .transactionId(transactionId) .build() ); if (transactionLog != null) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } } 9.3、ShareService类 package com.itmuch.contentcenter.service.content; import com.alibaba.fastjson.JSON; import com.itmuch.contentcenter.dao.content.ShareMapper; import com.itmuch.contentcenter.dao.messaging.RocketmqTransactionLogMapper; import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO; import com.itmuch.contentcenter.domain.dto.content.ShareDTO; import com.itmuch.contentcenter.domain.dto.messaging.UserAddBonusMsgDTO; import com.itmuch.contentcenter.domain.dto.user.UserDTO; import com.itmuch.contentcenter.domain.entity.content.Share; import com.itmuch.contentcenter.domain.entity.messaging.RocketmqTransactionLog; import com.itmuch.contentcenter.domain.enums.AuditStatusEnum; import com.itmuch.contentcenter.feignclient.UserCenterFeignClient; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.messaging.Source; import org.springframework.http.ResponseEntity; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.client.RestTemplate; import java.util.Objects; import java.util.UUID; @Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareService { private final ShareMapper shareMapper; // private final RestTemplate restTemplate; private final UserCenterFeignClient userCenterFeignClient; private final RocketMQTemplate rocketMQTemplate; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; private final Source source; public ShareDTO findById(Integer id){ Share share= this.shareMapper.selectByPrimaryKey(id); Integer userId = share.getUserId(); //调用用户微服务的/users/{userId} //用HttpGet方法去请求,并返回一个对象 // ResponseEntity<UserDTO> forEntity = restTemplate.getForEntity( // "http://localhost:8080/users/{id}", // UserDTO.class, // userId); // //获取响应的状态码 // HttpStatus statusCode = forEntity.getStatusCode(); // UserDTO userDTO = this.restTemplate.getForObject( // "http://user-center/users/{userId}", //Ribbon会自动把user-center转换成其在nacos上注册的地址,并且进行负载均衡 // UserDTO.class, userId); UserDTO userDTO = this.userCenterFeignClient.findById(userId); //消息的装配 ShareDTO shareDTO = new ShareDTO(); BeanUtils.copyProperties(share, shareDTO); //将share对象中的数据拷贝到shareDTO对象 shareDTO.setWxNickname(userDTO.getWxNickname()); return shareDTO; } public Share auditById(Integer id, ShareAuditDTO auditDTO) { // 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常 Share share = this.shareMapper.selectByPrimaryKey(id); if (share == null) { throw new IllegalArgumentException("参数非法!该分享不存在!"); } if (!Objects.equals("NOT_YET", share.getAuditStatus())) { throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!"); } // 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分 if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) { // 发送半消息。。 String transactionId = UUID.randomUUID().toString(); this.source.output() .send( MessageBuilder .withPayload( UserAddBonusMsgDTO.builder() .userId(share.getUserId()) .bonus(50) .build() ) // header也有妙用... .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("share_id", id) .setHeader("dto", JSON.toJSONString(auditDTO)) .build() ); } else { this.auditByIdInDB(id, auditDTO); } return share; } @Transactional(rollbackFor = Exception.class) public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) { Share share = Share.builder() .id(id) .auditStatus(auditDTO.getAuditStatusEnum().toString()) .reason(auditDTO.getReason()) .build(); this.shareMapper.updateByPrimaryKeySelective(share); // 4. 把share写到缓存 } @Transactional(rollbackFor = Exception.class) public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) { this.auditByIdInDB(id, auditDTO); this.rocketmqTransactionLogMapper.insertSelective( RocketmqTransactionLog.builder() .transactionId(transactionId) .log("审核分享...") .build() ); } } 9.4、ShareAdminController类 package com.itmuch.contentcenter.controller.content; //import com.itmuch.contentcenter.auth.CheckAuthorization; import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO; import com.itmuch.contentcenter.domain.entity.content.Share; import com.itmuch.contentcenter.service.content.ShareService; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/admin/shares") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareAdminController { private final ShareService shareService; @PutMapping("/audit/{id}") // @CheckAuthorization("admin") //认证和授权 public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) { return this.shareService.auditById(id, auditDTO); } } 十、SpringCloudStream+RocketMQ实现消费者分布式事务(事务消息) 10.1、修改application.yml spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: #用来指定topic,要和content-center微服务的topic匹配 destination: add-bonus #一定要设置,必填项,如果用其他MQ,该属性可以不设置 group: binder-group 10.2、UserService类 package com.itmuch.usercenter.service.user; import com.itmuch.usercenter.dao.bonus.BonusEventLogMapper; import com.itmuch.usercenter.dao.user.UserMapper; import com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO; import com.itmuch.usercenter.domain.entity.bonus.BonusEventLog; import com.itmuch.usercenter.domain.entity.user.User; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Date; @Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class UserService { private final UserMapper userMapper; private final BonusEventLogMapper bonusEventLogMapper; public User findById(Integer id){ return this.userMapper.selectByPrimaryKey(id); } @Transactional(rollbackFor = Exception.class) public void addBonus(UserAddBonusMsgDTO msgDTO) { // 1. 为用户加积分 Integer userId = msgDTO.getUserId(); Integer bonus = msgDTO.getBonus(); User user = this.userMapper.selectByPrimaryKey(userId); user.setBonus(user.getBonus() + bonus); this.userMapper.updateByPrimaryKeySelective(user); // 2. 记录日志到bonus_event_log表里面 this.bonusEventLogMapper.insert( BonusEventLog.builder() .userId(userId) .value(bonus) .event(msgDTO.getEvent()) .createTime(new Date()) .description(msgDTO.getDescription()) .build() ); log.info("积分添加完毕..."); } } 10.3、AddBonusStreamConsumer类 package com.itmuch.usercenter.rocketmq; import com.itmuch.usercenter.dao.bonus.BonusEventLogMapper; import com.itmuch.usercenter.dao.user.UserMapper; import com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO; import com.itmuch.usercenter.domain.entity.bonus.BonusEventLog; import com.itmuch.usercenter.domain.entity.user.User; import com.itmuch.usercenter.service.user.UserService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) @Slf4j public class AddBonusStreamConsumer { private final UserService userService; @StreamListener(Sink.INPUT) public void receive(UserAddBonusMsgDTO message) { message.setEvent("CONTRIBUTE"); message.setDescription("投稿加积分.."); this.userService.addBonus(message); } } 10.4、BonusController类 package com.itmuch.usercenter.controller.user; import com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO; import com.itmuch.usercenter.domain.dto.user.UserAddBonseDTO; import com.itmuch.usercenter.domain.entity.user.User; import com.itmuch.usercenter.service.user.UserService; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/users") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class BonusController { private final UserService userService; @PutMapping("/add-bonus") public User addBonus(@RequestBody UserAddBonseDTO userAddBonseDTO) { Integer userId = userAddBonseDTO.getUserId(); userService.addBonus( UserAddBonusMsgDTO.builder() .userId(userId) .bonus(userAddBonseDTO.getBonus()) .description("兑换分享...") .event("BUY") .build() ); return this.userService.findById(userId); } } 十一、知识盘点

参考:https://·/article/290489


特别声明:本系列教程(SpringCloudAlibaba)参考自慕课网大目老师提供的网上视频课程,有需要的同学可以自行搜索学习


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springcloud #消息队列