缘由
Spring Cloud 作为微服务框架,工作当中必用。原因也很简单,Spring cloud 依托Springboot,
背靠Spring Framework,又有Eureka 和 Alibaba 两个大厂的支持,大多数的互联网公司会优先选择以此作为后端开发框架。微服务架构下每个服务都会有一个数据库,不同的数据库、不同的服务实例都会产生事务问题。
简述
微服务架构下的功能开发会遇到分布式事务问题,有2PC 3PC TCC等解决方案,基于性能考虑,公司会考虑使用Seata作为分布式事务解决方案,其中提供了基于2PC的AT模式、TCC模式、Saga模式等等,不过最常用的还是AT模式,只需要一个GlobalTransaction注解即可,无业务侵入性。
前文对Seata的客户端 TM RM 进行了解说。本文继续对服务端TC进行解说。
正文
Seata Server 是一个Springboot应用,较早的版本是一个并未使用Spring boot,1.5.2版本的代码已经开始使用 Springboot了。Server端主要起到了TC的角色,对应用中的Seata客户端RM TM进行交互,同样使用Netty作为通信组件,引入了很多的注册中心比如Nacos就在其中,Seata TC就像应用一样,可以启动多个实例,TM RM客户端会在调用TC是进行负载均衡。
日志收集
支持File输出、Kafka、Logstash、Console输出,TC的日志收集可以作为基础设施的扩展进行维护,其稳定性的要求不亚于应用。
Server 启动
ServerRunner是一个启动入口,其中应用启动完成后会执行 Server.start(),启动服务端的Netty,另外设置一个Bean销毁回调,触发避免对注册中心的下线失败。
进入 Server 类,仅有一个start() 作为应用的入口。
- 创建一个线程池作为Netty的工作线程池,同样是可配置的.
- 初始化Session容器,支持 DB Redis File三种模式,可以通过SPI扩展.
- 初始化锁管理器工程的模式,未指定则使用存储模式,可以通过SPI扩展.
- 创建定时任务:删除UndoLog 检查事务超时 异步提交 重试提交 重试回滚.
- 获取本机Ip.
- 加入Spring销毁回调.
- 启动Netty.
- 向注册中心注册.
Netty 启动后,就可以接收RM TM的请求了。TC的功能在 TCInboundHandler 接口中定义:
public interface TCInboundHandler {
// 开始全局事务
GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
// 提交事务
GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);
// 回滚事务
GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);
// 注册分支事务
BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
// 分支事务报告
BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);
// 全局锁查询
GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);
// 全局事务状态
GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);
// 全局事务报告
GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}
TCInboundHandler 的实现类是DefaultCoordinator,此对象会作为一个Netty Handler,处理Netty请求。
TCInboundHandler 还有一个抽象实现 AbstractTCInboundHandler,为TC的功能处理提供异常机制
Seata抽象了一个事务管理器 TransactionManager 作为全局事务管理器,大多数情况下都会使用AT模式,也是Seata基于2PC开发的一种高性能模式,其TC的功能逻辑在 ATCore 中。
DefaultCoordinator 聚合了 DefaultCore。
public DefaultCore(RemotingServer remotingServer) {
// SPI 加载数据所有的事务管理器实现
List allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,
new Class[] {RemotingServer.class}, new Object[] {remotingServer});
if (CollectionUtils.isNotEmpty(allCore)) {
for (AbstractCore core : allCore) {
// 以分支事务枚举为key,存在Map中,后续可根据分支事务类型获取对应的事务管理器
coreMap.put(core.getHandleBranchType(), core);
}
}
}
# META-INF/services/io.seata.server.coordinator.AbstractCore
io.seata.server.transaction.at.ATCore
io.seata.server.transaction.tcc.TccCore
io.seata.server.transaction.saga.SagaCore
io.seata.server.transaction.xa.XACore
DefaultCore对TransactionManager接口方法的实现,并没有做真正的业务处理,全部调用某个分支事务类的事务处理器进行处理。
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
// 先获取 AbstractCore 的具体实现类,再进行业务处理
return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,
applicationData, lockKeys);
}
GlobalSession开启
全局事务的开启是第一步,TM向TC获取一个 xid,开启全局事务的逻辑是在 DefaultCore.begin(String applicationId, String transactionServiceGroup, String name, int timeout)中。
具体的实现就是 new GlobalSession(),设置session持久化方式(redis file db),开启Session,记录开始时间,生成并返回 xid.
ipAddress + IP_PORT_SPLIT_CHAR + port + IP_PORT_SPLIT_CHAR + UUIDGenerator.generateUUID()
分支事务注册
SessionHelper.newBranchByGlobal()创建分支事务 branchSession,把 branchId 放入MDC,对分支事务和全局事务加锁,然后将分支事务放入全局事务(持久化和内存都会加入),响应分支事务Id。
branchSessionLock(globalSession, branchSession); // 加锁,只有AT模式才有加锁的实现,其他模式全是空实现
从 branchSession.getApplicationData()
获取 autoCommit skipCheckLock 两个参数进行加锁 branchSession.lock(autoCommit, skipCheckLock)
,lock的调用是 LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock)
,LockerManagerFactory.getLockManager()
获取的是Seata server启动时配置的锁实现(db file redis)
# AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean)
@Override
public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
if (branchSession == null) {
throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
}
String lockKey = branchSession.getLockKey();
if (StringUtils.isNullOrEmpty(lockKey)) {
// no lock
return true;
}
// get locks of branch
List locks = collectRowLocks(branchSession);
if (CollectionUtils.isEmpty(locks)) {
// no lock
return true;
}
// 获取锁实现进行加锁
return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
}
redis 的实现非常简单,db 和file,暂不做分析。
@Override
public boolean acquireLock(List rowLocks, boolean autoCommit, boolean skipCheckLock) {
// rowLocks 是请求参数传递过来的
if (CollectionUtils.isEmpty(rowLocks)) {
return true;
}
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
if (ACQUIRE_LOCK_SHA != null && autoCommit) {
return acquireLockByLua(jedis, rowLocks);
} else {
return acquireLockByPipeline(jedis, rowLocks, autoCommit, skipCheckLock);
}
}
}
分支事务注册后会执行本地事务本地事务提交,会有一个锁的概念,提交本地事务前会对数据加锁,加锁成功才能提交。
全局事务提交
globalSession.asyncCommit(); 提交事务
全局事务回滚
若是第一阶段失败,直接删除分支事务即可,若是第二阶段,则通知RM进行回滚。
总结
Seata Server 部分的逻辑比RM TM的逻辑复杂不少,本文仅涉及AT模式,接下来会对Saga TCC模式进行分析。分布式事务本身就是一个分布式架构中产生的一个难题,功能设计时尽可能的避免产生分布式事务,若不能避免,可以借鉴Seata框架的事务实现,了解其中的运行原理,逐步的推敲高性能的分布式事务解决方案,对项目的稳定性和扩展性至关重要。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net