JAVA并发编程——生产者与消费者模式(传统版&阻塞队列版)

位流蝉翼
• 阅读 1843

1.前言

2.生产者与消费者模型传统版代码示例

3.生产者与消费者模型阻塞队列版代码示例

1.前言
今天我们要用两种方式来实现生产者和消费者模式,我们要先介绍一个概念,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过消息队列进行通讯,所以生产者生产完数据后不用等待消费者处理,而是直接扔给队列,消费者不找生产者要数据,而是直接从队列里取数据,队列相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个队列就是用来给生产者和消费者解耦的。我们今天要用两种方式来实现这个模型,一种是阻塞队列版本,一种是传统版本。

2.生产者与消费者模型传统版代码示例
我们先来对传统消费者和生产者模型的一个JAVA代码的实现:
假设我们现在有这么一个需求,一个生产者和一个消费者,他们共同控制一个变量number,一个线程生产数据,将number变为1,一个线程消费数据,将number从1变为0,从此往复循环。

我们使用lock和Condition版本的锁,关于Lock和Condition,可以根据我写的另外一篇文章:
JAVA并发编程——Synchronized与Lock的区别以及Lock的使用
我们先设定三个变量:

    //生产者和消费者线程操作的共享对象
    private volatile int number = 0;
    //一把锁
    private Lock lock = new ReentrantLock();
    //condition用以精确唤醒线程,用来代替wait()和notify()方法
    private Condition condition = lock.newCondition();

生产方法:

   public void increment() throws Exception {
        try {
            //加锁
            lock.lock();
            //如果线程不是0,则认定为还未消费,先唤醒消费线程
            while (number != 0) {
                condition.await();
            }
            //生产数据
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //唤醒所有线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

消费方法:

     public void decrement() throws Exception {
        try {
            //加锁
            lock.lock();
            //如果线程是0,则认定为还未生产,先唤醒生产线程
            while (number == 0) {
                condition.await();
            }
            //消费
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //唤醒所有线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

接下来我们各开启1000个线程进行生产和消费:

   public static void main(String[] args) {

        ShareData shareData = new ShareData();

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程AA生产:").start();
        }

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程BB消费:").start();
        }

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程CC生产:").start();
        }

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程DD消费:").start();
        }

    }

}

运行结果为:
JAVA并发编程——生产者与消费者模式(传统版&阻塞队列版)

成功地实现了传统版本生产者和消费者模式。

3.生产者与消费者模型阻塞队列版代码示例
接下来我们使用阻塞队列版本来实现以下生产者和消费者模式,至于阻塞队列,在之前的文章讲过:
JAVA并发编程——阻塞队列理论及其API介绍

我们设定三个变量和一个构造方法

    private volatile boolean flag = true;//默认开启,true生产+f消费  false 退出生产+消费
    private AtomicInteger atomicInteger = new AtomicInteger(); //代表生产的数据
    private BlockingQueue<String> blockingQueue = null;//阻塞队列
    public MyResources(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;//阻塞队列的类型通过构造方法确定
    }

生产方法:

    public void myProd() throws InterruptedException {
        String data = null; //生产的数据内容
        boolean retValue; //生产是否成功
        while (flag) { //true代表生产着消费者模式开启,false代表结束
            data = atomicInteger.getAndIncrement() + "";
            //阻塞队列超时等待方法
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if (retValue) System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
            else System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
            Thread.sleep(1000);
        }
        System.out.println("falg = false 生产叫停!");
    }

