初识Callable and Future
在编码时,我们可以通过继承Thread或是实现Runnable接口来创建线程,但是这两种方式都存在一个缺陷:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到目的。Java5提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Callable and Future源码:
(1)Callable接口:
public interface Callable<V> { V call() throws Exception; }
(2)Future接口:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
源码解说:
Callable位于java.util.concurrent包下,它是一个接口,在它里面只声明了一个call()方法。从上面的源码可以看到,Callable是一个泛型接口,call()函数返回的类型就是传递进来的泛型实参类型。
Future类位于java.util.concurrent包下,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果,其cancel()方法的参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置为true,则表示可以取消正在执行过程中的任务;get()方法用来获取执行结果,该方法会阻塞直到任务返回结果。
Callable and Future示例:
(1)下面的示例是一个Callable,它会采用最明显的方式查找数组的一个分段中的最大值。
import java.util.concurrent.Callable; class FindMaxTask implements Callable<Integer> { private int[] data; private int start; private int end; FindMaxTask(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } public Integer call() { int max = Integer.MIN_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) max = data[i]; } return max; } }
(2)将Callable对象提交给一个Executor,它会为每个Callable对象创建一个线程,如下代码段所示:
import java.util.concurrent.*; public class MultithreadedMaxFinder { public static int max(int[] data) throws InterruptedException, ExecutionException { if (data.length == 1) { return data[0]; } else if (data.length == 0) { throw new IllegalArgumentException(); } // split the job into 2 pieces FindMaxTask task1 = new FindMaxTask(data, 0, data.length/2); FindMaxTask task2 = new FindMaxTask(data, data.length/2, data.length); // spawn 2 threads ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> future1 = service.submit(task1); Future<Integer> future2 = service.submit(task2); return Math.max(future1.get(), future2.get()); } }
补充:
ExecutorService接口中声明了若干个不同形式的submit()方法,各个方法的返回类型为Future类型,如下:
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
因为Future只是一个接口,所以是无法直接用来创建对象来使用的,因此就有了下面的FutureTask,FutureTask目前是Future接口的一个唯一实现类。在Java并发程序中FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成get方法将会阻塞。
FutureTask实现了RunnableFuture接口,其声明如下:
public class FutureTask<V> implements RunnableFuture<V>
RunnableFuture接口定义如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
解说:
因RunnableFuture接口继承Runnable接口和Future接口,FutureTask实现了RunnableFuture接口,所以FutureTask既可以作为Runnable被线程执行(Thread接收Runnable类型的参数),又可以提交给Executor来执行以得到返回值(ExecutorService.submit(Runnable task))。
FutureTask构造函数:
FutureTask的构造函数接收不同形式的参数,如下:
public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { }
观察下述两个示例代码中FutureTask的使用方式
示例一:
FutureTask将被作为Runnable被线程执行
(1)任务线程ThreadC:
package demo.thread; import java.util.concurrent.Callable; //实现Callable接口,call()方法可以有返回结果 public class ThreadC implements Callable<String> { @Override public String call() throws Exception { try {//模拟任务,执行了500毫秒; Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } return "thread B"; } }
(2)主线程ThreadMain:
package demo.thread; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class ThreadMain { public static void main(String[] args) { ThreadC threadc = new ThreadC(); FutureTask<String> faeature = new FutureTask<String>(threadc); new Thread(faeature).start();//注意启动方式,FutureTask将被作为Runnable被线程执行 System.out.println("这是主线程;begin!"); //注意细细体会这个,只有主线程get了,主线程才会继续往下执行 try { System.out.println("得到的返回结果是:"+faeature.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("这是主线程;end!"); } }
示例二:
FutureTask被提交给Executor执行以得到返回值
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask);//FutureTask被提交给Executor执行以得到返回值 executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果"+futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { Thread.sleep(3*1000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
CompletionService
ExecutorCompletionService
ExecutorCompletionService实现了CompletionService,融合了线程池Executor和阻塞队列BlockingQueue的功能,将计算部分委托给一个Executor。
(1)构造函数:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
(2)任务提交:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
从上述submit()方法可以看出,当提交某个任务时,该任务首先将被包装为一个QueueingFuture
(3)QueueingFuture源码:
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
参考资料:
(1)http://ifeve.com/futuretask-source/
(2)http://www.tuicool.com/articles/umyy6b