GVKun编程网logo

java并发:初探消费者和生产者模式(深入理解java并发原理)

22

如果您想了解java并发:初探消费者和生产者模式和深入理解java并发原理的知识,那么本篇文章将是您的不二之选。我们将深入剖析java并发:初探消费者和生产者模式的各个方面,并为您解答深入理解java

如果您想了解java并发:初探消费者和生产者模式深入理解java并发原理的知识,那么本篇文章将是您的不二之选。我们将深入剖析java并发:初探消费者和生产者模式的各个方面,并为您解答深入理解java并发原理的疑在这篇文章中,我们将为您介绍java并发:初探消费者和生产者模式的相关知识,同时也会详细的解释深入理解java并发原理的运用方法,并给出实际的案例分析,希望能帮助到您!

本文目录一览:

java并发:初探消费者和生产者模式(深入理解java并发原理)

java并发:初探消费者和生产者模式(深入理解java并发原理)

消费者和生产者模式

用继承Thread方式,用wait和notifyAll方法实现。

消费者和生产者模式的特点

1. 什么时候生产:仓库没有满的时候,生产者这可以生产,消费者也可以消费,仓库满的时候停止生产
2. 什么时候消费: 仓库有货的时候消费,没有货不能消费
3. 通知生产:消费者发现没有货,消费者通知生产者生产
4. 通知消费:生产者生产出产品后,通知消费者消费

代码实现

package com.java.javabase.thread.base;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.requiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/*
    1. 什么时候生产:仓库没有满的时候,生产者这可以生产,消费者也可以消费,仓库满的时候停止生产
    2. 什么时候消费: 仓库有货的时候消费,没有货不能消费
    3. 通知生产:消费者发现没有货,消费者通知生产者生产
    4. 通知消费:生产者生产出产品后,通知消费者消费
 */
@Data
@Slf4j
@AllArgsConstructor
@requiredArgsConstructor
class Dept {
    private int capacity;
    private int size;

    /*
    @param val 是生产的产品数量
     */
    public synchronized void produce(int val) {
        //
        int remainder = val;
        while (remainder > 0) {
            if (size >= capacity) {
                try {
                    //仓库满了,等待不在执行生产
                    wait();
                } catch (InterruptedException e) {
                    e.printstacktrace();
                }
            }
            int incre = (size + remainder) > capacity ? capacity - size : remainder;
            size += incre;
            remainder -= incre;
            log.info("produce {} .depet increment {},remainder {},current size is {}",val,incre,remainder,size);
            //结束生产,通知消费或者生产
            notifyAll();
        }


    }

        public synchronized void custom(int val) {
            int remainder = val;
            while(remainder>0){
                if(size<=0){
                    try {
                        //仓库无货,等待不在消费
                        wait();
                    } catch (InterruptedException e) {
                        e.printstacktrace();
                    }
                }
                int descre =(size<remainder)?size:remainder;
                size -=descre;
                remainder -=descre;
                log.info("custom {}. depet descrement {},descre,size);
                //结束消费,通知消费或者生产
                notifyAll();
            }

    }
}

@AllArgsConstructor
class Producer {
    private Dept dept;

    public void produce(final int val) {
        new Thread() {
            @Override
            public void run() {
                dept.produce(val);
            }
        }.start();
    }


}

@AllArgsConstructor
class Custom {
    private Dept dept;

    public void custom(final int val) {
        new Thread() {
            @Override
            public void run() {
                dept.custom(val);
            }
        }.start();
    }

}

public class DemoProCustomTest {
    public static void main(String[] args) {
        Dept dept =new Dept(100,0);
        Producer producer =new Producer(dept);
        Custom custom =new Custom(dept);
        producer.produce(100);
        producer.produce(190);
        custom.custom(150);
        custom.custom(80);


    }
}

日志结果输出

因为多线程,所以下面的结果只是其中之一的可能性

2019-07-31 19:10:13,297   [Thread-0] INFO  Dept  - produce 100 .depet increment 100,remainder 0,current size is 100
2019-07-31 19:10:13,299   [Thread-3] INFO  Dept  - custom 80. depet descrement 80,current size is 20
2019-07-31 19:10:13,299   [Thread-2] INFO  Dept  - custom 150. depet descrement 20,remainder 130,current size is 0
2019-07-31 19:10:13,300   [Thread-1] INFO  Dept  - produce 190 .depet increment 100,remainder 90,300   [Thread-2] INFO  Dept  - custom 150. depet descrement 100,remainder 30,300   [Thread-1] INFO  Dept  - produce 190 .depet increment 90,current size is 90
2019-07-31 19:10:13,300   [Thread-2] INFO  Dept  - custom 150. depet descrement 30,current size is 60

java 多线程 - 生产者消费者模式

java 多线程 - 生产者消费者模式

 

 

进程定义:

进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位

线程定义:

线程是进程的一个实体,是 CPU 调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源 (如程序计数器,一组寄存器和栈), 但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

 

线程运行图:

 

 

线程有三种创建方式

