经典的生产者-消费者模式在 Java 中相对简单,因为我们有 java.util.concurrent.BlockingQueue。为了避免繁忙的等待和容易出错的手动锁定,我们只需利用 put() 和 take()。如果队列已满或为空,它们都会阻塞。我们所需要的只是一堆线程共享对同一个队列的引用:一些在生产,一些在消费。当然,队列的容量必须有限,否则我们很快就会在生产者表现优于消费者的情况下耗尽内存。 Greg Young 在 Devoxx Poland 期间强调了这条规则:
永远,永远不要创建无界队列
使用
BlockingQueue
生产者消费者
这是一个最简单的例子。首先,我们需要一个将对象放入共享队列的生产者:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
生产者只是每秒向给定的队列发布一个
User
类的实例(无论它是什么)。显然,在现实生活中,将
User
放入队列是系统内某些操作的结果,例如用户登录。类似地,消费者从队列中获取新项目并处理它们:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
同样在现实生活中,处理意味着存储在数据库中或对用户运行一些欺诈检测。我们使用队列将处理线程与消费线程分离,例如减少延迟。为了运行一个简单的测试,让我们启动几个生产者和消费者线程:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
我们有 2 个生产者和 3 个消费者,一切似乎都在工作。在现实生活中,您可能会有一些隐式生产者线程,例如 HTTP 请求处理线程。在消费者方面,您很可能会使用线程池。这种模式运作良好,但尤其是消费端相当低级。
引入
ObservableQueue<T>
本文的目的是介绍一种抽象,它在生产者端表现得像队列,但在消费者端表现得像 RxJava 的 Observable。换句话说,我们可以将添加到队列中的对象视为我们可以在客户端映射、过滤、组合等的流。有趣的是,这不再是引擎盖下的队列。 ObservableQueue<T> 只是将所有新对象直接转发给订阅的消费者,并且在没有人收听(“ 热 ”可观察)的情况下不会缓冲事件。
ObservableQueue<T> 本身并不是队列,它只是一个 API 与另一个 API 之间的桥梁。它类似于 java.util.concurrent.SynchronousQueue,但如果没有人对消费感兴趣,对象将被简单地丢弃。
这是第一个实验性实现。这只是一个玩具代码,不要认为它已准备好生产。我们稍后也会大大简化它:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
关于它有几个有趣的事实:
- 我们必须跟踪所有订阅者,即愿意接收新项目的消费者。如果其中一个订阅者不再感兴趣,我们必须删除该订阅者,否则会发生内存泄漏(继续阅读!)
- 这个队列的行为就好像它总是空的。它从不保存任何项目——当你将一些东西放入这个队列时,它会自动传递给订阅者并被遗忘
- 从技术上讲,这个队列是无限的(!),这意味着您可以放置任意数量的项目。然而,由于项目被传递给所有订阅者(如果有的话)并立即被丢弃,这个队列实际上总是空的(见上文)
- 仍然有可能生产者生成了太多事件而消费者无法跟上 - RxJava 现在具有背压支持,本文未涵盖。
生产者可以像使用任何其他 BlockingQueue<T> 一样使用 ObservableQueue<T>,前提是我正确实施了队列契约。然而,消费者看起来更轻巧、更聪明:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
上面的代码只打印
"B"
和
"C"
。
"A"
被设计
丢失,因为
ObservableQueue
会丢弃项目以防万一没有人在听。显然
Producer
类现在使用
users
队列。一切正常,您可以随时调用
users.observe()
并应用数十个
Observable
运算符之一。但是有一个警告:默认情况下 RxJava 不强制执行任何线程,因此消费发生在与生产相同的线程中!我们失去了生产者消费者模式最重要的特性,即线程解耦。幸运的是,在 RxJava 中一切都是声明式的,线程调度也是如此:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
现在让我们看看真正的 RxJava 功能。想象一下,您想要计算每秒有多少用户登录,其中每个登录都作为一个事件放入队列中:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
性能也是可以接受的,这样的队列在我的笔记本电脑上每秒可以接受大约 300 万个对象,只有一个订阅者。将此类视为从使用队列的遗留系统到现代反应世界的适配器。可是等等!使用
ObservableQueue<T>
很容易,但是使用同步
subscribers
集的实现似乎太低级了。幸运的是有
Subject<T, T>
。
Subject
是
Observable
的“另一面”——你可以将事件推送到
Subject
但它仍然实现了
Observable
,所以你可以轻松地创建任意
Observable
。看看
ObservableQueue
与其中一个
Subject
实现的外观多么漂亮:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
上面的实现更加简洁,我们根本不必担心线程同步。