Java 实例 – 生产者/消费者问题(长文讲解)

更新时间:

💡一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 82w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2900+ 小伙伴加入学习 ,欢迎点击围观

在多线程编程中,生产者/消费者问题是一个经典的同步问题,它描述了两个角色——生产者和消费者——如何协作处理共享资源。对于Java开发者而言,理解这一问题不仅能提升对并发编程的认知,还能帮助解决实际开发中常见的数据传递与线程协调场景。本文将通过实例详解这一问题,并结合Java语言特性,逐步拆解其解决方案。


一、生产者/消费者问题的核心概念

1.1 问题背景与类比

想象一个快递仓库:

  • 生产者是快递员,不断将包裹放入仓库;
  • 消费者是分拣员,从仓库取出包裹进行分拣。
    如果仓库容量有限,当仓库满时,快递员需要等待;当仓库空时,分拣员也需要等待。这种协作机制正是生产者/消费者问题的直观体现。

1.2 核心矛盾

  • 资源竞争:多个线程同时访问共享资源(如仓库)时,需避免数据不一致。
  • 同步问题:生产者和消费者需要协调工作节奏,避免“生产阻塞”或“消费饥饿”。

二、传统解决方案:基于 wait()/notify() 的实现

2.1 基础代码框架

通过 synchronized 关键字和 Objectwait()/notify() 方法实现同步。

class SharedResource {  
    private int data;  
    private boolean available = false;  

    public synchronized void produce(int value) {  
        while (available) {  
            try {  
                wait(); // 仓库已满,生产者等待  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
        }  
        data = value;  
        available = true;  
        notify(); // 唤醒消费者  
    }  

    public synchronized int consume() {  
        while (!available) {  
            try {  
                wait(); // 仓库为空,消费者等待  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
        }  
        available = false;  
        notify(); // 唤醒生产者  
        return data;  
    }  
}  

2.2 关键点解析

  • wait():线程释放锁,进入等待队列,直到被 notify() 唤醒。
  • notify():随机唤醒一个等待该锁的线程。
  • 循环检查条件:避免“虚假唤醒”(False Wakeup),确保线程在被唤醒后仍需检查资源状态。

2.3 局限性

  • 代码冗余:需在 produceconsume 方法中重复等待逻辑。
  • 线程安全风险:若未正确使用 synchronized,可能导致竞态条件(Race Condition)。

三、现代解决方案:Java并发工具类 BlockingQueue

Java的 java.util.concurrent 包提供了更简洁、线程安全的实现方式,例如 ArrayBlockingQueue

3.1 BlockingQueue 的核心特性

  • 阻塞操作:当队列满时,put() 方法会阻塞生产者;当队列空时,take() 方法会阻塞消费者。
  • 内置线程安全:无需手动编写同步代码,底层通过 ReentrantLockCondition 实现。

3.2 实例代码

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  

public class ProducerConsumerExample {  
    private static final int CAPACITY = 3;  
    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);  

    static class Producer implements Runnable {  
        @Override  
        public void run() {  
            try {  
                for (int i = 1; i <= 10; i++) {  
                    Thread.sleep(500);  
                    queue.put(i);  
                    System.out.println("Produced: " + i + " (Queue Size: " + queue.size() + ")");  
                }  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
        }  
    }  

    static class Consumer implements Runnable {  
        @Override  
        public void run() {  
            try {  
                while (true) {  
                    Integer data = queue.take();  
                    System.out.println("Consumed: " + data + " (Queue Size: " + queue.size() + ")");  
                }  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
        }  
    }  

    public static void main(String[] args) {  
        new Thread(new Producer()).start();  
        new Thread(new Consumer()).start();  
    }  
}  

3.3 代码分析

  • put() 方法:当队列满时,生产者阻塞,直到有空间可用。
  • take() 方法:当队列空时,消费者阻塞,直到有数据可取。
  • 输出示例
    Produced: 1 (Queue Size: 1)  
    Consumed: 1 (Queue Size: 0)  
    Produced: 2 (Queue Size: 1)  
    Consumed: 2 (Queue Size: 0)  
    ...  
    

四、线程同步机制的深入探讨

4.1 synchronizedReentrantLock 的对比

特性synchronizedReentrantLock
语法关键字,隐式获取锁显式调用 lock() 方法
中断响应无法中断等待线程可通过 interrupt() 中断
条件变量内置 wait()/notify()需配合 Condition 实现
性能JVM 优化较好可配置公平性策略

4.2 Condition 的作用

通过 Lock.newCondition() 可创建多个条件变量,实现更灵活的线程通知机制。例如:

Lock lock = new ReentrantLock();  
Condition notFull = lock.newCondition();  
Condition notEmpty = lock.newCondition();  

// 生产者逻辑  
lock.lock();  
try {  
    while (queue.isFull()) {  
        notFull.await(); // 等待队列未满  
    }  
    // 生产数据  
    notEmpty.signal(); // 通知消费者  
} finally {  
    lock.unlock();  
}  

4.3 死锁与竞态条件的预防

  • 死锁:避免多个线程同时持有不同锁并互相等待。
  • 竞态条件:通过原子操作(如 AtomicInteger)或锁粒度控制减少不确定性。

五、实践案例:文件读写中的生产者/消费者模式

假设需要从文件中读取数据并处理:

  1. 生产者线程:逐行读取文件内容,放入队列。
  2. 消费者线程:从队列取出数据,执行复杂计算(如统计词频)。
import java.util.concurrent.LinkedBlockingQueue;  

class FileProcessor {  
    private static final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(100);  

    static class FileReader implements Runnable {  
        @Override  
        public void run() {  
            // 模拟文件读取  
            try (BufferedReader br = ...) {  
                String line;  
                while ((line = br.readLine()) != null) {  
                    dataQueue.put(line); // 生产数据  
                }  
            }  
        }  
    }  

    static class DataProcessor implements Runnable {  
        @Override  
        public void run() {  
            while (true) {  
                String line = dataQueue.take(); // 消费数据  
                // 处理逻辑...  
            }  
        }  
    }  
}  

六、结论与进阶建议

6.1 核心知识点总结

  • 生产者/消费者问题是多线程协作的核心场景,需通过同步机制解决资源竞争。
  • BlockingQueue 是Java中推荐的解决方案,简化了代码并提升了安全性。
  • 线程同步工具(如 LockCondition)提供了更灵活的控制选项。

6.2 进阶学习方向

  1. 深入 java.util.concurrent:学习 CountDownLatchCyclicBarrier 等工具。
  2. JVM 内存模型:理解 volatilehappens-before 原则。
  3. 高并发系统设计:如消息队列(RabbitMQ)、分布式锁等。

通过本文的实践案例与代码示例,读者可以快速掌握生产者/消费者模式的实现方法,并在实际开发中避免常见的并发问题。记住,线程安全的核心在于明确资源访问边界,合理利用现成的并发工具

最新发布