计算机系统应用教程网站

网站首页 > 技术文章 正文

java线程池核心类ThreadPoolExecutor的应用

btikc 2024-11-21 10:55:39 技术文章 41 ℃ 0 评论

上文java线程池核心类ThreadPoolExecutor概述中,我们知道ThreadPoolExecutor 是用来处理异步任务的一个接口,可以理解为一个线程池和一个任务队列,提交到ExecutorService对象的任务会被放入队列或者直接被线程池中的线程执行。今天我们来讲讲ThreadPoolExecutor的应用。

jdk中Executor框架提供了newFixedThreadPool,newCacheThreadPool,newSingleThreadPool 和newScheduledThreadPool等创建线程池,下面我们对这四种线程池进行详细分析

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue());
}

使用无界任务队列LinkedBlockingQueue,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量maximumPoolSize等于设置的corePoolSize数量。因为是无界任务队列,所以maximumPoolSize这个参数是无效的,哪怕任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加;若后续有新的任务加入,则直接进入队列等待。使用这种队列模式,一定要注意任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直至最后资源被耗尽的问题。

newCacheThreadPool

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue());
}

使用没有容量的任务队列SynchronousQueue,不会存储数据,每执行一次put就要执行一次take,否则就会阻塞。也就是说提交的任务不会被保存,总是会马上提交执行。如果执行任务的线程数量小于maximumPoolSize,则尝试创建新的线程;如果达到maximumPoolSize,就会执行拒绝策略AbortPolicy抛出异常。使用这种队列模式,需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易执行拒绝策略。

newSingleThreadPool

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue(),
 threadFactory));
}

从上端代码可以看出newSingleThreadPool 的初始化和newFixedThreadPool 的 nThreads = 1 的时候是一样的,也就是说线程池中最多只有一个任务在运行,超过的任务都会直接进入队列中等待。除此之外,还多了个FinalizableDelegatedExecutorService代理,FinalizableDelegatedExecutorService返回的ExcutorService在函数finalize()中会调用shutdown(),确保它被回收时调用shutdown()来终止线程。

newScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 new DelayedWorkQueue());
}

使用无界定时任务队列DelayedWorkQueue。当新添加的任务超过corePoolSize数量,无法直接进入线程池运行,就会被放入定时任务队列等待。放入队列中的任务如果延迟期未满,则无法取出。这种模式用来处理需要定时及周期性执行的任务,因为是无界的,所以一定要注意任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直至最后资源被耗尽的问题。

自定义线程池

//自定义线程工厂
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5),
 new ThreadFactory() {
 public Thread newThread(Runnable r) {
 System.out.println("线程"+r.hashCode()+"创建");
 //线程命名
 Thread th = new Thread(r,"threadPool"+r.hashCode());
 return th;
 }
}, new ThreadPoolExecutor.CallerRunsPolicy());

虽然jdk提供的这四种线程池处理方法,可以满足大部分应用场景,但都有其极限性,不够灵活;另外由于这四种方法内部也是通过ThreadPoolExecutor方式实现的,所以建议通过ThreadPoolExecutor方式来创建自己的线程池,通过这种方式有利于大家明确线程池的运行规则,创建符合业务场景需要的线程池,避免资源耗尽的风险。

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,上面的代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名。

ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的:

  • beforeExecute:线程池中任务运行前执行。
  • afterExecute:线程池中任务运行完毕后执行。
  • terminated:线程池退出后执行。

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。比如如下范例代码:

//实现自定义接口
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5),
 new ThreadFactory() {
 public Thread newThread(Runnable r) {
 System.out.println("线程"+r.hashCode()+"创建");
 //线程命名
 Thread th = new Thread(r,"threadPool"+r.hashCode());
 return th;
 }
}, new ThreadPoolExecutor.CallerRunsPolicy()) {

 protected void beforeExecute(Thread t,Runnable r) {
 System.out.println("准备执行");
 }
 
 protected void afterExecute(Runnable r,Throwable t) {
 System.out.println("执行完毕");
 }
 
 protected void terminated() {
 System.out.println("线程池退出");
 }
};

以上通过对常用线程池,自定义线程池的创建和扩展等方面的介绍,让我们对ThreadPoolExecutor类的应用有了一定的了解,从而可以根据自己的需要,灵活配置和使用线程池创建线程。希望对你有所帮助。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表