计算机系统应用教程网站

网站首页 > 技术文章 正文

ThreadPoolExecutor 线程池详解,再也不用预定义线程池了

btikc 2024-11-21 10:54:34 技术文章 39 ℃ 0 评论

为什么要用线程池?

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有 T1,T3 的开销了。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。假设一个服务器一天要处理 50000 个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为 50000。一般线程池大小是远小于 50000。所以利用线程池的服务器程序不会为了创建50000 而在处理请求时浪费时间,从而提高效率。

ThreadPoolExecutor 的类关系

Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit()的扩展,可以说是真正的线程池接口;

AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法;

ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。

ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期执行"功能 ExecutorService;

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。

线程池的创建各个参数含义

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, longkeepAliveTime,
                          TimeUnit unit,BlockingQueue<Runnable> workQueue,
                          ThreadFactorythreadFactory,RejectedExecutionHandler handler)


corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;

如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize

keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用

TimeUnit :keepAliveTime 的时间单位

workQueue:workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能。(之前有篇文章讲解了阻塞队列)

workQueue(之前有篇文章讲解了阻塞队列)

用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

1)当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize。

2)由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数。

3)由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。

4)更重要的,使用无界 queue 可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。

所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。

threadFactory

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

RejectedExecutionHandler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:

(1)AbortPolicy:直接抛出异常,默认策略;

(2)CallerRunsPolicy:用调用者所在的线程来执行任务;

(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

(4)DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

扩展线程池

能扩展线程池的功能吗?比如在任务执行的前后做一点我们自己的业务工作?实际上,JDK 的线程池已经为我们预留的接口,在线程池ThreadPoolExecutor类中,有 2个方法是空的,就是给我们预留的。还有一个线程池退出时会调用的方法。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

每个任务执行前后都会调用 beforeExecute 和 afterExecute 方法。相当于执行了一个切面。而在调用 shutdown 方法后则会调用 terminated 方法。

看一下下面扩展线程池demo:

package cn.pine;

import java.util.Random;
import java.util.concurrent.*;

/**
 *类说明:扩展线程池的使用范例
 */
public class ThreadPoolExt {
    static class Worker implements Runnable
    {
        private String taskName;
        private Random r = new Random();

        public Worker(String taskName){
            this.taskName = taskName;
        }

        public String getName() {
            return taskName;
        }

        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName()
            		+" process the task : " + taskName);
            try {
                TimeUnit.MILLISECONDS.sleep(r.nextInt(100)*5);
            } catch (InterruptedException e) {
            }
        }
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("beforeExecute"+((Worker)r).getName());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("afterExecute "+((Worker)r).getName());
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出 ");
            }
        };

        for (int i = 0; i <= 6; i++){
            Worker worker = new Worker("worker " + i);
            System.out.println("A new task has been added : " + worker.getName());
            threadPool.execute(worker);
        }
        threadPool.shutdown();

    }
}

运行结果:

A new task has been added : worker 0

A new task has been added : worker 1

A new task has been added : worker 2

A new task has been added : worker 3

beforeExecuteworker 0

beforeExecuteworker 1

A new task has been added : worker 4

pool-1-thread-1 process the task : worker 0

pool-1-thread-2 process the task : worker 1

A new task has been added : worker 5

A new task has been added : worker 6

afterExecute worker 0

beforeExecuteworker 2

pool-1-thread-1 process the task : worker 2

afterExecute worker 1

beforeExecuteworker 3

pool-1-thread-2 process the task : worker 3

afterExecute worker 2

beforeExecuteworker 4

pool-1-thread-1 process the task : worker 4

afterExecute worker 3

beforeExecuteworker 5

pool-1-thread-2 process the task : worker 5

afterExecute worker 4

beforeExecuteworker 6

pool-1-thread-1 process the task : worker 6

afterExecute worker 5

afterExecute worker 6

线程池退出

Process finished with exit code 0

线程池的工作机制

1)如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2)如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。

3)如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务。

4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。

提交任务

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit()方法用于提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断 所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

?任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。

?任务的优先级:高、中和低。

?任务的执行时间:长、中和短。

?任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。

CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。

混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU 个数。

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

假设,我们现在有一个 Web 系统,里面使用了线程池来处理业务,在某些情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。


如果觉得对你有用记得点个赞分享关注一下哦。

你们的支持是我的最大动力。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表