GVKun编程网logo

JAVA wait () 和 notifyAll () 实现线程间通讯(java wait notify通知指定线程)

1

想了解JAVAwait()和notifyAll()实现线程间通讯的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于javawaitnotify通知指定线程的相关问题,此外,我们还将为您介绍关于

想了解JAVA wait () 和 notifyAll () 实现线程间通讯的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于java wait notify通知指定线程的相关问题,此外,我们还将为您介绍关于15、Condition(await、signalAll)和object(wait、notifyall)、Java Object 对象的 wait () 和 notify ()、notifyAll ()、Java Object 对象的 wait() 和 notify()、notifyAll()、Java 中Object中的 wait、notify/notifyAll 方法详解的新知识。

本文目录一览:

JAVA wait () 和 notifyAll () 实现线程间通讯(java wait notify通知指定线程)

JAVA wait () 和 notifyAll () 实现线程间通讯(java wait notify通知指定线程)

本例是阅读 Think in Java 中相应章节后,自己实际写了一下自己的实现

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/*
假设一个学生,日出而作日落而息
*/
class Student
{
    private boolean Awake=false;
    public synchronized void wakeUp()
    {
        Awake=true;
        System.out.println("WakeUp!");
        notifyAll();    //notifyAll()写在这里!1.临界区 2.已经做了一些改变
    }
    public synchronized void sleep()
    {
        Awake=false;
        System.out.println("Sleep!");
        notifyAll();    //同上
    }
    public  synchronized void waitForSleep() throws InterruptedException {
        while(Awake!=false) wait(); //等待写在这里!1.临界区 2.等待外界改变的条件
    }
    public  synchronized void waitForAwake() throws InterruptedException{
        while(Awake!=true) wait();  //同上
    }
    public boolean isAwake() {
        return Awake;
    }
}
class SunRise implements Runnable
{//日出
    Student student=null;
    public SunRise(Student student)
    {
        this.student=student;
    }
    @Override public void run()
    {
        try {
            while (!Thread.interrupted()) {
                student.waitForSleep(); //先等待环境变化
                TimeUnit.MILLISECONDS.sleep(100);
                student.wakeUp();       //环境已变化,起床!
                System.out.println("End Awake");
            }

        }catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
}
class SunFall implements Runnable
{
    Student student=null;
    public SunFall(Student student)
    {
        this.student=student;
    }
    @Override public void run()
    {
        try{
            while (!Thread.interrupted()) {
                student.waitForAwake(); //先等待环境变化
                TimeUnit.MILLISECONDS.sleep(100);
                student.sleep();        //环境已变化,睡觉!
                System.out.println("End Sleep");
            }

        }catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
}
public class Main{
    public static void main(String[]args)
    {
        Student me=new Student();
        SunRise sunRise=new SunRise(me);
        SunFall sunFall=new SunFall(me);
        ExecutorService service= Executors.newCachedThreadPool();
        service.execute(sunRise);
        service.execute(sunFall); 
    }
}

 输出是

WakeUp!
End Awake
Sleep!
End Sleep

的不停循环。

应该算成功了吧。

15、Condition(await、signalAll)和object(wait、notifyall)

15、Condition(await、signalAll)和object(wait、notifyall)

1、介绍

  • 1、condition:

    一般是和锁绑定的,通过lock.newCondition()产生 可以生产多个,分别控制不同的线程角色(生产者和消费者)

  • 2、object:

    任何对象都有,一般使用临界资源(queue数组)进行控制

    由于queue只有一个,所以notifyall的时候会唤醒所有线程(生产者和消费)不能够“分类”控制

  • 3、使用共同点==>都必须先加锁,然后在操作,否则会报await被打断的异常。

  • 4、加锁锁定后,如果线程一直在运行,其他线程申请这个锁,肯定会被阻塞,但是如果此线程await之后,就会释放锁,让其他线程进来

  • 5、为什么要加锁?看总结

2、优缺点:

  • 1、Condition:

    由于可创建多个,可以精确唤醒一组线程,其他线程继续等待,免得浪费资源。 比如队列为null,则wait当前消费者,唤醒所有生产者,而不会唤醒其他消费者 因为即使唤醒其他的消费者,由于此时队列为null,也会即刻进入wait状态 同时还会进行锁竞争,损耗性能

  • 2、object:

