概述
随着互联网的发展,软件系统由原来的单体应用转变为分布式应用。分布式系统把一个单体应用拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作。这种分布式系统下不同服务之间通过远程协作完成的事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务
举个例子,使用传统本地事务完成转账逻辑,任一步骤出问题都会回滚
begin transaction;
// 1.本地数据库操作:张三减少金额
// 2.本地数据库操作:李四增加金额
commit transation;
但在分布式系统下,就变成这样
begin transaction;
// 1.本地数据库操作:张三减少金额
// 2.远程调用:让李四增加金额
commit transation;
如果执行到第二步,远程调用成功了,李四增加了金额,但因为网络延迟没能及时响应,那么本地系统就会认为事务失败,从而回滚张三减少金额的操作
分布式事务产生的场景
1. 微服务架构
典型的场景就是微服务之间通过远程调用完成事务操作,比如:订单服务和库存服务,下单的同时,订单服务请求库存服务减库存
2. 单体系统访问多个数据库实例
当单体系统需要访问多个数据库时就会产生分布式事务,比如:用户信息和订单信息分别在两个数据库存储,用户管理系统删除用户信息,需要分别删除用户信息及用户订单信息,由于数据分布在不同的数据库,需要通过不同的数据库链接操作数据,产生分布式事务
3. 多服务访问同一个数据库
比如:订单服务和库存服务访问同一个数据库也会产生分布式事务,两个服务持有不同的数据库链接进行操作,产生分布式事务
2PC(两阶段提交)
Two-Phase Commit,两阶段提交,指将整个事务流程分为两个阶段,准备阶段(prepare-phase)、提交阶段(commit-phase)
举例:张三和李四聚餐,饭店老板要求先买单,才能出票,两人就商议 AA。只有张三和李四都付款,老板才能出票安排就餐
- 准备阶段:老板要求张三付款,张三付款,老板要求李四付款,李四付款
- 提交阶段:老板出票,两人拿票就餐
该例形成一个事务,若张三或李四其中一人拒绝付款,或钱不够,老板都不会出票,并且会把已收款退回。整个事务过程由事务管理器和参与者组成,老板就是事务管理器,张三和李四就是事务参与者。事务管理器负责
决策整个分布式事务的提交和回滚,事务参与者负责自己本地事务的提交和回滚
部分关系数据库如 Oracle、MySQL 都支持两阶段提交协议:
- 准备阶段:事务管理器给每个参与者发送 Prepare 消息,每个数据库参与者在本地执行事务,并写本地的 Undo/Redo 日志,此时事务没有提交(Undo 日志记录修改前的数据,用于数据库回滚,Redo 日志记录修改后的数据,用于提交事务后写入数据文件)
- 提交阶段:如果事务管理器收到了参与者的执行失败或者超时消息,直接给每个参与者发送回滚消息;否则,发送提交消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源
XA 规范
2PC 提供了解决分布式事务的方案,但不同的数据库实现却不一样。为了统一标准,国际开放标准组织 Open Group 定义分布式事务的模型(DTP)和 分布式事务协议(XA)
DTP 模型由以下元素组成:
- AP(Application Program):应用程序,可以理解为使用 DTP 分布式事务的程序
- RM(Resource Manager):资源管理器,可以理解为事务的参与者,一般指一个数据库实例,通过资源管理器控制数据库
- TM(Transaction Manager):事务管理器,负责协调和管理事务,事务管理器控制全局事务,管理事务生命周期,并协调各个 RM
XA 规范定义了 RM(资源管理器)与 TM(事务管理器)的交互接口,另外,XA 规范还对 2PC 做了优化,执行流程如下:
- 应用程序(AP)持有用户库和积分库两个数据源
- 应用程序(AP)通过 TM 通知用户库 RM 新增用户,同时通知积分库 RM 为该用户新增积分,此时 RM 并未提交事务,用户和积分资源锁定
- TM 收到执行回复,只要有一方失败则分别向其他 RM 发起回滚事务,回滚完毕,资源锁释放
- TM 收到执行回复,全部成功,此时向所有 RM 发起提交事务,提交完毕,资源锁释放
MySQL 从 5.0.3 开始支持 XA 分布式事务协议,且只有 InnoDB 存储引擎支持,这里通过 JDBC 来演示如何通过 TM 控制多个 RM 完成 2PC 分布式事务
import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
import com.mysql.jdbc.jdbc2.optional.MysqlXid;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;import java.sql.*;
public class MysqlXAConnectionTest {
public static void main(String[] args) throws SQLException {
// true 表示打印 XA 语句, 用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP(应用程序)请求 TM(事务管理器) 执行一个分布式事务, TM 生成全局事务 ID
byte[] gtrid = "distributed_transaction_id_1".getBytes();
int formatId = 1;
try {
// TM 生成 RM1 上的事务分支 ID
byte[] bqual1 = "transaction_001".getBytes();Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行 RM1 上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);
PreparedStatement ps1 = conn1.prepareStatement("INSERT into user(name) VALUES ('jack')");
ps1.execute();
rm1.end(xid1,XAResource.TMSUCCESS);
// TM 生成 RM2 上的事务分支 ID
byte[] bqual2 = "transaction_002".getBytes();Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行 RM2 上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement("INSERT into user(name) VALUES ('r服务器托管网ose')");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// phase1: 询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2: 提交所有事务分支
if (rm1_prepare == XAResource.XA_OK && rm2_prepare 服务器托管网== XAResource.XA_OK) {
// 所有事务分支都 prepare 成功, 提交所有事务分支
rm1.commit(xid1, false);rm2.commit(xid2, false);
} else {
// 如果有事务分支没有成功, 则回滚
rm1.rollback(xid1);rm1.rollback(xid2);
}
} catch (XAException e) {
e.printStackTrace();
}
}
}
AT 模式
2PC 原理简单,实现方便,但也有缺点:
- 需要本地数据库支持 XA 协议
- 资源锁需要等到两个阶段结束才释放,性能较差
Seata 是由阿里团队研发的开源的分布式事务框架,是工作在应用层的中间件,主要优点是性能较好,且不长时间占用连接资源,以高效并且对业务零侵入的方式解决微服务场景下的分布式事务问题。它提供的 AT 模式在传统 2PC 的基础上进行改进,并解决 2PC 方案面临的问题
Seata 把一个分布式事务理解成一个包含了若干分支事务的全局事务,全局事务的职责是协调其下管理的分支事务达成一致,要么一起成功提交,要么一起失败回滚
与传统 2PC 类似,Seata 定义了三个组件来协议分布式事务的处理过程:
- TC(Transaction Coordinator):事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收 TM 指令发起全局事务的提交与回滚,负责与 RM 通信协调各各分支事务的提交或回滚
- TM(Transaction Manager):事务管理器,TM 需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向 TC 发起全局提交或全局回滚的指令
- RM(Resource Manager):控制分支事务,负责分支注册、状态汇报,并接收事务协调器 TC 的指令,驱动分支(本地)事务的提交和回滚
拿新用户注册送积分举例:
- 用户服务的 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
- 用户服务的 RM 向 TC 注册分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖
- 用户服务执行分支事务,向用户表插入一条记录
- 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播),积分服务的 RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖
- 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务
- 用户服务分支事务执行完毕
- TM 向 TC 发起针对 XID 的全局提交或回滚决议
- TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求
传统 2PC 的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而 Seata 的 RM 是以 jar 包的形式作为中间件层部署在应用程序这一侧的。传统 2PC 无论第二阶段的决议是 commit 还是 rollback,事务性资源的锁都要保持到 Phase2 完成才释放,而 Seata 的做法是在 Phase1 就将本地事务提交,这样就可以省去 Phase2 持锁的时间,整体提高效率
有关使用 Seata 实现 2PC 方案可以参考:https://www.cnblogs.com/Yee-Q/p/17744259.html
TCC
TCC 是Try、Confirm、Cancel 三个单词的缩写,TCC 要求每个分支事务实现三个操作:预处理 Try、确认 Confirm、撤销 Cancel。Try 操作做业务检查及资源预留,Confirm 做业务确认操作,Cancel 实现一个与 Try 相反的操作即回滚操作。TM 首先发起所有的分支事务的 try 操作,任何一个分支事务的 try 操作执行失败,TM 就会发起所有分支事务的 Cancel 操作。若 Try 操作全部成功,TM 就会发起所有分支事务的 Confirm 操作,其中 Confirm/Cancel 操作若执行失败,TM 会进行重试
分支事务失败的情况:
TCC 分为三个阶段:
- Try 阶段是做业务检查及资源预留,此阶段仅是一个初步操作,它和后续的 Confirm 一起才能真正构成一个完整的业务逻辑
- Confirm 阶段是做确认提交,Try 阶段所有分支事务执行成功后开始执行 Confirm,通常情况下,采用 TCC 就认为只要 Try 成功,Confirm 就一定成功,若 Confirm 阶段真的出错了,需引入重试机制或人工处理
- Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放,通常情况下,采用 TCC 就认为 Cancel 也是一定成功的,若 Cancel 阶段真的出错了,需引入重试机制或人工处理
TM 事务管理器可以实现为独立的服务,也可以让全局事务发起方充当 TM 的角色。TM 在发起全局事务时会生成全局事务记录,全局事务 ID 贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态
TCC 需要注意三种异常处理:
- 空回滚:是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候没有执行 Try 方法。当故障恢复后,分布式事务进行回滚会调用 Cancel 方法,从而形成空回滚。解决思路是识别出这个空回滚,即需要知道一阶段是否执行。如果执行了,那就是正常回滚;如果没执行,那就是空回滚。可以根据全局事务 ID 和分支事务 ID 新建一张记录表。执行 Try 方法时在表中插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚,如果该记录不存在,则是空回滚
- 幂等:为了避免 TCC 的提交重试机制引发数据不一致,要求 TCC 的 Try、Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。解决思路和空回滚类似,每次执行前都查询状态
- 悬挂:RPC 调用分支事务 Try 时,如果发生网络拥堵,RPC 调用超时,TM 就调用分支事务 Cancel 回滚,可能回滚完成后,之前的 Try 请求才真正到达并执行行,而 Try 方法预留的业务资源,只有该分布式事务才能使用,而分布式事务又已经回滚,即该业务资源后续没法处理了,对于这种情况就称为悬挂。解决思路和空回滚类似,每次执行前都查询状态
如果拿 TCC 事务的处理流程与 2PC 两阶段提交做比较,2PC 通常都是在跨库的 DB 层面,而 TCC 则在应用层面的处理,需要通过业务逻辑来实现。TCC 的优势在于,可以让应用自己定义数据操作的粒度,降低锁冲突、提高吞吐量。不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现 try、confirm、cancel 三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略
可靠消息最终一致性
可靠消息最终一致性方案是指,事务发起方(消息发送者)执行完成本地事务,并将消息发给消息中间件,事务参与方(消息消费者)从消息中间件接收消息,并执行完成本地事务
因为事务发起/参与方和消息中间件都是通过网络通信,网络通信的不确定性有可能导致问题,因此可靠消息最终一致性方案要解决以下几个问题:
-
本地事务与消息发送的原子性问题:
事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息,即本地事务和消息发送要么都成功,要么都失败
先尝试以下操作,先发送消息,再操作数据库:
begin transaction; // 1. 发送 MQ // 2. 数据库操作 commit transation;
这种情况无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败
如果先进行数据库操作,再发送消息
begin transaction; // 1. 数据库操作 // 2. 发送 MQ commit transation;
这种情况貌似没有问题,如果发送 MQ 消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但 MQ 其实已经正常发送了,同样会导致不一致
-
事务参与方接收消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息
-
消息重复消费的问题:若某一个节点消费成功但超时了,此时消息中间件会重复投递此消息,导致消息的重复消费,因此要实现事务参与方的方法幂等性
下面讨论具体的解决方案
1. 本地消息表
通过本地事务保证业务数据的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除
以注册送积分为例来说明,用户服务负责添加用户,积分服务负责增加积分
交互流程如下:
- 用户服务在本地事务新增用户和增加 积分消息日志(用户表和消息表通过本地事务保证一致)
- 可以启动独立的线程,定时对消息日志表的消息进行扫描并发送至消息中间件,消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试
2. RocketMQ 事务消息方案
RocketMQ 事务消息设计主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题
还以注册送积分的例子来描述,执行流程如下:
- Producer(MQ 发送方)发送事务消息至 MQ Server,MQ Server 将消息状态标记为 Prepared(预备状态),注意此时这条消息消费者(MQ 订阅方)是无法消费的
- MQ Server 接收到 Producer 发送给的消息则回应发送成功,表示 MQ 已接收到消息
- Producer 端执行业务逻辑,即执行添加用户操作,通过本地数据库事务控制
- 若 Producer 本地事务执行成功自动向 MQ Server 发送 commit 消息,MQ Server 接收到 commit 消息后将“增加积分消息”状态标记为可消费,此时 MQ 订阅方(积分服务)正常消费消息;若 Producer 本地事务执行失败则自动向 MQ Server 发送 rollback 消息,MQ Server 接收到 rollback 消息后 将删除“增加积分消息”
- MQ 订阅方(积分服务)消费消息,消费成功则向 MQ 回应 ack,否则将重复接收消息,这里 ack 默认自动回应,即程序执行正常则自动回应 ack
如果执行 Producer 端本地事务过程中,执行端挂掉,或者超时,MQ Server 会不停的询问同组的其他 Producer 来获取事务执行状态,这个过程叫事务回查,MQ Server 会根据事务回查结果来决定是否投递消息。以上主干流程已由 RocketMQ 实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可
最大努力通知
最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:
交互流程如下:
- 账户系统调用充值系统接口
- 充值系统完成支付处理,向账户系统发起充值结果通知,若通知失败,则充值系统发起重试
- 账户系统接收到充值结果通知修改充值状态
- 账户系统未接收到通知,会主动调用充值系统的接口查询充值结果
最大努力通知方案的核心在于,发起通知方通过一定的机制最大努力将业务处理结果通知到接收方,具体包括:
- 消息重复通知机制:因为接收通知方可能没接收到通知,此时要有一定的机制保证重试通知
- 消息校对机制:如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求
最大努力通知与可靠消息一致性有什么不同?
- 解决方案思想不同:可靠消息一致性,消息发送方需要保证将消息发出去,并且将消息发到消息接收方,消息的可靠性关键由消息发送方保证。最大努力通知,发起通知方尽最大的努力将业务处理结果告知接收通知方,但可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在发起通知方
- 两者的业务应用场景不同:可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去
- 技术解决方向不同:可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)
通过对最大努力通知的理解,采用 MQ 的 ack 机制可以实现最大努力通知
方案一:利用 MQ 的 ack 机制由 MQ 向接收通知方发送通知
- 发起通知方将通知发给 MQ,如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果
- 接收通知方监听 MQ 接收消息,业务处理完成回应 ack,若接收通知方没有回应 ack 则 MQ 会重复通知
- 接收通知方可通过消息校对接口来校对消息的一致性
方案二:也是利用 MQ 的 ack 机制,与方案一不同的是由通知程序向接收通知方发送通知
- 发起通知方将通知发给 MQ
- 通知程序监听 MQ,接收 MQ 的消息,通知程序若没有回应 ack 则 MQ 会重复通知
- 通知程序通过互联网接口协议(如 http、webservice)调用接收通知方案接口,完成通知
- 接收通知方可通过消息校对接口来校对消息的一致性
方案一和方案二的不同点:
- 方案一中接收通知方案监听 MQ,此方案是业务应用与内部应用之间的通知
- 方案二中通知程序监听 MQ,收到 MQ 的消息后由通知程序通过互联网接口协议调用接收通知方,此方案是业务应用与外部应用之间的通知,例如支付宝、微信的支付结果通知
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
全面、系统的 PyTorch 学习指南,使读者能够熟练掌握 PyTorch 的基本用法、常用模块和实践技巧。通过学习本文,读者将能够运用 PyTorch 开展各种深度学习任务,如图像分类、自然语言处理和推荐系统等。同时,本文还将介绍 PyTorch 代码优化与…