反应式 PostgreSQL:使用 Scala 收听通知

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 54w+ 字,讲解图 2476+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 1900+ 小伙伴加入学习 ,欢迎点击围观

在过去,我写了几篇文章( 使用 Akka HTTP、Akka Streams 和反应式 mongo 在 Scala 中构建 REST 服务 以及 使用 Akka、Scala 和 websockets 构建 ReactiveMongo )使用 MongoDB 将更新直接从数据库推送到 Scala 应用程序.如果您只想将您的应用程序订阅到一个流式事件列表,那么这是一个非常好的功能,如果您在应用程序关闭时错过一个事件并不重要。虽然 MongoDB 是一个很棒的数据库,但它并不适合所有用途。有时您需要一个关系数据库,具有定义良好的模式,或者一个可以结合 SQL 和 noSQL 世界的数据库。就个人而言,我一直非常喜欢 Postgresql。它是最好的关系数据库之一,具有强大的 GIS 支持(我非常喜欢),并且获得越来越多的 JSON/Schema-less 支持(我需要在某个时候深入研究)。我不知道 Postgresql 中的一个特性是它提供了一种订阅机制。我在阅读“ 在 Go 中监听来自 PostgreSQL 的通用 JSON 通知 ”一文时了解到这一点,该文章展示了如何在 Go 中使用它。在本文中,我们将尝试了解您需要做什么才能在 Scala 中获得类似的效果(Java 的方法几乎相同)。

这在 PostgreSQL 中如何工作?

在 PostgreSQL 中监听通知其实非常容易。您所要做的就是:


 LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

想要监听事件的连接使用它想要监听的通道的名称调用 LISTEN。发送连接只运行带有通道名称和可能的负载的 NOTIFY。

准备数据库

我在介绍中提到的关于 Go 的文章中很酷的一点是,它提供了一个存储过程,只要插入、更新或删除表行,它就会自动发送通知。以下摘自 在 Go 中监听来自 PostgreSQL 的通用 JSON 通知 创建一个存储过程,该过程在调用时发送通知。

这个存储过程真正酷的地方在于数据被转换为 JSON,所以我们可以在我们的应用程序中轻松地处理它。对于此示例,我将使用 Go 文章中使用的相同表和数据,因此首先创建一个表:


 LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

并在表发生任何事情时创建触发器。


 LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

此时,每当在 products 表上插入、更新或删除一行时,都会创建一个通知事件。我们可以使用 pgsql 命令行简单地测试它:


 LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

如您所见,INSERT 导致包含数据的异步事件。所以,到目前为止,我们几乎都遵循了 Go 文章中概述的步骤。现在让我们看看如何从 Scala 访问通知。

从 Scala 访问通知

首先让我们设置项目的依赖项。一如既往,我们使用 SBT。这个项目的 build.sbt 看起来像这样:

依赖关系的快速总结:

  • scalikeJDBC :该项目提供了一个易于使用的 JDBC 包装器,因此我们不必使用 Java 连接处理方式和其他东西。
  • akka :我们使用 Akka 框架来管理与数据库的连接。由于 JDBC 驱动程序不是异步的 ar 可以推送数据,我们需要设置一个时间间隔。
  • json4s :这只是一个简单的 Scala JSON 库。我们使用它来快速将传入的数据转换为简单的案例类。

我们将首先向您展示此示例的完整源代码,然后对各个部分进行解释:

如果您熟悉 Akka 和 scalikeJDBC,代码看起来会很眼熟。我们从一些常规设置开始:

在这里,我们定义了我们的案例类,我们将转换传入的 JSON,设置连接池,定义 Akka 系统并启动我们的 Poller actor。这里没什么特别的,唯一特别的是第 23 行。要从 Scala 添加监听器,我们需要访问底层的 JDBC Connection。由于 scalikeJDBC 使用连接池,我们需要显式调用 setAccessToUnderlyingConnectionAllowed 以确保在调用 getInnerMostDelegate 时允许我们访问实际连接,而不仅仅是从连接池中包装一个。有趣的是,如果我们不设置它,我们不会收到错误消息或任何东西,我们只会从这个方法调用中得到一个 Null ....

解决了这个问题,我们的 Actor 开始了,让我们看看它做了什么:

我们在 actor 中做的第一件事是设置 scalikeJDBC 所需的一些属性,并设置一个每 500 毫秒触发一条消息的计时器。还要注意 preStart 和 postStop 函数。在 preStart 中,我们执行一小段 SQL,它告诉 postgres 这个连接将监听名为“events”的通知。我们还将 DB.autoClose 设置为 falls,以避免会话池机制关闭会话和连接。我们想让它们保持活力,这样我们就可以接收事件。当 actor 终止时,我们确保清理计时器和连接。

在接收函数中,我们首先获取真正的 PGConnection,然后从连接中获取通知:

如果没有通知,则返回 Null,所以我们将其包装在一个 Option 中,并且在 Null 的情况下只返回一个空数组。如果有任何通知,我们只需在 foreach 循环中处理它们并打印出结果:

在这里你还可以看到我们只是从通知中获取了“数据”元素,并将其转换为我们的 Product 类以供进一步处理。您现在要做的就是启动应用程序并从同一个 pgsql 终端添加一些事件。如果一切顺利,您将在控制台中看到类似于此的输出:


 LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

现在你已经有了这个基本的结构,使用它很简单,例如,作为反应流的来源,或者只是使用 websockets 来进一步传播这些事件。