irpas技术客

SpringCloud Alibaba 使用Seata解决分布式事物_小毕超

irpas 6842

一、Seata

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 seata提供了AT、TCC、及SAGA(长事务)、XA等模式。是将各分布式分支事务统一为全局事务,通过全局事务的成功与否判断是否需要回滚提交各分支事务。

seata架构中有三大组件: - TC(Transaction Coordinator)事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。 - TM(Transaction Manager)事务管理器 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。 - RM(Resouce Manager)资源管理器:控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

分布式事务解决流程:

TM向TC注册一个全局事务,TC返回全局事务id,XIDXID通过微服务调用链传播RM将本地事务注册为XID到TC的相应全局事务的一个分支。TM通知TC提交或回滚XID所对应的全局事务TC通知XID全局事务下的分支事务提交或回滚 二、使用Seata解决分布式事物

在上篇文章中我们介绍了Seata集群的搭建方式,在做下面的演示前,大家确保已经安装好Seata-server端。

新建两个SpringBoot项目,分别为Order 和 Stock 服务,简单模拟下在Order 中接受订单的信息,调用Stock 服务减库存后,Order 增加一条订单信息。

创建数据库

首先先创建两个数据库:order_db 和 stock_db 库

在order_db 库中创建order_info表:

CREATE TABLE `order_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_id` int(11) DEFAULT NULL, `order_name` varchar(255) DEFAULT NULL, `buy_count` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

在stock_db 表中创建 stock_info表:

