Tips for Handling Distributed Transactions with Seata in Spring Cloud Alibaba

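Introduction to Seata

In a traditional monolithic project, the @Transactional annotation is enough to get basic ACID transactions, provided that: 1) the database supports transactions (for example, MySQL's InnoDB engine), and 2) all business operations run against the same database.

Once a microservice architecture is introduced, the data is split across databases and tables and each service owns its own database, so traditional local transactions no longer work. How, then, do we keep data consistent across multiple services?

This is the problem distributed transactions solve, and Seata is a high-performance, easy-to-use distributed transaction solution built for microservice architectures.

Seata has three basic components:

    Transaction Coordinator (TC): maintains the state of global and branch transactions and drives the global commit or rollback.
    Transaction Manager (TM): defines the scope of a global transaction; it begins, commits, or rolls back a global transaction.
    Resource Manager (RM): manages the resources of branch transactions, talks to the TC to register branches and report their status, and drives the commit or rollback of local transactions.

Put another way, a distributed transaction is a global transaction (GlobalTransaction) made up of a number of branch transactions, and each branch transaction is an ordinary local transaction.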

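Lifecycle of a Seata transaction

    1. The TM asks the TC to begin a global transaction; the TC generates a global transaction XID and returns it.
    2. The XID is propagated along the microservice call chain.
    3. Each RM registers its local transaction with the TC as a branch of the global transaction identified by the XID.
    4. The TM asks the TC to commit or roll back the global transaction identified by the XID.
    5. The TC drives every branch transaction under that XID to commit or roll back.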
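The five steps above can be sketched with a toy, in-memory model. This is not Seata's API: the names LifecycleSketch, Coordinator, and Branch are hypothetical and exist only for illustration, and the "network" between TM, RM, and TC is replaced by plain method calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Toy model of the TM / RM / TC roles. Hypothetical names, not Seata's API. */
final class LifecycleSketch {

    /** A branch (local) transaction that the coordinator can finish either way. */
    interface Branch {
        void commit();
        void rollback();
    }

    /** Stand-in for the Transaction Coordinator (TC). */
    static final class Coordinator {
        private final Map<String, List<Branch>> branchesByXid = new HashMap<>();

        /** Step 1: begin a global transaction and hand back its XID. */
        String begin() {
            String xid = UUID.randomUUID().toString();
            branchesByXid.put(xid, new ArrayList<>());
            return xid;
        }

        /** Step 3: an RM registers a local transaction under the XID. */
        void registerBranch(String xid, Branch branch) {
            List<Branch> branches = branchesByXid.get(xid);
            if (branches == null) {
                throw new IllegalStateException("Unknown XID: " + xid);
            }
            branches.add(branch);
        }

        /** Steps 4-5: on a commit request, drive every registered branch to commit. */
        void commit(String xid) {
            for (Branch branch : finish(xid)) {
                branch.commit();
            }
        }

        /** Steps 4-5: on a rollback request, drive every registered branch to roll back. */
        void rollback(String xid) {
            for (Branch branch : finish(xid)) {
                branch.rollback();
            }
        }

        private List<Branch> finish(String xid) {
            List<Branch> branches = branchesByXid.remove(xid);
            if (branches == null) {
                throw new IllegalStateException("Unknown XID: " + xid);
            }
            return branches;
        }
    }

    /** Plays the TM: runs two hypothetical branches and returns what happened. */
    static List<String> run(boolean failAfterFirstBranch) {
        List<String> events = new ArrayList<>();
        Coordinator tc = new Coordinator();
        String xid = tc.begin();                       // step 1
        try {
            // Step 2: the XID travels with each call (here: a method argument).
            tc.registerBranch(xid, recordingBranch("order", events));
            if (failAfterFirstBranch) {
                throw new IllegalStateException("simulated business failure");
            }
            tc.registerBranch(xid, recordingBranch("stock", events));
            tc.commit(xid);                            // step 4 (commit)
        } catch (RuntimeException e) {
            tc.rollback(xid);                          // step 4 (rollback)
        }
        return events;
    }

    private static Branch recordingBranch(String name, List<String> events) {
        return new Branch() {
            @Override public void commit() { events.add(name + ":commit"); }
            @Override public void rollback() { events.add(name + ":rollback"); }
        };
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In the failing run only the order branch has been registered when the exception is thrown, so it is the only branch the coordinator rolls back.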

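Seata installation and configuration

Install Nacos. This example uses Nacos as the registry. Download Nacos from https://github.com/alibaba/nacos/releases; this article uses the Windows build of version 1.4.0. Open a command line, go to the bin directory, and start Nacos in standalone mode: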

startup -m standalone

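Install and configure Seata

Download Seata from http://seata.io/zh-cn/blog/download.html; the Windows build of version 1.4.0 is used here. After unpacking it, go to the conf directory and edit two files, file.conf and registry.conf.

file.conf mainly holds the database configuration.

registry.conf holds the registry configuration.

The conf directory also needs a script, nacos-config.sh, which pushes the initial configuration into Nacos. Seata 1.4.0 does not ship this script, so create it yourself with the following content: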

#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Parse the command-line options (all optional).
while getopts ":h:p:g:t:u:w:" opt
do
  case $opt in
  h)
    host=$OPTARG
    ;;
  p)
    port=$OPTARG
    ;;
  g)
    group=$OPTARG
    ;;
  t)
    tenant=$OPTARG
    ;;
  u)
    username=$OPTARG
    ;;
  w)
    password=$OPTARG
    ;;
  ?)
    echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
    exit 1
    ;;
  esac
done

# Percent-encode a string so it can be used in a URL query parameter.
urlencode() {
  for ((i=0; i < ${#1}; i++))
  do
    char="${1:$i:1}"
    case $char in
    [a-zA-Z0-9.~_-]) printf $char ;;
    *) printf '%%%02X' "'$char" ;;
    esac
  done
}

# Defaults for options that were not given.
if [[ -z ${host} ]]; then
    host=localhost
fi
if [[ -z ${port} ]]; then
    port=8848
fi
if [[ -z ${group} ]]; then
    group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
    tenant=""
fi
if [[ -z ${username} ]]; then
    username=""
fi
if [[ -z ${password} ]]; then
    password=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

failCount=0
tempLog=$(mktemp -u)

# Publish one key/value pair to the Nacos config service.
function addConfig() {
  curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  if [[ -z $(cat "${tempLog}") ]]; then
    echo " Please check the cluster status. "
    exit 1
  fi
  if [[ $(cat "${tempLog}") =~ "true" ]]; then
    echo "Set $1=$2 successfully "
  else
    echo "Set $1=$2 failure "
    (( failCount++ ))
  fi
}

# Read config.txt from the parent directory, one key=value entry per line.
count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
  (( count++ ))
  key=${line%%=*}
  value=${line#*=}
  addConfig "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then
  echo " Init nacos config finished, please start seata-server. "
else
  echo " init nacos config fail. "
fi

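The Seata root directory (the directory that contains conf) also needs a config.txt configuration file, which is likewise not included by default.

Only the MySQL settings need to be changed.

The complete file: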

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=true
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
store.mode=file
store.lock.mode=file
store.session.mode=file
store.publicKey=xx
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=xx
store.redis.sentinel.sentinelHosts=xx
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=xx
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
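nacos-config.sh strips whitespace from each line of config.txt and splits it at the first "=" only, which matters for values such as store.db.url that contain "=" themselves. A minimal Java sketch of that splitting rule follows; the class name ConfigTxtSketch is hypothetical, and skipping lines without a usable key is an assumption of this sketch (the script does not special-case them).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of how config.txt lines are split into key and value. Hypothetical helper. */
final class ConfigTxtSketch {

    /** Parses "key=value" lines, splitting each line at its FIRST '=' only. */
    static Map<String, String> parse(String text) {
        Map<String, String> entries = new LinkedHashMap<>();
        for (String rawLine : text.split("\\R")) {
            // Like the script's sed call: drop every whitespace character.
            String line = rawLine.replaceAll("\\s", "");
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // assumption: ignore blank lines and lines without a key
            }
            String key = line.substring(0, eq);     // same idea as ${line%%=*}
            String value = line.substring(eq + 1);  // same idea as ${line#*=}
            entries.put(key, value);
        }
        return entries;
    }

    public static void main(String[] args) {
        String sample = "store.mode = file\n"
                + "store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true\n";
        parse(sample).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Each resulting pair corresponds to one addConfig call in the script, that is, one dataId in Nacos.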

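In the conf directory, open a Git Bash command line and run the following (the script reads the Nacos host from the -h option):

sh nacos-config.sh -h 127.0.0.1

This writes the initial Seata configuration into Nacos. When the script finishes with failure-count:0, every entry was set successfully, and the Seata-related entries can be seen in the Nacos console.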

Next, create three tables in the seata database:

-- Global transaction sessions
drop table if exists `global_table`;
create table `global_table` (
  `xid` varchar(128) not null,
  `transaction_id` bigint,
  `status` tinyint not null,
  `application_id` varchar(32),
  `transaction_service_group` varchar(32),
  `transaction_name` varchar(128),
  `timeout` int,
  `begin_time` bigint,
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`xid`),
  key `idx_gmt_modified_status` (`gmt_modified`, `status`),
  key `idx_transaction_id` (`transaction_id`)
);

-- Branch transaction sessions
drop table if exists `branch_table`;
create table `branch_table` (
  `branch_id` bigint not null,
  `xid` varchar(128) not null,
  `transaction_id` bigint,
  `resource_group_id` varchar(32),
  `resource_id` varchar(256),
  `lock_key` varchar(128),
  `branch_type` varchar(8),
  `status` tinyint,
  `client_id` varchar(64),
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`branch_id`),
  key `idx_xid` (`xid`)
);

-- Global row locks
-- Note: the ids are declared as bigint; in MySQL, `long` is a synonym for MEDIUMTEXT.
drop table if exists `lock_table`;
create table `lock_table` (
  `row_key` varchar(128) not null,
  `xid` varchar(96),
  `transaction_id` bigint,
  `branch_id` bigint,
  `resource_id` varchar(256),
  `table_name` varchar(32),
  `pk` varchar(36),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`row_key`)
);

In the database used by the project itself, create the undo_log table, which stores the undo (rollback) log:

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
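To make the purpose of the undo log concrete, here is a much simplified, in-memory sketch of the idea: before a branch changes a row, the previous value is recorded under the transaction's XID, and a rollback restores those values newest first. The class UndoLogSketch and its methods are hypothetical; in Seata the record is serialized into rollback_info and written in the same local transaction as the business update, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** In-memory illustration of undo logging. Hypothetical names, not Seata's implementation. */
final class UndoLogSketch {

    /** One undo record: which row changed and the value it held before the change. */
    private static final class UndoRecord {
        final String xid;
        final long skuId;
        final int stockBefore;

        UndoRecord(String xid, long skuId, int stockBefore) {
            this.xid = xid;
            this.skuId = skuId;
            this.stockBefore = stockBefore;
        }
    }

    private final Map<Long, Integer> stockTable = new HashMap<>(); // stands in for tb_stock
    private final Deque<UndoRecord> undoLog = new ArrayDeque<>();  // stands in for undo_log

    void putStock(long skuId, int stock) {
        stockTable.put(skuId, stock);
    }

    int getStock(long skuId) {
        Integer stock = stockTable.get(skuId);
        if (stock == null) {
            throw new IllegalArgumentException("Unknown skuId: " + skuId);
        }
        return stock;
    }

    int pendingUndoRecords() {
        return undoLog.size();
    }

    /** Business update plus undo record, written together. */
    void reduceStock(String xid, long skuId, int number) {
        int before = getStock(skuId);
        undoLog.push(new UndoRecord(xid, skuId, before)); // newest record sits at the head
        stockTable.put(skuId, before - number);
    }

    /** Rollback: restore the before-values of this XID, newest record first. */
    void rollback(String xid) {
        Iterator<UndoRecord> it = undoLog.iterator(); // iterates from the head (newest)
        while (it.hasNext()) {
            UndoRecord record = it.next();
            if (record.xid.equals(xid)) {
                stockTable.put(record.skuId, record.stockBefore);
                it.remove();
            }
        }
    }

    /** Commit: the undo records of this XID are no longer needed. */
    void commit(String xid) {
        undoLog.removeIf(record -> record.xid.equals(xid));
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.putStock(1L, 10);
        db.reduceStock("xid-1", 1L, 3);
        db.rollback("xid-1");
        System.out.println(db.getStock(1L));
    }
}
```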

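Finally, open a command line in the bin directory and run seata-server.bat to start the Seata server.

Using Seata in the project

The Spring Cloud project has two services, an order service and a stock service. The basic business flow is:

    Purchase a product
    Insert the order
    Reduce the stock

Order detail table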

DROP TABLE IF EXISTS `tb_order_detail`;
-- InnoDB is used here (the original listing said MyISAM, which does not support transactions).
CREATE TABLE `tb_order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'order detail id',
  `order_id` bigint(20) NOT NULL COMMENT 'order id',
  `sku_id` bigint(20) NOT NULL COMMENT 'SKU product id',
  `num` int(11) NOT NULL COMMENT 'purchase quantity',
  `title` varchar(256) NOT NULL COMMENT 'product title',
  `own_spec` varchar(1024) DEFAULT '' COMMENT 'product dynamic attribute key-value set',
  `price` bigint(20) NOT NULL COMMENT 'price, unit: fen',
  `image` varchar(128) DEFAULT '' COMMENT 'product image',
  PRIMARY KEY (`id`),
  KEY `key_order_id` (`order_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=131 DEFAULT CHARSET=utf8 COMMENT='order detail table';

Stock table

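DROP TABLE IF EXISTS `tb_stock`;
-- InnoDB is used here (the original listing said MyISAM, which does not support transactions).
CREATE TABLE `tb_stock` (
  `sku_id` bigint(20) NOT NULL COMMENT 'SKU id of the product this stock belongs to',
  `seckill_stock` int(9) DEFAULT '0' COMMENT 'stock available for flash sale',
  `seckill_total` int(9) DEFAULT '0' COMMENT 'total flash-sale quantity',
  `stock` int(9) NOT NULL COMMENT 'stock quantity',
  PRIMARY KEY (`sku_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='stock table: stock, flash-sale stock, and related information';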

The parent project defines the versions of Spring Boot, Spring Cloud, and Spring Cloud Alibaba:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>0.9.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2.2.1.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

The child projects declare the Nacos and Seata client dependencies:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>

Child project configuration file

The complete configuration:

server:
  port: 8001
spring:
  application:
    name: stock-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    alibaba:
      seata:
        enabled: true
        enable-auto-data-source-proxy: true
        tx-service-group: my_test_tx_group
        registry:
          type: nacos
          nacos:
            application: seata-server
            server-addr: 127.0.0.1:8848
            username: nacos
            password: nacos
        config:
          type: nacos
          nacos:
            server-addr: 127.0.0.1:8848
            group: SEATA_GROUP
            username: nacos
            password: nacos
        service:
          vgroup-mapping:
            my_test_tx_group: default
          disable-global-transaction: false
        client:
          rm:
            report-success-enable: false
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/eshop?serverTimezone=UTC&useUnicode=true&useSSL=false&characterEncoding=utf8
    username: root
    password: 123456
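The tx-service-group value has to line up with the service.vgroupMapping.my_test_tx_group=default entry from config.txt: the group name is mapped to a cluster name, and the cluster name is then used to find the Seata server. The lookup can be sketched as below. VgroupMappingSketch is a hypothetical helper, not Seata code; the grouplist step and the ";" separator are assumptions that, as far as I know, apply to the file registry type, whereas with the Nacos registry the cluster is looked up in Nacos instead.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of resolving tx-service-group -> cluster -> server addresses. Hypothetical helper. */
final class VgroupMappingSketch {

    /** Looks up the cluster that a transaction service group is mapped to. */
    static String clusterFor(Map<String, String> config, String txServiceGroup) {
        String cluster = config.get("service.vgroupMapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException(
                    "No vgroupMapping entry for tx-service-group '" + txServiceGroup + "'");
        }
        return cluster;
    }

    /** Assumption: for the file registry, addresses come from service.<cluster>.grouplist. */
    static List<String> serverAddresses(Map<String, String> config, String txServiceGroup) {
        String cluster = clusterFor(config, txServiceGroup);
        String grouplist = config.get("service." + cluster + ".grouplist");
        if (grouplist == null || grouplist.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(grouplist.split(";"));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("service.vgroupMapping.my_test_tx_group", "default");
        config.put("service.default.grouplist", "127.0.0.1:8091");
        System.out.println(clusterFor(config, "my_test_tx_group"));
        System.out.println(serverAddresses(config, "my_test_tx_group"));
    }
}
```

A tx-service-group without a matching vgroupMapping entry is a common reason for a client failing to find the Seata server, which is why the sketch fails loudly in that case.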

The stock service defines the method that reduces stock:

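import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Stock and IStockService are the project's own entity and service classes.
@RestController
public class StockController {

    @Autowired
    private IStockService stockService;

    // Reduces the stock of one SKU and fails if there is not enough stock left.
    @PutMapping("/stock")
    public ResponseEntity<Stock> reduceSkuStock(@RequestParam("skuId") Long skuId,
                                                @RequestParam("number") Integer number) {
        Stock stock = stockService.getById(skuId);
        if (stock.getStock() < number) {
            throw new RuntimeException("Insufficient stock, SkuId: " + skuId);
        }
        stock.setStock(stock.getStock() - number);
        stockService.updateById(stock);
        return ResponseEntity.ok(stock);
    }
}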

After inserting the order, the order service calls the stock service through Feign to reduce the stock:

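import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// OrderDetail, OrderDetailMapper, IOrderDetailService and StockFeignClient are the project's own classes.
@Service
public class OrderDetailServiceImpl extends ServiceImpl<OrderDetailMapper, OrderDetail> implements IOrderDetailService {

    // Feign client for the stock service
    @Autowired
    private StockFeignClient stockFeignClient;

    // @Transactional
    @GlobalTransactional(rollbackFor = {Exception.class})
    @Override
    public void makeOrder(OrderDetail orderDetail) {
        this.save(orderDetail); // save the order
        int x = 11 / 0; // deliberately throw an exception to trigger the rollback
        // reduce the stock
        stockFeignClient.reduceSkuStock(orderDetail.getSkuId(), orderDetail.getNum());
    }
}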

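Inserting the order and reducing the stock belong to two different services, so the traditional @Transactional annotation can no longer guarantee that they are atomic. Here we use the @GlobalTransactional global transaction annotation provided by Seata, so the business operations are rolled back whenever an exception occurs. Test case: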

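import java.util.UUID;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderServiceApplicationTests {

    @Autowired
    private IOrderDetailService orderDetailService;

    // Places an order; makeOrder throws, so the global transaction should roll back.
    @Test
    public void testOrder() {
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setNum(100);
        orderDetail.setOrderId(9999L);
        orderDetail.setPrice(9999L);
        orderDetail.setSkuId(27359021728L);
        orderDetail.setTitle(UUID.randomUUID().toString());
        orderDetailService.makeOrder(orderDetail);
    }
}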

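Running the test shows that a global transaction is started and that, after the exception occurs, both services roll back successfully.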
