Java 实例 – 生产者/消费者问题(长文讲解)
💡一则或许对你有用的小广告
欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于
Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...
,点击查看项目介绍 ;- 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;
截止目前, 星球 内专栏累计输出 82w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2900+ 小伙伴加入学习 ,欢迎点击围观
在多线程编程中,生产者/消费者问题是一个经典的同步问题,它描述了两个角色——生产者和消费者——如何协作处理共享资源。对于Java开发者而言,理解这一问题不仅能提升对并发编程的认知,还能帮助解决实际开发中常见的数据传递与线程协调场景。本文将通过实例详解这一问题,并结合Java语言特性,逐步拆解其解决方案。
一、生产者/消费者问题的核心概念
1.1 问题背景与类比
想象一个快递仓库:
- 生产者是快递员,不断将包裹放入仓库;
- 消费者是分拣员,从仓库取出包裹进行分拣。
如果仓库容量有限,当仓库满时,快递员需要等待;当仓库空时,分拣员也需要等待。这种协作机制正是生产者/消费者问题的直观体现。
1.2 核心矛盾
- 资源竞争:多个线程同时访问共享资源(如仓库)时,需避免数据不一致。
- 同步问题:生产者和消费者需要协调工作节奏,避免“生产阻塞”或“消费饥饿”。
二、传统解决方案:基于 wait()
/notify()
的实现
2.1 基础代码框架
通过 synchronized
关键字和 Object
的 wait()
/notify()
方法实现同步。
class SharedResource {
private int data;
private boolean available = false;
public synchronized void produce(int value) {
while (available) {
try {
wait(); // 仓库已满,生产者等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
data = value;
available = true;
notify(); // 唤醒消费者
}
public synchronized int consume() {
while (!available) {
try {
wait(); // 仓库为空,消费者等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
available = false;
notify(); // 唤醒生产者
return data;
}
}
2.2 关键点解析
wait()
:线程释放锁,进入等待队列,直到被notify()
唤醒。notify()
:随机唤醒一个等待该锁的线程。- 循环检查条件:避免“虚假唤醒”(False Wakeup),确保线程在被唤醒后仍需检查资源状态。
2.3 局限性
- 代码冗余:需在
produce
和consume
方法中重复等待逻辑。 - 线程安全风险:若未正确使用
synchronized
,可能导致竞态条件(Race Condition)。
三、现代解决方案:Java并发工具类 BlockingQueue
Java的 java.util.concurrent
包提供了更简洁、线程安全的实现方式,例如 ArrayBlockingQueue
。
3.1 BlockingQueue
的核心特性
- 阻塞操作:当队列满时,
put()
方法会阻塞生产者;当队列空时,take()
方法会阻塞消费者。 - 内置线程安全:无需手动编写同步代码,底层通过
ReentrantLock
和Condition
实现。
3.2 实例代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
private static final int CAPACITY = 3;
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
Thread.sleep(500);
queue.put(i);
System.out.println("Produced: " + i + " (Queue Size: " + queue.size() + ")");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer data = queue.take();
System.out.println("Consumed: " + data + " (Queue Size: " + queue.size() + ")");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
3.3 代码分析
put()
方法:当队列满时,生产者阻塞,直到有空间可用。take()
方法:当队列空时,消费者阻塞,直到有数据可取。- 输出示例:
Produced: 1 (Queue Size: 1) Consumed: 1 (Queue Size: 0) Produced: 2 (Queue Size: 1) Consumed: 2 (Queue Size: 0) ...
四、线程同步机制的深入探讨
4.1 synchronized
与 ReentrantLock
的对比
特性 | synchronized | ReentrantLock |
---|---|---|
语法 | 关键字,隐式获取锁 | 显式调用 lock() 方法 |
中断响应 | 无法中断等待线程 | 可通过 interrupt() 中断 |
条件变量 | 内置 wait() /notify() | 需配合 Condition 实现 |
性能 | JVM 优化较好 | 可配置公平性策略 |
4.2 Condition
的作用
通过 Lock.newCondition()
可创建多个条件变量,实现更灵活的线程通知机制。例如:
Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
// 生产者逻辑
lock.lock();
try {
while (queue.isFull()) {
notFull.await(); // 等待队列未满
}
// 生产数据
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
4.3 死锁与竞态条件的预防
- 死锁:避免多个线程同时持有不同锁并互相等待。
- 竞态条件:通过原子操作(如
AtomicInteger
)或锁粒度控制减少不确定性。
五、实践案例:文件读写中的生产者/消费者模式
假设需要从文件中读取数据并处理:
- 生产者线程:逐行读取文件内容,放入队列。
- 消费者线程:从队列取出数据,执行复杂计算(如统计词频)。
import java.util.concurrent.LinkedBlockingQueue;
class FileProcessor {
private static final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(100);
static class FileReader implements Runnable {
@Override
public void run() {
// 模拟文件读取
try (BufferedReader br = ...) {
String line;
while ((line = br.readLine()) != null) {
dataQueue.put(line); // 生产数据
}
}
}
}
static class DataProcessor implements Runnable {
@Override
public void run() {
while (true) {
String line = dataQueue.take(); // 消费数据
// 处理逻辑...
}
}
}
}
六、结论与进阶建议
6.1 核心知识点总结
- 生产者/消费者问题是多线程协作的核心场景,需通过同步机制解决资源竞争。
BlockingQueue
是Java中推荐的解决方案,简化了代码并提升了安全性。- 线程同步工具(如
Lock
、Condition
)提供了更灵活的控制选项。
6.2 进阶学习方向
- 深入
java.util.concurrent
包:学习CountDownLatch
、CyclicBarrier
等工具。 - JVM 内存模型:理解
volatile
、happens-before
原则。 - 高并发系统设计:如消息队列(RabbitMQ)、分布式锁等。
通过本文的实践案例与代码示例,读者可以快速掌握生产者/消费者模式的实现方法,并在实际开发中避免常见的并发问题。记住,线程安全的核心在于明确资源访问边界,合理利用现成的并发工具。