CREATE TABLE `stock_info` ( `id` int(11) NOT NULL, `name` varchar(255) DEFAULT NULL, `count` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

由于我们使用Seata默认的AT模式,还需要在每个业务库中创建undo_log表:

CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; Stock服务搭建

下面新建 Stock 服务,在pom中引入下面依赖:

<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.4.2</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.6</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>com.bxc</groupId> <artifactId>common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>

其中 common 是我们在前面讲解SpringCloud全家桶时创建的公共的包,主要用于Controller的统一返回封装,可以替换为自己的包,修改controller的返回为自己的对象。

其中spring-cloud-starter-alibaba-seata包中的seata目前默认为1.3版本的,由于我seata-server安装的是1.4.2版本的,所以需要替换下版本。

修改配制文件:

server: port: 8082 spring: cloud: nacos: discovery: server-addr: 192.168.40.130:8848 application: name: seata-provider #数据库连接配置 datasource: url: jdbc:mysql://192.168.40.130:3306/provider?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&allowMultiQueries=true&allowPublicKeyRetrieval=true username: root password: root123 type: com.alibaba.druid.pool.DruidDataSource mybatis-plus: mapper-locations: classpath:mapper/*.xml type-aliases-package: com.bxc.seata.entity configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl map-underscore-to-camel-case: false seata: enabled: true application-id: ${spring.application.name} tx-service-group: default_tx_group # 事务群组(可以每个应用独立取名,也可以使用相同的名字) service: vgroup-mapping: default_tx_group: default # TC 集群(必须与seata-server保持一致) enable-degrade: false # 降级开关 disable-global-transaction: false # 禁用全局事务(默认false) grouplist: default: 192.168.40.130:8091 transport: shutdown: wait: 3 thread-factory: boss-thread-prefix: NettyBoss worker-thread-prefix: NettyServerNIOWorker server-executor-thread-prefix: NettyServerBizHandler share-boss-worker: false client-selector-thread-prefix: NettyClientSelector client-selector-thread-size: 1 client-worker-thread-prefix: NettyClientWorkerThread type: TCP server: NIO heartbeat: true serialization: seata compressor: none enable-client-batch-send-request: true # 客户端事务消息请求是否批量合并发送(默认true) config: type: nacos nacos: namespace: serverAddr: 192.168.40.130:8848 group: SEATA_GROUP username: "nacos" password: "nacos" cluster: default dataId: seataServer.properties registry: type: nacos nacos: application: seata-server server-addr: 192.168.40.130:8848 group: DEFAULT_GROUP #amespace: username: "nacos" password: "nacos" cluster: default

注意配制中的ip,修改为相应自己的ip,其中tx-service-group要根据seata-server端指定的service.vgroupMapping一致。

主启动类:

@SpringBootApplication @EnableDiscoveryClient @EnableAutoDataSourceProxy public class StockApplication { public static void main(String[] args) { SpringApplication.run(StockApplication.class, args); } }

下面创建一个Dao,用于扣除库存:

@Mapper @Repository public interface StockDao { @Update("update stock_info set count=(count-#{subCount}) where id = #{id} and (count-#{subCount})>=0") Integer subStock(@Param("id") Long id,@Param("subCount") Integer subCount); }

创建Service

public interface StockService { boolean subStock(Long id, Integer subCount); } @Service public class StockServiceImpl implements StockService { @Autowired StockDao stockDao; @Override public boolean subStock(Long id, Integer subCount) { return stockDao.subStock(id, subCount) > 0; } }

最后创建controller 接口:

@RestController public class StockController { @Autowired StockService stockService; @PutMapping("/stock") public ResponseTemplate subStock(@RequestParam("id") Long id,@RequestParam("subCount") Integer subCount) { return stockService.subStock(id, subCount) ? ResSuccessTemplate.builder().build() : ResFailTemplate.builder().build(); } }

Stock服务就简单的搭建好了,主要就是做了向stock_info 表扣除库存的动作,在开始前可以先在stock_info 表中添加一条数据:

INSERT INTO `provider`.`stock` (`id`, `name`, `count`) VALUES ('1', '商品', '20');

下面order服务调用传递id 为1和 subCount 参数即可。

Order服务搭建

Order服务的pom依赖和application.yml配制文件和Stock服务相同,复制过来即可,注意修改端口和连接的数据库,这里我将 Order服务的端口设为8081。

首先创建调用Stock服务的Feign客户端:

@Component @FeignClient(value = "seata-provider") public interface StockClient { @PutMapping("/stock") ResSuccessTemplate subStock(@RequestParam("id") Long id, @RequestParam("subCount") Integer subCount); }

order 的实体和Dao,直接采用MybatisPlus的方式

@Data @TableName("order_info") public class OrderEntity { @TableId(value = "id", type = IdType.AUTO) private Long id; @TableField(value = "order_id") private Long orderId; @TableField(value = "order_name") private String orderName; @TableField(value = "buy_count") private Integer buyCount; } @Mapper @Repository public interface OrderDao extends BaseMapper<OrderEntity> { }

创建测试的Service

@Service public class OrderServiceImpl implements OrderService { @Autowired OrderDao orderDao; @Autowired StockClient stockClient; @Transactional @Override public boolean order(Long id, String name, Integer bugName) { ResponseTemplate responseTemplate = stockClient.subStock(id, bugName); if (responseTemplate.getCode() == 200) { OrderEntity entity = new OrderEntity(); entity.setOrderId(id); entity.setOrderName(name); entity.setBuyCount(bugName); int insert = orderDao.insert(entity); return insert > 0; } return false; } }

下面创建controller 测试入口:

@RestController public class OrderController { @Autowired OrderService orderService; @GetMapping("/order") public ResponseTemplate order(){ return orderService.order(1L,"商品",2)? ResSuccessTemplate.builder().build(): ResFailTemplate.builder().build(); } }

到此Order服务也已经搭建成功,启动Order服务。

在浏览器中访问:http://localhost:8081/order 我们没有主动抛出错误,这里直接访问成功,看下stock_info表和order_info表的数据: stock_info表已经扣除库存2了。 order_info也添加了一条订单信息。

但我们要演示分布式事物,肯定需要在Order端进行报错,我们手动写一个异常:

@Transactional @Override public boolean order(Long id, String name, Integer bugName) { ResponseTemplate responseTemplate = stockClient.subStock(id, bugName); if (responseTemplate.getCode() == 200) { OrderEntity entity = new OrderEntity(); entity.setOrderId(id); entity.setOrderName(name); entity.setBuyCount(bugName); int insert = orderDao.insert(entity); int a = 1/0; return insert > 0; } return false; }

添加了 int a = 1/0; 再次调用上面接口: 查看stock_info表,看到又扣除了两条库存:

查看order_info表发现并没有生成订单: 此时出现了数据不一致的情况,下面我们添加上Seata的 @GlobalTransactional 全局事物注解再次进行测试:

@GlobalTransactional @Transactional @Override public boolean order(Long id, String name, Integer bugName) { ResponseTemplate responseTemplate = stockClient.subStock(id, bugName); if (responseTemplate.getCode() == 200) { OrderEntity entity = new OrderEntity(); entity.setOrderId(id); entity.setOrderName(name); entity.setBuyCount(bugName); int insert = orderDao.insert(entity); int a = 1/0; return insert > 0; } return false; }

再次调用上面接口: 查看stock_info表,看到库存没有减少:

查看order_info表发现也并没有生成订单: 此时就已经保证了数据的一致性。 喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springcloud #Alibaba #使用Seata解决分布式事物 #一SeataSeata #Seata #将为用户提供了 #ATTCCSAGA