编写事件源 CQRS 读取模型

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

关于事件溯源和 cqrs 的讨论似乎通常集中在 cqrs 上下文中的整体系统架构或各种类型的领域驱动设计上。然而,读取模型经常被忽略,尽管这方面也有一些有趣的考虑。在这篇文章中,我们将展示一个通过使用事件流来填充视图模型的示例实现。

概述

读取模型的想法非常简单。您获取事件日志,使用适当的函数在最初为空的数据模型上应用(重播)所有事件,然后您将获得填充的模型。代码可能如下所示:


 list<event> events = getevents();
model model = model.empty();
for (event event : events) {
    apply(model, event);
}

我们可以通过函数式编程使它更短:


 list<event> events = getevents();
model model = model.empty();
for (event event : events) {
    apply(model, event);
}

这就是本质。请注意,这只是抽象大纲,实际实现可能会有所不同,包括缓冲、批处理(或流式传输)、持久性等。

应用事件

应用事件的实际 Java 代码可能类似于以下内容:


 list<event> events = getevents();
model model = model.empty();
for (event event : events) {
    apply(model, event);
}

总而言之,它非常简单明了。可以在处理单个事件和整个批次之前和之后使用挂钩对其进行增强。这样的钩子可以用来:

  • 执行交易
  • 插件监控
  • 实施错误处理
  • 根据速度计算批量大小
  • 执行任意操作,例如每批次设置或重新计算一次

最后一个有趣的部分是 dispatchevent 方法。除了遍历类型层次结构、错误处理并使其全部可选之外,它归结为:


 list<event> events = getevents();
model model = model.empty();
for (event event : events) {
    apply(model, event);
}

换句话说,对于每个事件类型(如 ordercreated ),我们 on 一个调用的公共方法,该方法在 projector 对象上采用匹配类型的单个参数。

以上所有都是引擎的一部分,是支持许多视图模型的基础设施。实现投影所需要做的就是实际为投影仪提供有趣事件类型的处理程序。所有其他事件将被忽略。

它可能看起来像这样:


 list<event> events = getevents();
model model = model.empty();
for (event event : events) {
    apply(model, event);
}

投影线

让我们讨论一下多线程。共享的可变状态会立即带来许多问题, 应尽可能避免 。处理它的方法之一是首先不具有并发性,例如通过限制对单个线程的写入。 在大多数情况下, 结合 acid 事务的单线程编写器足以跟上写入负载。 (读取/查询负载可能很重并使用许多线程——此处的所有详细信息仅与写入有关。)

该线程负责将事件应用到读取模型,从查询事件存储到更新视图模型数据库。通常它只是从商店加载一批事件并应用它们。只要有更多事件要处理,它就会继续,并在赶上之后进入睡眠状态。它会在一定时间后或在事件存储收到有关新事件的通知时唤醒。

我们也可以控制这个线程的生命周期。例如,我们有一种方法可以以编程方式暂停和恢复每个投影的线程,甚至可以在管理图形用户界面中公开。

推还是拉?

使用数据库支持的事件存储,重复查询新事件非常容易。这是 模型。不幸的是,这也意味着您最终可能会过于频繁地轮询并产生不必要的负载,或者轮询过于不频繁,因此可能需要更长的时间才能将更改传播到视图模型。

这就是为什么除了轮询事件存储之外,引入通知以在保存新事件后立即唤醒读取模型是个好主意。这有效地成为了一个具有最小延迟和负载 的推送 模型。我们发现 jgroups 是完成这项工作的一个非常好的工具——它支持多种协议并且非常容易设置,与成熟的消息队列相比麻烦少得多。

通知可能包含也可能不包含实际事件。

在后一种(更简单的)设计中,它们仅传播已保存新事件的信息及其顺序 ID(以便所有投影都可以估计它们落后了多少)。唤醒后,执行器可以继续其正常路径,从查询事件存储开始。

为什么?因为处理来自单一来源的事件更容易,但更重要的是因为数据库支持的事件存储可以简单地保证排序并且不会出现丢失或重复消息的问题。查询数据库非常快,因为我们正在按主键顺序读取单个表,而且大部分时间数据无论如何都在 ram 缓存中。瓶颈在于投影线程更新其读取模型数据库。

但是,将事件数据放入通知中没有任何障碍(除了大小或网络流量方面的考虑)。它可能会减少事件存储的负载并节省一些到数据库的往返行程。投影仪需要维护一个缓冲区,并在需要时回退到查询事件存储。或者系统可以使用更可靠的消息队列。

重新开始预测

除了暂停/恢复之外,上面的屏幕截图还显示了一项操作:重新启动。看起来无伤大雅,这是一个非常好的和强大的功能。

由于视图模型完全来自事件日志,因此可以随时将其丢弃并从头开始重新创建(或从某些初始状态/足够旧的快照)。数据在事件日志中是安全的,事件日志是真实的最终来源。

当视图发生任何变化时它很有用:添加了一个字段或一个表,修复了一个错误,计算了一些不同的东西。当它发生时,通常更容易(或需要)从头开始,而不是例如实施大量的 sql 迁移脚本。

甚至可以完全自动化,这样当系统启动并且检测到 db schema 与相应的 java 模型不匹配时,它可以自动重新创建 schema 并重新处理事件日志。这就像使用 hibernate create-drop 策略运行,除了它不会丢失数据。

表现

该解决方案在性能方面可能显得非常有限。

可能引起注意的一点是 单线程编写器 。实际上,单个线程通常足够快,可以轻松跟上负载。并发不仅更难实现和维护,而且还会引入争用。读取(查询)可以是 大量多线程的 并且易于扩展。

我们还通过拥有多个读取模型获益良多,例如将分析与管理和“交易”数据分开。每个模型都是单线程的(用于写入),但多个模型并行使用事件。最后,可以修改解决方案以使用分片或某种分叉连接处理。

另一个有趣的点是从头 开始重新预测

一个好的解决方案类似于 kappa 架构

  • 保持过时的投影正常运行并回答所有查询。
  • 开始一个新的投影,例如到另一个数据库。让它处理事件,不要将任何流量指向它。
  • 当新投影赶上时,重定向流量并关闭旧投影。

在非常小的实例上,尤其是对于开发而言,甚至可以在同一个实例上在线重启。这取决于以下问题的答案:重新处理所有事件需要多长时间?这个预测过时 30 分钟是否可以接受?我们可以在晚上或周末进行部署吗?无论如何都没有人使用该系统?我们必须重播所有历史吗?

这里要考虑的另一个因素是持久性。如果瓶颈太大且无法进一步优化,请考虑使用内存视图模型。

加起来

本质上,这就是实现使用事件存储的读取模型所需的全部。由于 线性事件存储 和在单个线程中处理所有内容,它变得更加简单。如此之多以至于最后它实际上只是一个循环,实现了开头所示的减少。

在以后的帖子中,我将深入探讨实施预测的实际问题。

相关文章