 1 实现 Runnable 接口

 2 继承 Thread 类

 3 使用 Callable 和 Future 接口创建线程。具体是创建 Callable 接口的实现类,并实现 clall () 方法。并使用 FutureTask 类来包装 Callable 实现类的对象,且以此 FutureTask 对象作为 Thread 对象的 target 来创建线程。

 

线程的一些方法:  

  守护线程 执行一些非业务方法,比如 gc。当全部都是守护线程的时候,jvm 退出

 

  线程优先级  设置线程优先级:setPriority (int priorityLevel)。参数 priorityLevel 范围在 1-10 之间,值越大优先级越高,

          能被执行到的概率越大,但非优先执行。

   sleep() 让当前的正在执行的线程暂停指定的时间,并进入阻塞状态 时间到了,恢复到就绪状态

   join() 等待线程结束,我执行完了 你才能执行

   yield() 谦让,我暂时不执行,我回到就绪状态

        另外 yield () 方法还与线程优先级有关,当某个线程调用 yiled () 方法从运行状态转换到就绪状态后,CPU 从就绪状态线程队列中只会选择与该线程优先级相同或优先级更高的线程去执行。但是分配还是随机的 可能还是执行到原来的线程

 

线程的生命周期

    新建状态(New):当线程对象对创建后,即进入了新建状态

    就绪状态(Runnable):当调用线程对象的 start () 方法(t.start ();),线程即进入就绪状态。处于就绪状态的线程,只是说明此线程已经做好了准备,能否在执行需要看待 cpu 的调度。

    运行状态(Running): cpu 调度到该线程,该线程就会运行。

    阻塞状态(Blocked):线程因为某些原因,暂时被挂起,暂时不继续执行的状态。常见的有以下三种

      1. 等待阻塞:运行状态中的线程执行 wait () 方法,使本线程进入到等待阻塞状态; 

      2. 同步阻塞 -- 线程在获取 synchronized 同步锁失败 (因为锁被其它线程所占用),它会进入同步阻塞状态;

      3. 其他阻塞 -- 通过调用线程的 sleep () 或 join () 或发出了 I/O 请求时,线程会进入到阻塞状态。当 sleep () 状态超时、join () 等待线程终止或者超时、或者 I/O 处理完毕时,线程重新转入就绪状态。

     死亡状态(Dead):线程执行完了或者因异常退出了 run () 方法,该线程结束生命周期。

 

注:

    只有使用 yield () 方法以后 线程从运行到就绪状态

    无论使用 wait (),sleep (), join () 都是先进入阻塞 当满足了相关条件以后,在进入就绪状态

 

线程交互

    1 使用 wait ()、notify () 和 notifyAll () 时需要首先对调用对象加锁【必须使用在同步代码块】   
    2 调用 wait () 方法后,线程状态会从 RUNNING 变为 WAITING,并将当线程加入到 lock 对象的等待队列中【会释放锁】 ,后面的代码就不执行了 。当被唤醒并被执行时,是接着上次执行到的 wait () 方法代码后面继续往下执行的。
    3 调用 notify () 或者 notifyAll () 方法后(不会释放锁),等待在 lock 对象的等待队列的线程不会马上从 wait () 方法返回,必须要等到调用 notify () 或者 notifyAll () 方法的线程将 lock 锁释放,等待线程才有机会从等待队列返回。这里只是有机会,因为锁释放后,等待线程会出现竞争,只有竞争到该锁的线程才会从 wait () 方法返回,其他的线程只能继续等待

生产者线程:

如果水果的数量在 [0,9] 则开始生产,否则转到消费线程。

public class Producer implements Runnable {
    private Fruit fruit;
    
    
    public Producer() {
        super();
    }
    