    只有一个,同样如果队列为null时,wait当前消费者,然后notifyall(),唤醒其他所有线程(生产者和消费者)

    其他消费者,第一竞争锁,然后进来后,发现队列还是为null,立马await

    也就是没有做事情,还竞争锁,损耗性能。

  • 3、ConditionCPU负载

  • 4、ObjectCPU负载

Condition,100W ==> 5753毫秒

Object,100W ==> 5797毫秒

2、代码,注意:一定要先锁住

两种方式加锁,模拟实现消息队列

  • 1、QueueImpl.java

      import java.util.concurrent.locks.Condition;
      import java.util.concurrent.locks.ReentrantLock;
    
      /**
       * 模拟消息队列,分别使用condition和object对象进行加锁,同步控制。
       *
       * condition:
       *      一般是和锁绑定的,通过lock.newCondition()产生
       *      可以生产多个,分别控制不同的线程角色(生产者和消费者)
       *
       * object:
       *      任何对象都有,一般使用临界资源(queue数组)进行控制
       *      由于queue只有一个,所以notifyall的时候会唤醒所有线程(生产者和消费)
       *      不能够分开控制
       *
       * 使用共同点:
       *      都必须先加锁,然后在操作,否则会报await被打断的异常
       *      线程执行await、yieldhi释放锁,并且进行则塞等待
       *      sleep不会释放锁
       */
      public class QueueImpl<T> {
          private Object[] queue;
          private int size;                //队列数组长度
          private int msgSize = 0;         //队列中的数据量
          private int productIndex = 0;   //生产者位置
          private int consumerIndex = 0;  //消费者已经消费了的记录
    
          private ReentrantLock lock = new ReentrantLock(true);
          //用于锁定生产者,队列满的时候,notFull锁定,等待消费者唤醒
          private Condition notFull = lock.newCondition();
          //用于消费者,数组空的时候,notEmtry锁住,等待生产者唤醒
          private Condition notEmtry = lock.newCondition();
    
          public QueueImpl(int size) {
              queue = new Object[size];
              this.size = size;
          }
    
          /**
           * 生产者插入数据, await、signalAll写法
           */
          public boolean conditionPut(T msg) {
              lock.lock();  //必须加锁,否则会报wait会被打断异常
    
              try {
                  //之所以用while,是因为被唤醒后,队列可能又被其他生产者写满了,继续阻塞
                  while (msgSize == size) {
                      //如果放在这里,那么就是等生产者填满队列之后,在唤醒消费者
                      //notEmtry.signalAll();
                      notFull.await();
                  }
    
                  if (productIndex > 100000) {
                      productIndex = productIndex % size;
                      consumerIndex = consumerIndex % size;
                  }
                  queue[productIndex % size] = msg;
                  productIndex++;
                  msgSize++;
    
                  //放在这里,那么就是生产一条,消费一条
                  notEmtry.signalAll();
              } catch (InterruptedException e) {
                  e.printStackTrace();
                  return false;
              } finally {
                  lock.unlock();
              }
              return true;
          }
    
          /**
           * 消费者拿取数据, await、signalAll写法
           *
           * [@return](https://my.oschina.net/u/556800)
           */
          public T conditionTake() {
              T result = null;
              lock.lock();  //必须加锁,否则会报wait会被打断异常
    
              try {
                  //之所以用while,是因为队列可能又被其他线程消费完了,继续阻塞
                  while (msgSize == 0) {
                      //放在这里,表示把数据都消费完了,在唤醒生产者进行生产
                      //notFull.signalAll();
                      notEmtry.await();
                  }
    
                  result = (T) queue[consumerIndex % size];
                  consumerIndex++;
                  msgSize--;
    
                  //放在这里,消费一条,就唤醒生产者
                  notFull.signalAll();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  lock.unlock();
              }
              return result;
          }
    
          /**
           * 生产者插入数据, wait、notifyall写法
           *
           * [@param](https://my.oschina.net/u/2303379) msg
           * [@return](https://my.oschina.net/u/556800)
           */
          public boolean objectPut(T msg) {
              synchronized (queue) {  //必须加锁,否则会报wait会被打断异常
                  try {
                      //之所以用while,是因为被唤醒后,队列可能又被其他生产者写满了,继续阻塞
                      while (msgSize == size) {
                          //写在这里,表示队列写满了,才唤醒消费者消费
                          //queue.notifyAll();
                          queue.wait();
                      }
    
                      if (productIndex > 10000) {
                          productIndex = productIndex % size;
                          consumerIndex = consumerIndex % size;
                      }
                      queue[productIndex % size] = msg;
                      productIndex++;
                      msgSize++;
    
                      //写在这里,表示队列写满了,生产一条,就唤醒消费者消费
                      queue.notify();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                      return false;
                  }
              }
              return true;
          }
    
