public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; //队列中元素数量 final ReentrantLock takeLock = this.takeLock; // 拿东西的锁 takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); // notEmpty这个条件不成立 进入等待 } x = dequeue(); // 出栈 // 例子:count = 1 --> c = count.getAndDecrement() --> c = 1 count = 0 c = count.getAndDecrement(); // getAndDecrement表示获取当前值然后减一 if (c > 1) notEmpty.signal(); // 元素数量至少两个 所以这里可以唤醒一个await的线程 } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x;}
当线程调用了notEmpty.await();进入等待之后 会有别的线程获取到takeLock锁进来
如果count.get()也等于0也会等待 会又有线程拿到takeLock锁
lockInterruptibly和lock区别
例子
测试线程拿到ReentrantLock锁后调用了Condition的await后 会不会有新的线程拿到锁
package blockingqueue;import java.util.ArrayDeque;import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * Created by User on 2017/12/21. */public class App { ReentrantLock lock = new ReentrantLock(); Condition cannotEmpty = lock.newCondition(); Queue queue = new ArrayDeque(); public static void main(String[] args) { App app = new App(); new Thread(()->{ for (;;) { app.queue.add(new Object()); System.out.println("添加了一个产品"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } app.lock.lock(); app.cannotEmpty.signal(); app.lock.unlock(); } }).start(); for (int i = 0; i < 5; i++) { new Thread(()->{ try { for (;;) app.consume(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } private void consume() throws InterruptedException { lock.lock(); try { System.out.println("进来一个消费者"); while (queue.size() == 0) { cannotEmpty.await(); } System.out.println("开始消费"); Thread.sleep(1000); queue.poll(); System.out.println("消费结束"); if (queue.size() > 0) cannotEmpty.signal(); } finally { lock.unlock(); } }}