爱程序网

java并发:阻塞队列

来源: 阅读:

第一节 阻塞队列

1.1 初识阻塞队列

  队列以一种先进先出的方式管理数据,阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。在多线程进行合作时,阻塞队列是很有用的工具。

  生产者-消费者模式:阻塞队列常用于生产者和消费者的场景,生产者线程可以定期的把中间结果存到阻塞队列中,而消费者线程把中间结果取出并在将来修改它们。队列会自动平衡负载,如果生产者线程集运行的比消费者线程集慢,则消费者线程集在等待结果时就会阻塞;如果生产者线程集运行的快,那么它将等待消费者线程集赶上来。

    

  简单解说一下如何理解上表,比如说阻塞队列的插入方法,add(e)、offer(e)、put(e)等均为阻塞队列的插入方法,但它们的处理方式不一样,add(e)方法可能会抛出异常,而put(e)方法则可能一直处于阻塞状态,下面解说一下这些处理方式:

  A、抛出异常:所谓抛出异常是指当阻塞队列满时,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常;当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

  B、返回特殊值:插入方法,该方法会返回是否成功,成功则返回true;移除方法,该方法是从队列里拿出一个元素,如果没有则返回null

  C、一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到将数据放入队列或是响应中断退出;当队列为空时,消费者线程试图从队列里take元素,队列也会一直阻塞消费者线程,直到队列可用。

  D、超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

 

1.2 Java中的阻塞队列

  java.util.concurrent包提供了几种不同形式的阻塞队列,如数组阻塞队列ArrayBlockingQueue、链表阻塞队列LinkedBlockingQueue、优先级阻塞队列PriorityBlockingQueue和延时队列DelayQueue等,下面简单介绍一下这几个阻塞队列:

  数组阻塞队列:ArrayBlockingQueue是一个由数组支持的有界阻塞队列,内部维持着一个定长的数据缓冲队列(该队列由数组构成),此队列按照先进先出(FIFO)的原则对元素进行排序,在构造时需要给定容量。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

  对于数组阻塞队列,可以选择是否需要公平性,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。

  我们可以使用以下代码创建一个公平的阻塞队列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

  数组阻塞队列的公平性是使用可重入锁实现的,其构造函数代码如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

  链表阻塞队列:LinkedBlockingQueue基于链表的有界阻塞队列,内部维持着一个数据缓冲队列(该队列由链表构成),此队列按照先进先出的原则对元素进行排序。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(可以通过LinkedBlockingQueue的构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程将会被唤醒,反之对于消费者这端的处理也基于同样的原理。需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小(Integer.Max_VALUE)的容量,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。

  LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

  优先级阻塞队列:PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列,默认情况下元素采取自然顺序排列,也可以通过构造函数传入的Compator对象来决定。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只是在没有可消费的数据时阻塞数据的消费者,因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

  延时队列:DelayQueue是一个支持延时获取元素的使用优先级队列实现的无界阻塞队列。队列中的元素必须实现Delayed接口和Comparable接口(用以指定元素的顺序),也就是说DelayQueue里面的元素必须有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在;在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

  链表双向阻塞队列:LinkedBlockingDeque是由一个链表结构组成的双向阻塞队列。所谓双向队列指的是你可以从队列的两端插入和移出元素,双端队列因多了一个操作入口,在多线程同时入队时减少了一半的竞争。在初始化LinkedBlockingDeque时,可以设置容量,防止其过渡膨胀,相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素;以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素;插入方法add等同于addLast,移除方法remove等同于removeFirst。双向阻塞队列可以运用在“工作窃取”模式中。

  链表传输队列:LinkedTransferQueue是一个由链表结构组成的无界传输阻塞队列,相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer()方法和transfer()方法。

  transfer()方法:如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法),transfer()方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,transfer()方法会将元素存放到队列的tail节点,并等到该元素被消费者消费了才返回。

  transfer()方法的关键代码如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

  代码解说:第一行代码是试图把存放当前元素的s节点作为tail节点,第二行代码是让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。

  tryTransfer()方法:该方法是用来试探生产者传入的元素是否能直接传给消费者,如果没有消费者等待接收元素,则返回false。与transfer()方法的区别:tryTransfer()方法是立即返回(无论消费者是否接收),transfer()方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间之后再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

  欲了解LinkedTransferQueue的更多内容,可查看以下文章:http://ifeve.com/java-transfer-queue/、http://www.tuicool.com/articles/ZFriEz、http://guojuanjun.blog.51cto.com/277646/948298/,本文不细述。

  SynchronousQueue:SynchronousQueue是一种无界、无缓冲的阻塞队列,可以认为SynchronousQueue是一个缓存值为1的阻塞队列,但是SynchronousQueue内部并没有数据缓存空间,数据是在配对的生产者和消费者线程之间直接传递的。可以这样来理解:SynchronousQueue是一个传球手,SynchronousQueue不存储数据元素,队列头元素是第一个排队要插入数据的线程,而不是要交换的数据,SynchronousQueue负责把生产者线程处理的数据直接传递给消费者线程,生产者和消费者互相等待对方,握手,然后一起离开。SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

 