    public Producer(Fruit fruit) {
        super();
        this.fruit = fruit;
    }


    public Fruit getFruit() {
        return fruit;
    }

    public void setFruit(Fruit fruit) {
        this.fruit = fruit;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub

        while(true){
            synchronized (fruit) {
                //积压了10个水果未卖出去  停掉生产的线程等待卖出去    
                if(fruit.getNumber()>=10){
                    try {
                        fruit.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                if(fruit.getNumber()>=0&&fruit.getNumber()<10){
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    System.out.println(fruit.getName()+"被生产出来了");
                    
                    fruit.setNumber(fruit.getNumber()+1);
                    System.out.println("当前水果数量为"+fruit.getNumber());
                    fruit.notify();
                }
            }    
        }

    }

}

消费者线程:

 如果水果的数量在大于 0,即可消费,否则转到生产线程

public class Customer implements Runnable {
	private Fruit fruit;
	
	
	public Customer() {
		super();
	}


	public Customer(Fruit fruit) {
		super();
		this.fruit = fruit;
	}


	public Fruit getFruit() {
		return fruit;
	}


	public void setFruit(Fruit fruit) {
		this.fruit = fruit;
	}
	
	@Override
	public void run() {
		while(true){
			//处理并发问题
			synchronized (fruit) {
				//判断水果的数量是否>0;
				if(fruit.getNumber()>0){
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					System.out.println(fruit.getName()+"被买走了");
					//减少水果数量
					fruit.setNumber(fruit.getNumber()-1);
					System.out.println("当前水果数量为"+fruit.getNumber());
					fruit.notify();
				}
				else{
					try {
						
						//水果已经卖完拿了 等待生产
						fruit.wait();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				
			}
		
		}

	}

}

Fruit 类

public class Fruit {
    private int number;
    private String name;

    public Fruit() {
        super();
    }
    public Fruit(int number, String name) {
        super();
        this.number = number;
        this.name = name;
    }
    public int getNumber() {
        return number;
    }
    public void setNumber(int number) {
        this.number = number;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }

    
}

 

当调用 obj.notify ()/notifyAl0 () l 后,调用线程依旧持有 obj 锁,当调用 obj.wait () 后,调用线程会释放 obj 锁,如果将 synchronized (fruit) 放在 while (true) 前面,会因为同步锁未及时释放,一直运行到 obj.wait () 才释放同步锁。  如果用的 synchronized 同步代码块  在当运行到 while 结束循环的 break 时会自动释放锁  ,而用 lock 时 不会自动释放锁,  synchronized 在实现 runable 接口时  修饰在方法上  锁对象用 this   而继承类 Thread 时 修饰在静态方法上锁对象表示当前类的类对象

 

public class SalesThread implements Runnable{
    //这个类本身就会被共享  故变量不用再加static
    
    private int tickets=100;
    
    public SalesThread(){
        
    }





    @Override
    public void run() {
        
    
        while(true){
            
                if(tickets>0){
                    ff();
                }
                
                else{
                    System.out.println("票已售完!");
                    break;
                }
                
            
        }    
    }
    
public void ff1(){

        synchronized(this){
        
            try {
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName()+"正在卖第"+(tickets--)+"张票");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }    
        }
    }

 Sleep 方法会先进行阻塞,然后休眠到期就会进去就绪状态 

java 多线程 —— 生产者消费者模式

java 多线程 —— 生产者消费者模式

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * java多线程模拟生产者消费者问题
 * 
 * ProducerConsumer是主类,Producer生产者,Consumer消费者,Product产品,Storage仓库
 * 
 */
public class ProducerConsumer {
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        Storage s = pc.new Storage();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer p = pc.new Producer("张三", s);
        Producer p2 = pc.new Producer("李四", s);
        Consumer c = pc.new Consumer("王五", s);
        Consumer c2 = pc.new Consumer("老刘", s);
        Consumer c3 = pc.new Consumer("老林", s);
        service.submit(p);
        //service.submit(p2);
        service.submit(c);
        service.submit(c2);
        service.submit(c3);
        
    }

    /**
     * 消费者
     * 
     */
    class Consumer implements Runnable {
        private String name;
        private Storage s = null;

        public Consumer(String name, Storage s) {
            this.name = name;
            this.s = s;
        }

        public void run() {
            try {
                while (true) {
                    System.out.println(name + "准备消费产品.");
                    Product product = s.pop();
                    System.out.println(name + "已消费(" + product.toString() + ").");
                    System.out.println("===============");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }

    /**
     * 生产者
     *
     */
    class Producer implements Runnable {
        private String name;
        private Storage s = null;

        public Producer(String name, Storage s) {
            this.name = name;
            this.s = s;
        }

        public void run() {
            try {
                while (true) {
                    Product product = new Product((int) (Math.random() * 10000)); // 产生0~9999随机整数
                    System.out.println(name + "准备生产(" + product.toString() + ").");
                    s.push(product);
                    System.out.println(name + "已生产(" + product.toString() + ").");
                    System.out.println("===============");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

        }
    }

    /**
     * 仓库,用来存放产品
     * 
     */
    public class Storage {
        BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);

        /**
         * 生产
         * 
         * @param p 产品
         * @throws InterruptedException
         */
        public void push(Product p) throws InterruptedException {
            queues.put(p);
        }

        /**
         * 消费
         * 
         * @return 产品
         * @throws InterruptedException
         */
        public Product pop() throws InterruptedException {
            return queues.take();
        }
    }

    /**
     * 产品
     * 
     */
    public class Product {
        private int id;

        public Product(int id) {
            this.id = id;
        }

        public String toString() {// 重写toString方法
            return "产品:" + this.id;
        }
    }

}

 

java 多线程:生产者和消费者模式 (wait-notify) : 单生产和单消费

java 多线程:生产者和消费者模式 (wait-notify) : 单生产和单消费

单生产者

package com.example.t.pc;

import java.util.List;

//生产者
public class P {
    private List list;

    public P(){
    }

    public P(List list){
        this.list = list;
    }

    public void add(){
        while(true){
            synchronized (list){
                try {
                    System.out.println("3s----------------");
                    Thread.sleep(3000);
                    if(list != null && list.size() > 0){
                        System.out.println("生产者:停止生产");
                        list.wait(); //锁释放 原地等待
                        System.out.println("P ok解锁");
                    }

                    list.add("123");
                    list.notify();
                    System.out.println("生产者:开始生产");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

单消费者

package com.example.t.pc;

import java.util.List;

//消费者
public class C {
    private List list;

    public C(){
    }

    public C(List list){
        this.list = list;
    }

    public void sub(){
        while (true){
            synchronized (list){
                try {
                    System.out.println("1s----------------");
                    Thread.sleep(1000);
                    if(list != null && list.size() > 0){
                        list.remove(0);
                        list.notify();
                        System.out.println("消费者: 开始消费");
                    }else{
                        System.out.println("消费者: 停止消费");
                        list.wait();//锁释放 原地等待
                        System.out.println("C ok解锁");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

执行

package com.example.t.pc;

import java.util.ArrayList;
import java.util.List;

public class Main {

    public static void main(String[] args) {
         List list = new ArrayList();

         new Thread(() -> {
            P p = new P(list);
            p.add();
        }).start();

        new Thread(()->{
            C c = new C(list);
            c.sub();
        }).start();
    }




}

 

java 实现生产者-消费者模式

java 实现生产者-消费者模式

生产和消费者模式有很多种,现在介绍几种常见的方式

  •   wait/notify实现生产和消费者模式

 

1、使用wait/notify实现生产和消费者模式:

public class Depot {

    // 实际容量
    private volatile int capacity = 0 ;
    // 最大容量
    private static final int MAX_CAPACITY = 100 ;
    //
    private final Object lock = new Object() ;

    public Depot(){
        
    }

    /**
     *  生产
     * @param: num
     * @return:
     */
    public void producer(int num) {
        if(num<0){
            System.out.println("生产数量不可为负数");
            throw new IllegalArgumentException("生产数量不可为负数") ;
        }

        synchronized (lock){
            // 生产
            int left = 0;
            try {
                left = doProducer(num);

                // 要求生产数量未达标,让出执行权
                while (left > 0){
                    Thread.yield();
                    left = doProducer(left);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            lock.notify();
        }

    }

    /**
     *  实际生产逻辑
     * @param: num
     * @return: 返回未生产数量
     */
    private int doProducer(int num) throws InterruptedException {
        // 实际增加数量
        int grow = num ;
        // 未生产数量
        int left = 0 ;

        // 等待仓库被消费
        while (capacity>=MAX_CAPACITY){
            lock.wait();
        }

        // 计算实际新增数量
        if( (num+capacity) > MAX_CAPACITY ){
            grow = MAX_CAPACITY - capacity ;
            left = num - grow ;
        }
        capacity += grow ;

        // 仓库已经生产,通知消费者消费
        lock.notify();
        System.out.println(Thread.currentThread().getName() + " Plan Producer = " + num + " Actually = " + grow + " left =  " + left + "  capacity = " + capacity);
        return left ;
    }

    /**
     *  消费
     * @param: num
     * @return:
     */
    public void consumer(int num) {
        if(num<0){
            System.out.println("消费数量不可为负数");
            throw new IllegalArgumentException("消费数量不可为负数") ;
        }

        synchronized (lock){
            // 消费仓库
            int left = 0;
            try {
                left = doConsumer(num);

                // 要求消费数量未达标,让出执行权
                while (left > 0){
                    Thread.yield();
                    left = doConsumer(left);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 仓库已经被消费,通知生产者生产
            lock.notify();
        }

    }

    /**
     *  实际消费逻辑
     * @param:   num
     * @return:  剩余未消费数量
     */
    private int doConsumer(int num) throws InterruptedException {
        // 实际消费数量
        int decrease = num ;
        // 剩余未消费
        int left = 0 ;

        // 等待仓库生产
        while (capacity <= 0){
            lock.wait();
        }

        // 计算实际消费数量
        if(decrease > capacity){
            decrease = capacity ;
            left = decrease - capacity ;
        }
        capacity -= decrease ;

        System.out.println(Thread.currentThread().getName() + " Plan Consumer = "+ num + " Actually = " + decrease + " left =  " + left + "  capacity = " + capacity );
        return left ;
    }

}

 

 

测试案例:

public class ProducerAndConsumer {
    public static void main(String[] args) throws InterruptedException {
        Depot depot = new Depot() ;

        for(int x=0 ; x<4; x++){
            new Thread(new Runnable() {
                @Override public void run() {
                    depot.producer(40);
                }
            }).start();
        }

        Thread.sleep(2000L);
        for(int x=0 ; x<3; x++){
            new Thread(new Runnable() {
                @Override public void run() {
                    depot.consumer(40);
                }
            }).start();
        }

        Thread.sleep(2000L);

    }
}

 

 

运行结果:

Thread-0 Plan Producer = 40 Actually = 40 left =  0  capacity = 40
Thread-1 Plan Producer = 40 Actually = 40 left =  0  capacity = 80
Thread-2 Plan Producer = 40 Actually = 20 left =  20  capacity = 100
Thread-4 Plan Consumer = 40 Actually = 40 left =  0  capacity = 60
Thread-2 Plan Producer = 20 Actually = 20 left =  0  capacity = 80
Thread-3 Plan Producer = 40 Actually = 20 left =  20  capacity = 100
Thread-6 Plan Consumer = 40 Actually = 40 left =  0  capacity = 60
Thread-5 Plan Consumer = 40 Actually = 40 left =  0  capacity = 20
Thread-3 Plan Producer = 20 Actually = 20 left =  0  capacity = 40

 

其他待续.........

 

我们今天的关于java并发:初探消费者和生产者模式深入理解java并发原理的分享已经告一段落,感谢您的关注,如果您想了解更多关于java 多线程 - 生产者消费者模式、java 多线程 —— 生产者消费者模式、java 多线程:生产者和消费者模式 (wait-notify) : 单生产和单消费、java 实现生产者-消费者模式的相关信息,请在本站查询。

本文标签: