写在前面的话
好久没有写博客了,不知道写点什么。正好要在公司分享技术,我就想到了好久之前看过的分布式事务的框架seata。趁着这个机会,查漏补缺,整理一篇博客出来,从理论到实战,让我对其设计思想,实际应用又更多一分认识。
什么是Seata?
- Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
Seata术语
- TC (Transaction Coordinator) - 事务协调者: 维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器: 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器: 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
事务模式-AT模式
本质是对两阶段提交协议的演变
- 一阶段:业务数据和回滚的日志记录在同一个本地事务中提交。
- 二阶段:
- 提交异步化
- 如果发生异常,回滚通过一阶段提交的回滚日志进行反向补偿
写隔离 (重要)
- 一阶段提交本地事务需要拿到全局锁
- 二阶段全局回滚需要拿到对应分支数据的本地锁
这里拿官方文档的示例说明(不是我懒,官方的图挺好的):
- 这里有两个全局事务,
tx1
和tx2
,分别对一张表的m字段进行更新操作 - 由图可知,
tx1
先开启本地事务拿本地锁,执行更新操作update a set m = m - 100 where id = 1
。然后获取全局锁。如果拿到全局事务锁,就提交本地事务,释放本地锁。此后tx2
开启事务,执行更新操作,然后尝试去获取全局锁
,此时全局锁
被tx1
持有,于是tx2
就进行重试并等待全局锁
。
tx1
二阶段全局提交,释放全局锁
,然后tx2
才能拿到全局锁
提交本地事务
如果
tx1
的二阶段全局回滚,那么tx1
需要重新获取其数据(对应分支)的本地锁进行反向补偿。这时如果tx2
还在等待全局锁,同时持有本地锁,那么tx1
拿不到对应数据的本地锁的本地分支就会回滚失败。分支的回滚就会进行重试,直到tx2
的全局锁获取等待超时,于是放弃全局锁,并回滚本地事务释放本地锁,这时tx1
拿到本地锁就回滚成功。由于整个过程
tx1
一直持有全局锁
,所以不会发生赃写
的问题
读隔离 (重要)
- 在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,
Seata(AT 模式)
的默认全局隔离级别是 读未提交(Read Uncommitted) 。如果应用在特定场景下,必需要求全局的 读已提交 ,目前Seata
的方式是通过SELECT FOR UPDATE
语句的代理。
- 如图所示,执行
select for update
时会申请全局锁,如果全局锁被其他事务持有,则释放本地锁并重试。整个过程,查询是block
的,直到拿到全局锁。即读取到的数据是已提交
的,才将数据返回。
工作机制
以一个示例来说明整个 AT 分支的工作过程。
业务表:product
Field | Type | Key |
---|---|---|
id | bigint(20) | PRI |
name | varchar(100) | |
since | varchar(100) |
AT 分支事务的业务逻辑:
update product set name = 'GTS' where name = 'TXC';
一阶段
解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC'; ## 得到修改前的镜像 { id: 1, name: TXC, since: 2014 }
得到前镜像
执行业务 SQL:更新这条记录的 name 为 ‘GTS’。
查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id, name, since from product where id = 1`; ## 得到修改后的镜像 { id: 1, name: GTS, since: 2014 }
插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到
UNDO_LOG
表中。{ "branchId": 641789253, "undoItems": [{ "afterImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "GTS" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "beforeImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "TXC" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "sqlType": "UPDATE" }], "xid": "xid:xxx" }
提交前,向
TC
注册分支:申请product
表中,主键值等于 1 的记录的 全局锁 。本地事务提交:业务数据的更新和前面步骤中生成的
UNDO LOG
一并提交。将本地事务提交的结果上报给
TC
。
二阶段-回滚
收到
TC
的分支回滚请求,开启一个本地事务,执行如下操作。通过
XID
和Branch ID
查找到相应的UNDO LOG
记录。数据校验:拿
UNDO LOG
中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。根据
UNDO LOG
中的前镜像和业务SQL
的相关信息生成并执行回滚的语句:update product set name = 'TXC' where id = 1;
提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
二阶段-提交
- 收到
TC
的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。 - 异步任务阶段的分支提交请求将异步和批量地删除相应
UNDO LOG
记录。
项目实战
这里只是简单的使用。注册中心采用eureka,网关使用gateway,rpc使用
feign
工程结构
├─account-service -- 账户服务 --
├─eureka-server -- 注册中心 --
├─gateway-service -- 网关服务 --
├─goods-service -- 商品服务 --
└─order-service -- 订单服务 --
建表语句
业务sql
create table account
(
id bigint unsigned not null primary key auto_increment comment '主键id',
money decimal(11, 2) not null default 0 comment '账户余额',
gmt_create timestamp not null default current_timestamp comment '创建时间',
gmt_modify timestamp not null default current_timestamp on update current_timestamp comment '修改时间'
) comment '账户表';
create table goods
(
id bigint unsigned not null primary key auto_increment comment '主键id',
name varchar(32) not null default '' comment '商品名称',
stock int unsigned not null default 0 comment '库存数量',
price decimal(11, 2) not null default 0 comment '商品价格',
gmt_create timestamp not null default current_timestamp comment '创建时间',
gmt_modify timestamp not null default current_timestamp on update current_timestamp comment '修改时间'
) comment '商品表';
create table order_detail
(
id bigint unsigned not null primary key auto_increment comment '主键id',
goods_id bigint unsigned not null comment '商品id',
account_id bigint unsigned not null comment '账户id',
price decimal(11, 2) not null default 0 comment '订单价格',
amount int unsigned not null default 1 comment '商品数量',
gmt_create timestamp not null default current_timestamp comment '创建时间',
gmt_modify timestamp not null default current_timestamp on update current_timestamp comment '修改时间'
) comment '订单表';
insert into account(money)
values (10000);
insert into goods(name, stock, price)
values ('华为Meta 30', 10, 5000.00);
回滚日志表
CREATE TABLE `undo_log`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL comment,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
项目配置文件
application.yml
# 在项目中加入seata的事务分组配置
spring:
cloud:
alibaba:
seata:
tx-service-group: eyestarrysky-seata
在Seata
项目提供的file.conf
中加入如下配置
## transaction log store
store {
# 修改数据库配置
## database store
db {
datasource = "druid"
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://xxx:3306/seata"
user = "xxx"
password = "xxx"
}
}
service {
# 这里记得配置事务分组(eyestarrysky-seata)
vgroup_mapping.eyestarrysky-seata = "default"
default.grouplist = "127.0.0.1:8091"
enableDegrade = false
# disable seata
disableGlobalTransaction = false
}
项目启动
启动
seata-server
依次启动注册中心,网关中心和业务工程
业务流程
- 下图解释了
TC
,TM
,RM
在项目中的具体角色和业务流程 (名词解释)
- 调用订单服务下单接口
- 订单服务调用商品服务检查库存
- 订单服务调用账户服务检查账户余额
- 本地事务-生成订单
- 远程事务扣减库存
- 账户服务扣减账户余额
- 简单看下代码,使用非常方便,在
OrderService
中的提交订单方法上加@GlobalTransactional
注解,name
的值应该填充为application.yml
中配置的事务分组名称eyestarrysky-seata,flag
字段方便我在测试接口时控制提交,和回滚
@GlobalTransactional(name = "eyestarrysky-seata", rollbackFor = Exception.class)
public void submitOrder(Long goodsId, Long accountId, Integer amount, Integer flag) {
//查询商品
Goods goods = goodsClient.findById(goodsId);
if (goods == null) {
throw new IllegalArgumentException("商品数据找不到");
}
if (goods.getStock() < amount) {
log.warn("【生成订单失败,商品库存不足】商品信息: {}, 购买数量amount = {}", JSON.toJSONString(goods), amount);
return;
}
BigDecimal orderPrice = goods.getPrice().multiply(BigDecimal.valueOf(amount));
Account account = accountClient.findOneAccount(accountId);
if (account == null) {
throw new IllegalArgumentException("账户找不到");
}
if (account.getMoney().compareTo(orderPrice) < 0) {
log.warn("【生成订单失败,账户余额不足】账户余额: {}, 订单总额amount = {}", account.getMoney(), orderPrice);
return;
}
OrderDetail orderDetail = new OrderDetail();
orderDetail.setAccountId(accountId);
orderDetail.setAmount(amount);
orderDetail.setGoodsId(goodsId);
orderDetail.setPrice(orderPrice);
//1.本地事务-扣减库存
orderDetailMapper.insert(orderDetail);
//2. 远程事务-扣减库存
goodsClient.reduceStock(goodsId, amount);
//3. 远程事务-扣减余额
accountClient.deductionMoney(accountId, orderPrice);
if (flag > 0) {
log.error("【发生异常,事务回滚】");
throw new RuntimeException("回滚事务");
}
}
事务执行中的数据情况
商品数据
{ "id": 1, "price": 5000 "stock": 6 }
账户数据
{ "id": 1, "money": 30000.00, "gmtCreate": "2020-09-08T17:26:44.000+00:00", "gmtModify": "2021-05-28T08:56:33.000+00:00" }
正常下单
# 购买商品一件,订单正常生成
# 商品
{
"id": 1,
"price": 5000
"stock": 5
}
# 账户
{
"id": 1,
"money": 25000.00,
"gmtCreate": "2020-09-08T17:26:44.000+00:00",
"gmtModify": "2021-05-28T08:56:33.000+00:00"
}
日志打印
# 订单生成前,开启事务,并生成xid
i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [172.17.0.2:8091:141593301408030720]
# 生成订单完成 --> 扣减商品库存前
# 此时undo_log表里面写入一条数据
{
"branch_id": 141593374497972224,
"xid": "172.17.0.2:8091:141593301408030720",
"rollback_info": "xxx"
}
# 扣减库存完成,undo_log表多了一条数据,xid是相同的
{
"branch_id": 141594493718634496,
"xid": "172.17.0.2:8091:141593301408030720",
"rollback_info": "xxx"
}
订单下单失败的情况
- 这里我简单的测试了3种情况
- 下单超时情况(全局事务未提交前),回滚成功
- 本地事务发生异常,回滚成功
- 分支事务发生异常,回滚成功
写在最后
- 这篇文章只是对
seata
的简单介绍和应用,后面我会另写一篇文章(埋个坑)来从源码的角度更深入地分析,seata
是如何处理分布式事务的。