将 java.util.concurrent.BlockingQueue 作为 rx.Observa

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

经典的生产者-消费者模式在 Java 中相对简单,因为我们有 java.util.concurrent.BlockingQueue。为了避免繁忙的等待和容易出错的手动锁定,我们只需利用 put() 和 take()。如果队列已满或为空,它们都会阻塞。我们所需要的只是一堆线程共享对同一个队列的引用:一些在生产,一些在消费。当然,队列的容量必须有限,否则我们很快就会在生产者表现优于消费者的情况下耗尽内存。 Greg Young 在 Devoxx Poland 期间强调了这条规则:


永远,永远不要创建无界队列

使用 BlockingQueue 生产者消费者

这是一个最简单的例子。首先,我们需要一个将对象放入共享队列的生产者:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

生产者只是每秒向给定的队列发布一个 User 类的实例(无论它是什么)。显然,在现实生活中,将 User 放入队列是系统内某些操作的结果,例如用户登录。类似地,消费者从队列中获取新项目并处理它们:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }


同样在现实生活中,处理意味着存储在数据库中或对用户运行一些欺诈检测。我们使用队列将处理线程与消费线程分离,例如减少延迟。为了运行一个简单的测试,让我们启动几个生产者和消费者线程:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

我们有 2 个生产者和 3 个消费者,一切似乎都在工作。在现实生活中,您可能会有一些隐式生产者线程,例如 HTTP 请求处理线程。在消费者方面,您很可能会使用线程池。这种模式运作良好,但尤其是消费端相当低级。


引入 ObservableQueue<T>

本文的目的是介绍一种抽象,它在生产者端表现得像队列,但在消费者端表现得像 RxJava 的 Observable。换句话说,我们可以将添加到队列中的对象视为我们可以在客户端映射、过滤、组合等的流。有趣的是,这不再是引擎盖下的队列。 ObservableQueue<T> 只是将所有新对象直接转发给订阅的消费者,并且在没有人收听(“ ”可观察)的情况下不会缓冲事件。

ObservableQueue<T> 本身并不是队列,它只是一个 API 与另一个 API 之间的桥梁。它类似于 java.util.concurrent.SynchronousQueue,但如果没有人对消费感兴趣,对象将被简单地丢弃。

这是第一个实验性实现。这只是一个玩具代码,不要认为它已准备好生产。我们稍后也会大大简化它:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

关于它有几个有趣的事实:


  1. 我们必须跟踪所有订阅者,即愿意接收新项目的消费者。如果其中一个订阅者不再感兴趣,我们必须删除该订阅者,否则会发生内存泄漏(继续阅读!)
  2. 这个队列的行为就好像它总是空的。它从不保存任何项目——当你将一些东西放入这个队列时,它会自动传递给订阅者并被遗忘
  3. 从技术上讲,这个队列是无限的(!),这意味着您可以放置​​任意数量的项目。然而,由于项目被传递给所有订阅者(如果有的话)并立即被丢弃,这个队列实际上总是空的(见上文)
  4. 仍然有可能生产者生成了太多事件而消费者无法跟上 - RxJava 现在具有背压支持,本文未涵盖。

生产者可以像使用任何其他 BlockingQueue<T> 一样使用 ObservableQueue<T>,前提是我正确实施了队列契约。然而,消费者看起来更轻巧、更聪明:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

上面的代码只打印 "B" "C" "A" 被设计 丢失,因为 ObservableQueue 会丢弃项目以防万一没有人在听。显然 Producer 类现在使用 users 队列。一切正常,您可以随时调用 users.observe() 并应用数十个 Observable 运算符之一。但是有一个警告:默认情况下 RxJava 不强制执行任何线程,因此消费发生在与生产相同的线程中!我们失去了生产者消费者模式最重要的特性,即线程解耦。幸运的是,在 RxJava 中一切都是声明式的,线程调度也是如此:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

现在让我们看看真正的 RxJava 功能。想象一下,您想要计算每秒有多少用户登录,其中每个登录都作为一个事件放入队列中:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

性能也是可以接受的,这样的队列在我的笔记本电脑上每秒可以接受大约 300 万个对象,只有一个订阅者。将此类视为从使用队列的遗留系统到现代反应世界的适配器。可是等等!使用 ObservableQueue<T> 很容易,但是使用同步 subscribers 集的实现似乎太低级了。幸运的是有 Subject<T, T> Subject Observable 的“另一面”——你可以将事件推送到 Subject 但它仍然实现了 Observable ,所以你可以轻松地创建任意 Observable 。看看 ObservableQueue 与其中一个 Subject 实现的外观多么漂亮:


 import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Value class Producer implements Runnable {

private final BlockingQueue<User> queue;

@Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { final User user = new User("User " + System.currentTimeMillis()); log.info("Producing {}", user); queue.put(user); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { log.error("Interrupted", e); } } }

上面的实现更加简洁,我们根本不必担心线程同步。


相关文章