

来源: 阅读:

  • BlockingQueue简介





  SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。

  • BlockingQueue内容


  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用


  • BlockingQueue实现原理


public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;

    /** 数组对象,用于放置对象 */
    final Object[] items;

    /** put, offer, or add方法放入数组的索引 */
    int putIndex;

    /**  take, poll, peek or remove方法取出数据的数组索引 */
    int takeIndex;
    /** queue队列的总数 */
    int count;

    final ReentrantLock lock;
    /** 非空信号量,可以取数*/
    private final Condition notEmpty;
    /** 非满信号量,可以放数 */
    private final Condition notFull;


 1 public void put(E e) throws InterruptedException {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             while (count == items.length)
 7                 notFull.await();
 8             insert(e);
 9         } finally {
10             lock.unlock();
11         }
12 }
 1 public E take() throws InterruptedException {
 2         final ReentrantLock lock = this.lock;
 3         lock.lockInterruptibly();
 4         try {
 5             while (count == 0)
 6                 notEmpty.await();
 7             return extract();
 8         } finally {
 9             lock.unlock();
10         }
11     }


  • BlockingQueue应用


 1 public class BlockingQueueTest {
 2     public static void main(String[] args) {
 3         final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
 4         for (int i = 0; i < 2; i++) {
 5             new Thread(new Runnable() {
 6                 @Override
 7                 public void run() {
 8                     while (true) {
 9                         System.out.println("Thread "+Thread.currentThread().getName()+"正在准备放入数据");
10                         try {
11                             //模拟线程的放数速度
12                             Thread.sleep(new Random().nextInt(1000));
13                         } catch (InterruptedException e) {
14                             // TODO Auto-generated catch block
15                             e.printStackTrace();
16                         }
17                         try {
18                             queue.put(1);
19                         } catch (InterruptedException e) {
20                             // TODO Auto-generated catch block
21                             e.printStackTrace();
22                         }
23                         System.out.println("Thread "+Thread.currentThread().getName()+"放入数据,此时队列中的数据为:"+queue.size());
24                     }
25                 }
26             }).start();
27             new Thread(new Runnable() {
28                 @Override
29                 public void run() {
30                     while (true) {
31                         System.out.println("Thread "+Thread.currentThread().getName()+"正在取得数据");
32                         try {
33                             //模拟线程的去数速度
34                             Thread.sleep(100);
35                         } catch (InterruptedException e) {
36                             // TODO Auto-generated catch block
37                             e.printStackTrace();
38                         }
39                         try {
40                             queue.take();
41                         } catch (InterruptedException e) {
42                             // TODO Auto-generated catch block
43                             e.printStackTrace();
44                         }
45                         System.out.println("Thread "+Thread.currentThread().getName()+"取得数据,此时队列中的数据为:"+queue.size());
46                     }
47                 }
48             }).start();
49         }
51     }
52 }


 1 public class BlockingQueueCommunication {
 2     public static void main(String[] args) {
 3         final Business business = new Business();
 4         new Thread(new Runnable() {
 6             @Override
 7             public void run() {
 8                 // TODO Auto-generated method stub
 9                 for (int i = 0; i < 50; i++) {
10                     try {
11                         business.sub(i);
12                     } catch (InterruptedException e) {
13                         // TODO Auto-generated catch block
14                         e.printStackTrace();
15                     }
16                 }
17             }
18         }).start();
19         for (int i = 0; i < 50; i++) {
20             try {
21                 business.main(i);
22             } catch (InterruptedException e) {
23                 // TODO Auto-generated catch block
24                 e.printStackTrace();
25             }
26         }
27     }
28     static class Business{
29         BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
30         BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
31         {
32             try {
33                 queue2.put(1);//保证queue2阻塞
34             } catch (InterruptedException e) {
35                 // TODO Auto-generated catch block
36                 e.printStackTrace();
37             }
38         }
40         public void main(int i) throws InterruptedException{
41             queue1.put(1);//阻塞queue1
42             for (int j = 0; j < 100; j++) {
43                 System.out.println("main thread is looping of "+j +" in " + i);
44             }
45             queue2.take();//唤醒queue2
46         }
47         public void sub(int i) throws InterruptedException{
48             queue2.put(1);//阻塞queue2
49             for (int j = 0; j < 10; j++) {
50                 System.out.println("sub thread is looping of "+j +" in " + i);
51             }
52             queue1.take();//唤醒queue1
53         }
54     }
55 }


 1 class Producer implements Runnable {
 2    private final BlockingQueue queue;
 3    Producer(BlockingQueue q) { queue = q; }
 4    public void run() {
 5      try {
 6        while(true) { queue.put(produce()); }
 7      } catch (InterruptedException ex) { ... handle ...}
 8    }
 9    Object produce() { ... }
10  }
12  class Consumer implements Runnable {
13    private final BlockingQueue queue;
14    Consumer(BlockingQueue q) { queue = q; }
15    public void run() {
16      try {
17        while(true) { consume(queue.take()); }
18      } catch (InterruptedException ex) { ... handle ...}
19    }
20    void consume(Object x) { ... }
21  }
23  class Setup {
24    void main() {
25      BlockingQueue q = new SomeQueueImplementation();
26      Producer p = new Producer(q);
27      Consumer c1 = new Consumer(q);
28      Consumer c2 = new Consumer(q);
29      new Thread(p).start();
30      new Thread(c1).start();
31      new Thread(c2).start();
32    }
33  }






关于爱程序网 - 联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 人才招聘 - 帮助