1.4 详解SynchronousQueue

【本小节主要摘自参考资料,作为学习笔记O(∩_∩)O】

(1)认识SynchronousQueue 

  SynchronousQueue的isEmpty()方法永远返回true,remainingCapacity()方法永远返回0,remove()和removeAll() 方法永远返回false,iterator()方法永远返回null,peek()方法永远返回null,故我们不能通过调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,同样遍历这个队列的操作也是不允许的。

  SynchronousQueue的一个使用场景是在线程池里,Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要创建新的线程(新任务到来时),当然如果有空闲线程的话,则会复用这些线程。

(2)SynchronousQueue实现机制

  阻塞算法的实现通常是在内部采用一个锁来保证在多个线程中的put()和take()方法是串行执行的,如下代码是一般put()和take()方法的实现:

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e==null) return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item!=null)
            wait();
        putting = false;
        notifyAll();
    }
}

  经典同步队列的实现采用了三个信号量,代码如下:

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);

    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }

    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

  Java5中SynchronousQueue的实现相对来说做了一些优化,它只使用了一个锁,使用队列代替信号量,允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒,代码如下:

public class Java5SynchronousQueue<E> {
    ReentrantLock qlock = new ReentrantLock();
    Queue waitingProducers = new Queue();
    Queue waitingConsumers = new Queue();

    static class Node extends AbstractQueuedSynchronizer {
        E item;
        Node next;

        Node(Object x) { item = x; }
        void waitForTake() { /* (uses AQS) */ }
           E waitForPut() { /* (uses AQS) */ }
    }

    public E take() {
        Node node;
        boolean mustWait;
        qlock.lock();
        node = waitingProducers.pop();
        if(mustWait = (node == null))
           node = waitingConsumers.push(null);
         qlock.unlock();

        if (mustWait)
           return node.waitForPut();
        else
            return node.item;
    }

    public void put(E e) {
         Node node;
         boolean mustWait;
         qlock.lock();
         node = waitingConsumers.pop();
         if (mustWait = (node == null))
             node = waitingProducers.push(e);
         qlock.unlock();

         if (mustWait)
             node.waitForTake();
         else
            node.item = e;
    }
}

  Java6中SynchronousQueue的实现采用了一种性能更好的无锁算法——扩展的“Dual stack and Dual queue”算法,性能比Java5的实现有较大提升。

  声明一个SynchronousQueue有两种不同的方式,支持公平和非公平两种竞争机制,它们之间有着不太一样的行为,公平模式和非公平模式的区别:如果采用公平模式,SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者;如果是非公平模式(SynchronousQueue默认),SynchronousQueue会采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者。两者性能相当,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化,需要注意的是,若采用非公平模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况(可能某些生产者或者消费者的数据永远都得不到处理)。

(3)参考资料

(1)http://ifeve.com/java-synchronousqueue/

(2)http://blog.itpub.net/29644969/viewspace-1169051/

 

 

第二节 使用示例

2.1 生产者-消费者示例

一个生产者-N个消费者,程序功能:在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。