消费方法:

    public void myCustomer() throws InterruptedException {
        String result = null;
        while (flag) { //true代表生产着消费者模式开启,false代表结束
            //阻塞队列超时等待方法
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (result == null || result.equalsIgnoreCase("")) {
                flag = false;
                System.out.println("超过两秒钟没有数据消费,退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 消费队列" + result + "成功");
            Thread.sleep(1000);
        }
    }

停止生产方法:

    public void stop() {
        this.flag = false;//当flag等于0的时候,生产和消费就停止了
    }

接下来我们创建两个线程进行生产和消费,生产和消费超过10秒后,停止两个线程的运行:

 public static void main(String[] args) throws InterruptedException {
        MyResources myResources = new MyResources(new ArrayBlockingQueue<String>(3));

        new Thread(() -> {
            try {
                myResources.myProd();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "生产线程").start();

        new Thread(() -> {
            try {
                myResources.myCustomer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "消费线程").start();


        Thread.sleep(10000);
        System.out.println("生产叫停!");
        myResources.stop();

    }

运行结果:

JAVA并发编程——生产者与消费者模式(传统版&阻塞队列版)

可以看出,利用阻塞队列,生产到时间了以后,就会自动停止了!

总结:
这章我们主要利用两种方式实现了生产者和消费者模型:
传统模式:利用lock类和condition
阻塞队列模式,利用了阻塞队列。

点赞
收藏
评论区
推荐文章
捉虫大师 捉虫大师
4年前
一种极致性能的缓冲队列
本文已收录https://github.com/lkxiaolou/lkxiaolou欢迎star。背景在多线程下的生产者消费者模型中,需求满足如下情况:对生产者生产投递数据的性能要求非常高多个生产者,单个(多个也可以,本文只介绍单个的情况)消费者当消费者跟不上生产者速度时,可容忍少部分数据丢失生产者是单条单条地生产数据举个日志采集的例子,日志在不同的
Wesley13 Wesley13
3年前
java多线程之消费者生产者模式
/@authorshijin生产者与消费者模型中,要保证以下几点:1同一时间内只能有一个生产者生产生产方法加锁sychronized2同一时间内只能有一个消费者消费消费方法加锁sychronized3生产者生产的同时消费者不能消费生产方法加锁sychronized
Stella981 Stella981
3年前
Linux系统 Centos7 环境基于Docker部署Rocketmq服务
消息队列基本概述MQ,MessageQueue,基于TCP协议构建的简单协议,区别于具体的通信协议。基于通信协议定义和抽象的更高层次的通信模型,一般都是生产者和消费者模型,又或者说服务端和客户端模型。生产者/消费者模型:一般通过定义生产者和消费者实现消息通信从而屏
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Wesley13 Wesley13
3年前
ActiveMQ消息队列详解
activemq消息队列,分为生产者和消费者。.1创建生产者publicclassProducter{//ActiveMq的默认用户名privatestaticfinalStringUSERNAMEActiveMQConnection.DEFAULT\_USER;//ActiveMq的默认登录密码pr
Wesley13 Wesley13
3年前
C++ 多线程编程总结
在开发C程序时,一般在吞吐量、并发、实时性上有较高的要求。设计C程序时,总结起来可以从如下几点提高效率:并发异步缓存下面将我平常工作中遇到一些问题例举一二,其设计思想无非以上三点。1任务队列1.1   以生产者消费者模型设计任务队列      生产者消费者模型是人们非常熟
Wesley13 Wesley13
3年前
Java并发系列9
今天要讲的BlockingQueue可谓是大名鼎鼎,在并发编程中比较常见的一个类。BlockingQueue顾名思义是表示一个阻塞队列,注意这两个词:阻塞和队列。可以拿我们熟悉的生产者消费者队列来举例,一条流水线上,A生产零件,B组装零件,A就是生产者,B是消费者。如果A生成的太快,则零件堆积,A需要休息一会儿等待B把零件消费完;如果A生产的太
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Wesley13 Wesley13
3年前
Java中生产者与消费者模式
 生产者消费者模式首先来了解什么是生产者消费者模式。该模式也称有限缓冲问题(英语:Boundedbufferproblem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同
Easter79 Easter79
3年前
SpringCloud 系列文章
SpringCloud生产者与消费者上一篇文章我们介绍了Euarka的搭建,本篇文章,我们搭建俩个服务,生产者服务与消费者服务。我们就以电商系统为例:服务生产者,订单查询服务orderserver,服务消费者orderclient说明:orderserver服务提供查询订单信息的
Wesley13 Wesley13
3年前
Java并发 阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加操作支持阻塞地插入和移除方法。支持阻塞插入的方法是指当队列满时会阻塞插入元素的线程,直到队列不满;支持阻塞移除的方法是指当队列为空时获取元素的线程无法继续获取元素直到队列不空。可以发现阻塞队列非常适合消费者和生产者场景下进行使用,生产者生产数据就是向阻塞队列中插入元素,消费者消