          /**
           * 消费者拿取数据, wait、notifyall写法
           *
           * [@return](https://my.oschina.net/u/556800)
           */
          public T objectTake() {
              T result = null;
              synchronized (queue) {    //必须加锁,否则会报wait会被打断异常
                  try {
                      //之所以用while,是因为队列可能又被其他线程消费完了,继续阻塞
                      while (msgSize == 0) {
                          //写在这里,表示消费者把队列中的信息全部消费了,才唤醒生产者生产
                          //queue.notifyAll();
                          queue.wait();
                      }
    
                      result = (T) queue[consumerIndex % size];
                      consumerIndex++;
                      msgSize--;
    
                      //写在这里,表示消费一条,就唤醒生产者进行生产
                      queue.notify();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              return result;
          }
      }
    
  • 2、ProductAndConsumer.java

      import java.util.concurrent.atomic.AtomicInteger;
    
      /**
       * Condition测试
       * Condition一般是和锁绑定的,通过lock.newCondition()产生
       * Condition只能锁定和唤醒相同相同condition对象的值
       * <p>
       * 本例子实现消费者和生产者对一个数组进行生产和消费操作
       *
       * [@Author](https://my.oschina.net/arthor) liufu
       * @CreateTime 2018/4/18  20:31
       */
      public class ProductAndConsumer {
    
          public static void main(String[] args) {
    
              QueueImpl<String> queue = new QueueImpl<String>(200);
              AtomicInteger atomicInteger = new AtomicInteger(0);
    
              //condition生产者
              new Thread(() -> {
                  while (true) {
                      queue.conditionPut("i am msg:" + atomicInteger.getAndIncrement());
                  }
              }).start();
    
              //condition消费者
              new Thread(() -> {
                  while (true) {
                      System.out.println(queue.conditionTake());
                  }
              }).start();
    
    
    
              //=============================================================
    
    
              //object生产者
              new Thread(() -> {
                  while (true) {
                      queue.objectPut("i am msg:" + atomicInteger.getAndIncrement());
                  }
              }).start();
    
              //object消费者
              new Thread(() -> {
                  while (true) {
                      System.out.println(queue.objectTake());
                  }
              }).start();
          }
      }
    

3、总结

  • 1、必须加锁,lock.lock()或者synchronized (queue)。否则会报线程被打断异常

  • 2、线程调用wait()之后,会释放已经获得的锁给其他线程进来,并且将其自身放置在对象的等待集中,等待notify或者notifyall

  • 3、那么问题来了:大家都在wait,如果执行了notifyall,所有线程立马同时执行,不就会导致线程不安全吗?

    答案是:不会,因为

    notifyall确实是通知了所有执行此对象wait而等待的线程,他们都会被唤醒,但是执行之前它们都需要重新竞争获得锁(这就是为什么要加锁的原因)。这就限制了同一时刻,只会有一个线程获得锁执行,其他线程继续等待。拿到锁后,会从上一次wait()方法之后执行(恢复现场)。

    也就是说必须满足两个条件,缺一不可

      1、被notify到
      2、重新获得锁
    
  • 4、notify和notifyall有什么区别?

    notify只会唤醒等待中的一个线程,其他的不会被唤醒,这个时候唤醒的线程执行完成后,必须继续调用notify方法唤醒另外一个线程。

    而notifyall会唤醒所有等待线程,只是由于有“锁”的原因,只有一个线程执行,其他线程继续阻塞,只是这个时候不需要再继续notify而已。

    所以什么时候用notifyall,什么时候用notify,就要看情况了。

  • 5、NotifyallTest.java

      import java.util.concurrent.atomic.AtomicInteger;
    
      /**
       * 主要是为了测试wait和notifyall到底是什么原理
       *
       * @Author liufu
       * @CreateTime 2018/5/9  10:20
       */
      public class NotifyallTest {
          public static void main(String[] args) throws InterruptedException {
              String[] queue = new String[100];
              AtomicInteger atomicInteger = new AtomicInteger(0);
              for (int i = 0; i < 10; i++) {
                  new Thread(() -> {
                      int index = atomicInteger.getAndIncrement();
                      synchronized (queue) {
                          System.out.println(String.format("thread : %s wait", index));
                          try {
                              queue.wait();
                              System.out.println(String.format("thread : %s runing, 10秒内没有其他线程运行!", index));
                              Thread.sleep(10000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              }
    
              //让上面的所有线程都启动,在notifyall
              Thread.sleep(100);
    
              //这个也要加锁
              synchronized (queue) {
                  queue.notifyAll();
              }
          }
      }
    

Java Object 对象的 wait () 和 notify ()、notifyAll ()

Java Object 对象的 wait () 和 notify ()、notifyAll ()

在一个线程中通过一个对象来获得锁,调用 wait()函数,线程进入阻塞状态。

另一个线程通过也锁定此对象,调用对象的 notify () 方法通知其中给一个调用 wait 的对象结束等待状态。如果是调用 notifyAll () 通知的是前面所有调用此对象 wait () 方法的线程继续执行。

测试代码:

public class ObjectNotifyTestMain { public static void main(String[] args) { testNotify();
        testNotifyAll();
    } private static void testNotifyAll() { Object obj = new Object();
        Thread thread1 = new Thread(() -> { synchronized (obj) { try { System.out.println(Thread.currentThread().getName() + " - start");
                    obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");
                } catch (InterruptedException e) { e.printStackTrace();
                } } });
        Thread thread2 = new Thread(() -> { synchronized (obj) { try { System.out.println(Thread.currentThread().getName() + " - start");
                    obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");

                } catch (InterruptedException e) { e.printStackTrace();
                } } });
        thread1.start();
        thread2.start();
        try { Thread.sleep(100);
            synchronized (obj) { obj.notifyAll();
            } } catch (InterruptedException e) { e.printStackTrace();
        } } private static void testNotify() { Object obj = new Object();
        Thread thread1 = new Thread(() -> { synchronized (obj) { try { System.out.println(Thread.currentThread().getName() + " - start");
                    obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");
                    obj.notify();
                } catch (InterruptedException e) { e.printStackTrace();
                } } });

        Thread thread2 = new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " - start");
                obj.notify();
                try { obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");
                } catch (InterruptedException e) { e.printStackTrace();
                } } });


        thread1.start();
        thread2.start();
    } } 

Java Object 对象的 wait() 和 notify()、notifyAll()

Java Object 对象的 wait() 和 notify()、notifyAll()

在一个线程中通过一个对象来获得锁,调用wait()函数,线程进入阻塞状态。

另一个线程通过也锁定此对象,调用对象的notify()方法通知其中给一个调用wait的对象结束等待状态。如果是调用notifyAll()通知的是前面所有调用此对象wait()方法的线程继续执行。

测试代码:

public class ObjectNotifyTestMain { public static void main(String[] args) { testNotify();
        testNotifyAll();
    } private static void testNotifyAll() { Object obj = new Object();
        Thread thread1 = new Thread(() -> { synchronized (obj) { try { System.out.println(Thread.currentThread().getName() + " - start");
                    obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");
                } catch (InterruptedException e) { e.printStackTrace();
                } } });
        Thread thread2 = new Thread(() -> { synchronized (obj) { try { System.out.println(Thread.currentThread().getName() + " - start");
                    obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");

                } catch (InterruptedException e) { e.printStackTrace();
                } } });
        thread1.start();
        thread2.start();
        try { Thread.sleep(100);
            synchronized (obj) { obj.notifyAll();
            } } catch (InterruptedException e) { e.printStackTrace();
        } } private static void testNotify() { Object obj = new Object();
        Thread thread1 = new Thread(() -> { synchronized (obj) { try { System.out.println(Thread.currentThread().getName() + " - start");
                    obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");
                    obj.notify();
                } catch (InterruptedException e) { e.printStackTrace();
                } } });

        Thread thread2 = new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " - start");
                obj.notify();
                try { obj.wait();
                    System.out.println(Thread.currentThread().getName() + " - resume");
                } catch (InterruptedException e) { e.printStackTrace();
                } } });