package com.test;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest {
    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
        String directory = in.nextLine();
        System.out.print("Enter keyword (e.g. volatile): ");
        String keyword = in.nextLine();
        final int FILE_QUEUE_SIZE = 10;
        final int SEARCH_THREADS = 100;
        BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
        FileEnumerationTask enumerator = new FileEnumerationTask(queue,new File(directory));
        new Thread(enumerator).start();
        for (int i = 1; i <= SEARCH_THREADS; i++)
            new Thread(new SearchTask(queue, keyword)).start();
    }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable {
    /**
     * Constructs a FileEnumerationTask.
     * 
     * @param queue
     *            the blocking queue to which the enumerated files are added
     * @param startingDirectory
     *            the directory in which to start the enumeration
     */
    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) {
        this.queue = queue;
        this.startingDirectory = startingDirectory;
    }

    public void run() {
        try {
            enumerate(startingDirectory);
            queue.put(DUMMY);
        } catch (InterruptedException e) {
        }
    }

    /**
     * Recursively enumerates all files in a given directory and its
     * subdirectories
     * 
     * @param directory
     *            the directory in which to start
     */
    public void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        for (File file : files) {
            if (file.isDirectory())
                enumerate(file);
            else
                queue.put(file);
        }
    }

    public static File DUMMY = new File("");
    private BlockingQueue<File> queue;
    private File startingDirectory;
}

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable {
    /**
     * Constructs a SearchTask.
     * 
     * @param queue
     *            the queue from which to take files
     * @param keyword
     *            the keyword to look for
     */
    public SearchTask(BlockingQueue<File> queue, String keyword) {
        this.queue = queue;
        this.keyword = keyword;
    }

    public void run() {
        try {
            boolean done = false;
            while (!done) {
                File file = queue.take();
                if (file == FileEnumerationTask.DUMMY) {
                    queue.put(file);
                    done = true;
                } else
                    search(file);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
        }
    }

    /**
     * Searches a file for a given keyword and prints all matching lines.
     * 
     * @param file
     *            the file to search
     */
    public void search(File file) throws IOException {
        Scanner in = new Scanner(new FileInputStream(file));
        int lineNumber = 0;
        while (in.hasNextLine()) {
            lineNumber++;
            String line = in.nextLine().trim();
            if (line.contains(keyword))
                System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber,
                        line);
        }
        in.close();
    }

    private BlockingQueue<File> queue;
    private String keyword;
}

解说:上述程序展示了如何使用阻塞队列来控制线程集,生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中,同时我们还启动了大量的搜索线程,每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。

  在上述代码中,我们使用了一个小技巧来在工作结束后终止线程,为了发出完成信号,枚举线程把一个虚拟对象放入队列,当搜索线程取到这个虚拟对象时,就将其放回并终止(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包)。

注意:在这个程序中,我们使用的是ArrayBlockingQueue,使用队列数据结构作为一种同步机制,这里不需要人任何显示的线程同步。

对比分析:

  ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。

  ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象,这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

 

2.2 DelayQueue使用示例

我们可以将延时队列DelayQueue运用在以下场景中:

  (1)缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

  (2)定时任务调度:使用DelayQueue保存当天将要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行任务,比如TimerQueue就是使用DelayQueue实现的。

DelayQueue使用实例如下:

(1)实现一个Student对象作为DelayQueue的元素,Student必须实现Delayed接口的两个方法

package com.test;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Student implements Delayed {//必须实现Delayed接口
    
    private String name;
    private long submitTime;// 交卷时间
    private long workTime;// 考试时间

    public Student(String name, long submitTime) {
        this.name = name;
        this.workTime = submitTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
        System.out.println(this.name + " 交卷,用时" + workTime);
    }

    public String getName() {
        return this.name + " 交卷,用时" + workTime;
    }
    
    //必须实现getDelay方法
    public long getDelay(TimeUnit unit) {
        //返回一个延迟时间
        return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
    }

    //必须实现compareTo方法
    public int compareTo(Delayed o) {
        Student that = (Student) o;
        return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
    }

}

(2)主线程程序

package com.test;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest {
    public static void main(String[] args) throws Exception {
        
        // 新建一个等待队列
        final DelayQueue<Student> bq = new DelayQueue<Student>();
for (int i = 0; i < 5; i++) {
            Student student = new Student("学生"+i,Math.round((Math.random()*10+i)));
            bq.put(student); // 将数据存到队列里!
        }
        //获取但不移除此队列的头部;如果此队列为空,则返回 null。
        System.out.println(bq.peek().getName());
    }
}

上述程序运行结果如下:

学生0 交卷,用时8
学生1 交卷,用时9
学生2 交卷,用时4
学生3 交卷,用时9
学生4 交卷,用时12
学生2 交卷,用时4

 

 

第三节 使用阻塞式队列处理大数据

 鄙人暂时还没有研究这部分内容,此处仅贴出两个资源,以供后续学习

(1)http://blog.csdn.net/lifetragedy/article/details/50593588

(2)http://download.csdn.net/detail/lifetragedy/9419773

 

第四节 参考资料

(1)http://www.cnblogs.com/dolphin0520/p/3932906.html

关于爱程序网 - 联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 人才招聘 - 帮助