CompletionService批量异步执行
前景引入
我们知道线程池可以执行异步任务,同时可以通过返回值Future获取返回值,所以异步任务大多数采用ThreadPoolExecutor+Future,如果存在如下情况,需要从任务一二三中获取返回值后,保存到数据库中,用异步逻辑实现代码应该如下所示。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<Integer> f1 = executorService.submit(() -> {
System.out.println("执行任务一");
return 1;
});
Future<Integer> f2 = executorService.submit(() -> {
System.out.println("执行任务二");
return 2;
});
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("执行任务三");
return 3;
});
Integer r1 = f1.get();
executorService.execute(()->{
// 省略保存r1操作
System.out.println(r1);
});
Integer r2 = f2.get();
executorService.execute(()->{
// 省略保存r2操作
System.out.println(r2);
});
Integer r3 = f3.get();
executorService.execute(()->{
// 省略保存r3操作
System.out.println(r3);
});
executorService.shutdown();
}
这样写的代码一点毛病没有,逻辑都是正常的,但如果存在任务一查询了比较耗时的操作,由于f1.get是阻塞执行,那么就算任务二和任务三已经返回结果,任务二的返回值和任务三的返回值都是不能保存到数据库的,因为f1.get将主线程阻塞了。
批量异步实现
那可以如何处理呢?可以采用万能的阻塞队列,任务先执行完毕的先入队,这样可以保证其它线程入库的速度不受影响,提高效率。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
Future<Integer> f1 = executorService.submit(() -> {
System.out.println("执行任务一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = executorService.submit(() -> {
System.out.println("执行任务二");
return 2;
});
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("执行任务三");
Thread.sleep(3000);
return 3;
});
executorService.execute(()->{
try {
Integer r1 = f1.get();
// 阻塞队列入队操作
queue.put(r1);
System.out.println(r1);
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(()->{
try {
Integer r2 = f2.get();
queue.put(r2);
System.out.println(r2);
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(()->{
try {
Integer r3 = f3.get();
queue.put(r3);
System.out.println(r3);
} catch (Exception e) {
e.printStackTrace();
}
});
// 循环次数不要使用queue.size限制,因为不同时刻queue.size值是有可能不同的
for (int i = 0; i <3; i++) {
Integer integer = queue.take();
// 省略保存integer操作
executorService.execute(()->{
System.out.println("保存入库=="+integer);
});
}
executorService.shutdown();
}
产生结果如下
同样的在生产中不建议使用,因为SDK为我们提供了工具类CompletionService,CompletionService内部就维护了一个阻塞队列,唯一与上述代码实现有所区别的是,阻塞队列入库的是Future对象,其余原理类似。
CompletionService
如何创建CompletionService
CompletionService同样是一个接口,其具体实现为ExecutorCompletionService,创建CompletionService对象有两种方式
public ExecutorCompletionService(Executor executor);
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
CompletionService对象的创建都是需要指定线程池,如果在创建时没有传入阻塞对象,那么会采用默认的LinkedBlockingQueue无界阻塞队列,如果应用到生产可能会产生OOM的情况,这是需要注意的。
CompletionService初体验
CompletionService如何做到批量执行异步任务呢,将上述场景采用CompletionService实现下
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService completionService = new ExecutorCompletionService(executorService);
Future<Integer> f1 = completionService.submit(() -> {
System.out.println("执行任务一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = completionService.submit(() -> {
System.out.println("执行任务二");
return 2;
});
Future<Integer> f3 = completionService.submit(() -> {
System.out.println("执行任务三");
Thread.sleep(3000);
return 3;
});
for (int i = 0; i <3 ; i++) {
Future take = completionService.take();
Integer integer = (Integer) take.get();
executorService.execute(()->{
System.out.println("执行入库=="+integer);
});
}
executorService.shutdown();
}
CompletionService接口说明
CompletionService的方法不多,使用起来比较简单,方法签名如下
// 提交任务到阻塞队列,带返回值的
Future<V> submit(Callable<V> task);
// 提交任务到阻塞队列,和ThreadPoolExecutor的submit方法类似
Future<V> submit(Runnable task, V result);
// 从阻塞队列中出队,阻塞队列空就阻塞
Future<V> take() throws InterruptedException;
// 从阻塞队列中出队 非阻塞,如果阻塞队列为空立即返回null
Future<V> poll();
// 从阻塞队列中出队,非阻塞,如果等待timeout时间后阻塞队列还为空,那么立即返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
总结
CompletionService主要是去解决无效等待的问题,如果一个耗时较长的任务在执行,那么可以采用这种方式避免无效的等待,CompletionService还能让异步任务的执行结果有序化,先执行完就先进入阻塞队列。
本文暂时没有评论,来添加一个吧(●'◡'●)