        thread1.start();
        thread2.start();
    } } 

Java 中Object中的 wait、notify/notifyAll 方法详解

Java 中Object中的 wait、notify/notifyAll 方法详解

1、wait()、notify/notifyAll() 方法是Object的本地final方法,无法被重写。

2、wait()执行后拥有当前锁的线程会释放该线程锁,并处于等待状态(等待重新获取锁)

3、notify/notifyAll() 执行后会唤醒处于等待状态线程获取线程锁、只是notify()只会随机唤醒其中之一获取线程锁,notifyAll() 会唤醒所有处于等待状态的线程抢夺线程锁。

三个方法的最佳实践代码如下:

 

public class WaitAndNotify {
    public static void main(String[] args) {
        MethodClass methodClass = new MethodClass();
        Thread t1 = new Thread(() -> {
            try {
                methodClass.product();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            try {
                methodClass.customer();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }, "t2");
        Thread t3 = new Thread(() -> {
            try {
                methodClass.customer();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }, "t3");
        t1.start();
        t2.start();
        t3.start();

    }
}

class MethodClass {
    // 定义生产最大量
    private final int MAX_COUNT = 20;

    int productCount = 0;

    public synchronized void product() throws InterruptedException {
        while (true) {
            System.out.println(Thread.currentThread().getName() + ":::run:::" + productCount);
            Thread.sleep(10);
            if (productCount >= MAX_COUNT) {
                System.out.println("货舱已满,,.不必再生产");
                
                wait();
            }else {
                productCount++;
            }
            
            notifyAll();
        }
    }

    public synchronized void customer() throws InterruptedException {
        while (true) {
            System.out.println(Thread.currentThread().getName() + ":::run:::" + productCount);
            Thread.sleep(10);
            if (productCount <= 0) {
                System.out.println("货舱已无货...无法消费");
                wait();
            }else {
                productCount--;
            }
            
            notifyAll();
        }
    }
}

结果:

t1:::run:::0
t1:::run:::1
t1:::run:::2
t1:::run:::3
t1:::run:::4
t1:::run:::5
t1:::run:::6
t1:::run:::7
t1:::run:::8
t1:::run:::9
t1:::run:::10
t1:::run:::11
t1:::run:::12
t1:::run:::13
t1:::run:::14
t1:::run:::15
t1:::run:::16
t1:::run:::17
t1:::run:::18
t1:::run:::19
t1:::run:::20
货舱已满,,.不必再生产
t3:::run:::20
t3:::run:::19
t3:::run:::18
t3:::run:::17
t3:::run:::16
t3:::run:::15
t3:::run:::14
t3:::run:::13
t3:::run:::12
t3:::run:::11
t3:::run:::10
t3:::run:::9
t3:::run:::8
t3:::run:::7
t3:::run:::6
t3:::run:::5
t3:::run:::4
t3:::run:::3
t3:::run:::2
t3:::run:::1
t3:::run:::0
货舱已无货...无法消费
t2:::run:::0
货舱已无货...无法消费
t1:::run:::0
t1:::run:::1
t1:::run:::2
t1:::run:::3
t1:::run:::4
t1:::run:::5
t1:::run:::6
t1:::run:::7
t1:::run:::8
t1:::run:::9
t1:::run:::10
t1:::run:::11
t1:::run:::12
t1:::run:::13
t1:::run:::14
t1:::run:::15
t1:::run:::16
t1:::run:::17
t1:::run:::18
t1:::run:::19
t1:::run:::20
货舱已满,,.不必再生产
t2:::run:::20
t2:::run:::19
t2:::run:::18
t2:::run:::17
t2:::run:::16
t2:::run:::15
t2:::run:::14
t2:::run:::13
t2:::run:::12
t2:::run:::11
t2:::run:::10
t2:::run:::9
t2:::run:::8
t2:::run:::7
t2:::run:::6
t2:::run:::5
t2:::run:::4
t2:::run:::3
t2:::run:::2
t2:::run:::1
t2:::run:::0
货舱已无货...无法消费

 

我们今天的关于JAVA wait () 和 notifyAll () 实现线程间通讯java wait notify通知指定线程的分享就到这里,谢谢您的阅读,如果想了解更多关于15、Condition(await、signalAll)和object(wait、notifyall)、Java Object 对象的 wait () 和 notify ()、notifyAll ()、Java Object 对象的 wait() 和 notify()、notifyAll()、Java 中Object中的 wait、notify/notifyAll 方法详解的相关信息,可以在本站进行搜索。

本文标签: