我们经常构建需要同时执行以下几项操作的应用程序:调用后端(微)服务、写入数据库、发送 JMS 消息等。但是,如果在调用其中一项时出现错误,会发生什么情况远程资源,例如,如果在调用 Web 服务后数据库插入失败?如果远程服务调用写入数据,您可能会处于全局不一致状态,因为服务已提交其数据,但尚未提交对数据库的调用。在这种情况下,您将需要补偿错误,通常补偿的管理是复杂的并且是手写的。
Red Hat 的 Arun Gupta 在 DZone Getting Started with Microservices Refcard 中撰写了关于不同微服务模式的文章。事实上,这些模式中的大多数都显示了一个调用多个其他微服务的微服务。在所有这些情况下,全局数据一致性变得相关,即确保对微服务的后者调用之一的失败得到补偿,或者重新尝试调用的提交,直到所有微服务中的所有数据再次一致.在其他关于微服务的文章中,通常很少或根本没有提到跨远程边界的数据一致性,例如标题为“ 微服务不是免费的午餐 ”的好文章,作者只是用“ 当事情必须发生时 ”的陈述来触及问题 ...事务性...事情变得复杂,因为我们需要管理...分布式事务以将各种操作联系在一起。 ”确实我们这样做了,但在此类文章中从未提及如何做到这一点。
在分布式环境中管理一致性的传统方法是使用分布式事务。事务管理器已到位以监督全局系统是否保持一致。已经开发了像两阶段提交这样的协议来标准化这个过程。 JTA、JDBC 和 JMS 是使应用程序开发人员能够保持多个数据库和消息服务器一致的规范。 JCA 是一种规范,允许开发人员围绕企业信息系统 (EIS) 编写包装器。在 最近的一篇文章 中,我写了一篇关于如何构建通用 JCA 连接器的文章,它允许您将对微服务的调用之类的东西绑定到这些全局分布式事务中,这样您就不必编写自己的框架代码来处理期间的故障分布式事务。连接器负责确保您的数据 最终是一致的 。
但是您不会总是能够访问支持 JCA 的完整 Java EE 应用程序服务器,尤其是在微服务环境中,因此我现在扩展了该库以包括在以下环境中自动处理提交/回滚/恢复:
- 弹簧靴
- Spring + Tomcat / 码头
- Servlets + Tomcat / 码头
- 春批
- 独立的 Java 应用程序
为了能够做到这一点,应用程序需要使用 JTA 兼容的事务管理器,即 Atomikos 或 Bitronix 之一。
以下描述基于您已阅读 较早的博客文章 这一事实。
设置远程调用以使其在事务中登记的过程类似于使用早期博客文章中介绍的 JCA 适配器。有两个步骤:1) 在传递给从
BasicTransactionAssistanceFactory
类检索的
TransactionAssistant
对象的回调中调用远程服务,以及 2) 设置中央提交/回滚处理程序。
第一步,即属于 执行阶段 的代码(见之前的博文),如下(使用Spring时):
@Service
@Transactional
public class SomeService {
@Autowired @Qualifier("xa/bookingService")
BasicTransactionAssistanceFactory bookingServiceFactory;
public String doSomethingInAGlobalTransactionWithARemoteService(String username) throws Exception {
//write to say a local database...
//call a remote service
String msResponse = null;
try(TransactionAssistant transactionAssistant = bookingServiceFactory.getTransactionAssistant()){
msResponse = transactionAssistant.executeInActiveTransaction(txid->{
BookingSystem service = new BookingSystemWebServiceService().getBookingSystemPort();
return service.reserveTickets(txid, username);
});
}
return msResponse;
}
}
清单 1:在事务中调用 Web 服务
第 5-6 行提供第 13 行用于获取
TransactionAssistant
工厂实例。请注意,您必须确保此处使用的名称与下面清单 3 中设置期间使用的名称相同。这是因为当事务提交或回滚时,事务管理器需要找到用于提交或补偿第 16 行调用的相关回调。您的应用程序中很可能会有多个这样的远程调用,并且对于您集成的每个远程服务,您必须编写如清单 1 所示的代码。注意这段代码与使用 JDBC 调用数据库没有什么不同。对于您登记到事务中的每个数据库,您需要:
- 注入数据源(类似于第 5-6 行)
- 从数据源获取连接(第 13 行)
- 创建语句(第 14 行)
- 执行语句(第 15-16 行)
- 关闭连接(第 13 行,当 try 块调用自动关闭资源的 close 方法时)。使用完交易助手后,在交易完成之前 关闭交易助手非常重要 。
为了创建 BasicTransactionAssistanceFactory 的实例
(清单 1 中的第 5-6 行),我们使用 Spring
@Configuration
:
@Service
@Transactional
public class SomeService {
@Autowired @Qualifier("xa/bookingService")
BasicTransactionAssistanceFactory bookingServiceFactory;
public String doSomethingInAGlobalTransactionWithARemoteService(String username) throws Exception {
//write to say a local database...
//call a remote service
String msResponse = null;
try(TransactionAssistant transactionAssistant = bookingServiceFactory.getTransactionAssistant()){
msResponse = transactionAssistant.executeInActiveTransaction(txid->{
BookingSystem service = new BookingSystemWebServiceService().getBookingSystemPort();
return service.reserveTickets(txid, username);
});
}
return msResponse;
}
}
清单 2:Spring 的
@Configuration
,用于创建工厂
清单 2 的第 4 行使用与清单 1 的第 5 行的
@Qualifier
中相同的名称。清单 2 的第 5 行中的方法通过在 JNDI 中查找来创建一个工厂,在本例中使用 Bitronix。使用 Atomikos 时代码看起来略有不同 - 有关详细信息,请参阅
demo/genericconnector-demo-springboot-atomikos
项目。
上面提到的第二步是设置提交/回滚回调。当提交或回滚清单 1 第 8-20 行周围的事务时,事务管理器将使用它。请注意,由于清单 1 第 2 行的
@Transactional
批注,存在一个事务。此设置如清单 3 所示:
@Service
@Transactional
public class SomeService {
@Autowired @Qualifier("xa/bookingService")
BasicTransactionAssistanceFactory bookingServiceFactory;
public String doSomethingInAGlobalTransactionWithARemoteService(String username) throws Exception {
//write to say a local database...
//call a remote service
String msResponse = null;
try(TransactionAssistant transactionAssistant = bookingServiceFactory.getTransactionAssistant()){
msResponse = transactionAssistant.executeInActiveTransaction(txid->{
BookingSystem service = new BookingSystemWebServiceService().getBookingSystemPort();
return service.reserveTickets(txid, username);
});
}
return msResponse;
}
}
清单 3:设置提交/回滚处理程序
第 12 行将回调连同清单 1 和清单 2 中使用的唯一名称一起传递给配置器。
如果您正在集成的服务仅提供执行方法和该执行的补偿方法,则第 9 行的提交很可能是空的。此提交回调来自两阶段提交,其目的是将分布式系统不一致的时间保持在绝对最小值。请参阅本文末尾的讨论。
第 5 行和第 9 行实例化一个新的 Web 服务客户端。请注意,回调处理程序
应该是无状态的
!它是可序列化的,因为在某些平台上,例如 Atomikos,它将与事务信息一起被序列化,以便在必要时可以在恢复期间调用它。我想你可以让它有状态,只要它保持可序列化,但我建议让它无状态。
传递给第 4 行和第 8 行回调的事务 ID(名为
txid
的字符串)在此示例中传递给 Web 服务。在一个更实际的示例中,您将使用该 ID 来查找您在执行阶段保存的上下文信息(参见清单 1 的第 15 和 16 行)。然后,您将使用该上下文信息(例如来自先前对 Web 服务的调用的参考号)来调用以提交或回滚清单 1 中进行的 Web 服务调用。
这些清单的独立变体,例如在 Spring 环境之外使用这个库,除了您需要手动管理事务之外几乎相同。有关几个受支持环境中的代码示例,请参阅 Github 上的
demo
文件夹。
请注意,在通用连接器的 JCA 版本中,您可以配置通用连接器是否在内部处理恢复。如果没有,您必须提供事务管理器可以调用的回调,以查找您认为尚未完成的事务。在本文讨论的非 JCA 实现中,这始终由通用连接器在内部处理。通用连接器会将上下文信息写入目录,并在恢复期间使用它来告诉事务管理器需要清理的内容。严格来说,这不太对,因为如果你的硬盘出现故障,所有未完成交易的信息都会丢失。在严格的两阶段提交中,这就是允许事务管理器调用资源以获取需要恢复的未完成事务列表的原因。在当今的 RAID 控制器世界中,生产机器没有理由会因硬盘故障而丢失数据,因此目前没有提供回调到通用连接器的选项,该连接器可以告诉它正在进行哪些事务一个需要恢复的状态。如果节点发生灾难性硬件故障,节点无法重新启动和运行,您需要将通用连接器写入的所有文件从旧硬盘物理复制到第二个硬盘节点。然后,在第二个节点上运行的事务管理器和通用连接器将协调工作以完成所有挂起的事务,方法是提交或回滚它们,以崩溃时相关的为准。此过程与在灾难恢复期间复制事务管理器日志没有什么不同,具体取决于您使用的是哪个事务管理器。您需要这样做的可能性非常小——在我的职业生涯中,我从未听说过我所从事的项目/产品的生产机器以这种方式失败。
您可以使用清单 4 中所示的第二个参数配置此上下文信息的写入位置:
@Service
@Transactional
public class SomeService {
@Autowired @Qualifier("xa/bookingService")
BasicTransactionAssistanceFactory bookingServiceFactory;
public String doSomethingInAGlobalTransactionWithARemoteService(String username) throws Exception {
//write to say a local database...
//call a remote service
String msResponse = null;
try(TransactionAssistant transactionAssistant = bookingServiceFactory.getTransactionAssistant()){
msResponse = transactionAssistant.executeInActiveTransaction(txid->{
BookingSystem service = new BookingSystemWebServiceService().getBookingSystemPort();
return service.reserveTickets(txid, username);
});
}
return msResponse;
}
}
清单 4:配置通用连接器。显示的值也是默认值。
清单 4 设置了事务变得与恢复相关之前的最小年龄。在这种情况下,只有超过 30 秒时,事务才会被视为与通过恢复进行的清理相关。您可能需要根据执行业务流程所花费的时间来调整此值,这可能取决于为您调用的每个后端服务配置的超时期限的总和。低值和高值之间存在权衡:值越低,事务管理器中运行的后台任务在故障恢复期间进行清理所需的时间越少。这意味着该值越小,不一致的窗口就越小。但要小心,如果该值太低,恢复任务将尝试回滚实际上仍处于活动状态的事务。您可以正常配置事务管理器的超时时间,清单4中设置的值应该大于等于事务管理器的超时时间。此外,存储上下文数据的目录在清单 4 中配置为本地目录。您可以指定任何目录,但请确保该目录存在,因为通用连接器不会尝试创建它。
如果您在 Tomcat 环境中使用 Bitronix,您可能会发现没有太多关于如何配置环境的信息。在 Bitronix 从 codehaus.org 转移到 Github 之前,它曾经被很好地记录下来。我已经用 Bitronix 创建了
一个问题
来改进文档。
demo/genericconnector-demo-tomcat-bitronix
文件夹中的源代码和自述文件包含提示和链接。
关于使用通用连接器的最后一件事是提交和回滚的工作方式。连接器所做的只是搭载在 JTA 事务之上,因此在需要回滚的情况下,它会通过回调获得通知。通用连接器然后将此信息传递给您在清单 3 中注册的回调中的代码。后端数据的实际回滚不是通用连接器所做的事情 - 它只是调用您的回调,以便您可以告诉后端系统回滚数据。通常你不会这样回滚,而是将写入的数据标记为不再有效,通常使用状态。很难正确回滚在执行阶段已写入的所有数据痕迹。在严格的两阶段提交协议设置中,例如使用两个数据库,写入每个资源的数据在执行和提交/回滚之间保持锁定状态,第三方事务无法触及。事实上,这是两阶段提交的缺点之一,因为锁定资源会降低可伸缩性。通常,您集成的后端系统不会在执行阶段和提交阶段之间锁定数据,而且提交回调实际上将保持为空,因为它无事可做——数据通常在第 16 行时已经在后端提交清单 1 在执行阶段返回。然而,如果你想构建一个更严格的系统,并且你可以影响你正在集成的后端的实现,那么后端系统中的数据可以在执行和提交阶段之间“锁定”,通常通过使用状态,例如执行后的“保留票”和提交后的“预订票”。不允许第三方交易访问处于“保留”状态的资源/票证。
通用连接器和许多演示项目可在
https://github.com/maxant/genericconnector/
获得,二进制文件和源代码可从
Maven
获得。