将事件源 CQRS 系统实现为 RDBMS

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

在最近涉及事件源 cqrs 系统的项目中,我们决定做一些与大多数讨论的解决方案相比看起来有些不寻常的事情。然而,它们让我们获得了一些好的属性,否则很难(如果可能的话)。

事件存储为常规表

我们决定将事件存储实现为 rdbms 中的常规表。我们使用了 postgresql,但这里几乎没有特定于 postgresql 的内容。我们知道这个数据库非常可靠、强大而且非常成熟。最重要的是,单节点酸交易提供了一些非常好的好处。

该表最后包含以下字段:

  • event_id (int) – 来自全局序列的主键
  • stream_id (uuid) – 事件流的 id,通常是 ddd 聚合
  • seq_no (int) – 特定流的历史序列号
  • transaction_time (timestamp) – 事务开始时间,在一个事务中提交的所有事件都相同
  • correlation_id (uuid)
  • payload (json)

并非所有这些对于事件存储都是强制性的,但有一个重要且不常见的区别: event_id - 全局,顺序增加的数字。我们稍后会讲到。

你可以做到吗?

如果您在常规数据库表中寻找事件存储,那么获取这样的全局事件 ID 非常便宜。数据库非常有效地生成、存储、索引等此类列。唯一 实际的 问题是您是否负担得起首先使用数据库表。

我们一直在构建的系统并不面向广泛的网络。它适用于拥有成百上千用户的公司内部使用。这是一个相对较小的规模,postgres 服务没有问题。

总而言之,如果您要打造下一个亚马逊,我不推荐您这样做。但很可能你不是,所以你可能能够负担得起使用更简单技术的奢侈。

全局顺序事件 ID 的好处

现在我们有了这个特殊的事件 ID,我们可以用它做什么呢?

我们来看看我们事件存储的读取接口:


 public interface eventstorereader {
    list<event> geteventsforstream(uuid streamid, long aftersequence, int limit);
    list<event> geteventsforallstreams(long aftereventid, int limit);
    optional<long> getlasteventid();
}

第一种方法很明显,随处可见。我们仅使用它从事件存储中恢复单个流(聚合)以处理新命令。

另外两个使用事件 ID,在特定事件之后返回一批事件,以及最后一个事件的 ID。它们是我们阅读模型(预测)的基础。

读取模型是通过轮询(带有提示)事件存储来实现的。他们 记住最后处理的事件的 id 。每隔一段时间(或者当被来自事件存储的通知唤醒时),他们从存储中读取下一批事件并在单个线程中按顺序处理它们。

这种线性的、单线程的处理可能已经很简单了,但显然可扩展性有限。如果你每分钟得到 600 个事件,这意味着平均每个事件你不能慢于 100 毫秒,无论如何。实际上,您还需要考虑开销并留出一些余量,因此它需要比这更快。

它可以通过在读取模型中进行分片或并行化写入来解决,但目前我们认为没有必要。拥有多个并行运行的独立、专门的模型肯定有助于实现这一点。

将投影的最后处理的事件 ID 与当前的全局最大值进行比较,您可以 立即知道投影落后了多少 。它是队列大小的逻辑等价物。

全局序列也可用于 减轻最终一致性(或陈旧性)的缺点

执行命令可以返回最后写入事件的 ID。然后查询可以使用这个 id,请求:“我可以等 5 秒,但如果你的数据早于这个 id,请不要给我结果”。大多数时候,这只是几毫秒的问题。对于这个价格,当用户进行更改时,她会立即看到结果。这是来自服务器的实际数据,而不是通过在用户界面中复制域逻辑来实现的模拟!

它在域方面也很有用。我们有一些应用程序和域服务可以查询一些特定于域的预测(例如,用于唯一检查)。如果你知道事件存储中的最后一个事件是 x,你可以等到投影赶上那个点,然后再对命令进行进一步的处理。这就是解决通常用 saga 解决的许多问题所需的全部内容。

最后但同样重要的是,由于所有事件都是有序的, 因此投影始终是一致的 。它可能会落后几秒钟或几天,但它永远不会不一致。根本不可能遇到诸如一个流处理到星期一,而另一个流处理到星期四这样的问题。如果在特定事件发生之前发生了某些事情,则视图模型中始终保持相同的顺序。

它使代码和系统状态更容易编写、维护和推理。

重新审视可扩展性和复杂性

无论实际客户需求和现实规模如何,都倾向于使用复杂的、高可扩展性的技术。这些工具有它们的用处,但它们并不是显而易见的赢家,也不是解决所有问题的金锤子。此外,如果考虑到开发和运营的复杂性及其局限性,它们确实非常昂贵。

有时一个更简单的工具就能很好地解决问题。您不仅可以节省开发和运营费用,还可以获得一些无法大规模使用的非常强大的工具。包括全局计数器、线性化和酸性交易。

我们的示例显示了一个系统,该系统足够复杂以保证使用 cqrs 进行事件溯源,但规模足够小以允许使用线性事件存储,甚至使用线性投影,所有这些都在普通的 postgres 数据库中。

选择无聊的技术 有很多充分的理由。如果你创新(你应该),请注意你为什么要创新,不要同时在所有领域进行创新。

这篇文章也出现在 绿洲数字博客 上。

更新:讨论

在 reddit 上对该帖子进行了非常有趣的讨论

很多注意力都集中在 sql 数据库的使用上。它确实会减慢速度,它具有许多您不会在仅附加日志中使用的功能,但它也具有您需要的所有功能。限制本身非常高,至少每秒写入数千(或数万)次。我们非常了解这些数据库的行为方式,每个人都知道如何使用它们,您可以在拐角处找到管理员。

仅就线性、单线程写入而言,它们可以进一步扩展。单线程写入与硬件限制保持一致,同时能够每秒处理数百万个事务(尽管不一定使用 sql 数据库,但)。一个很好的相关示例是 lmax 架构 。无论如何,您很可能不需要多线程或分布式系统。

相关文章