计算机系统应用教程网站

网站首页 > 技术文章 正文

为什么有了线程,还要使用线程池?

btikc 2024-10-22 10:31:49 技术文章 6 ℃ 0 评论

先来思考,既然有了线程,为什么还要线程池呢?那线程池底层是怎么实现的呢?在实际开发中,怎么用才是最优的呢?

线程VS线程池

先写两个Demo测试下,分别使用线程跟线程池来给集合添加元素,循环100000次,看下运行结果

  • 线程
package com.yqj.threadpool;  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
  
/**  
 * 线程测试  
 * @author Zhao Yun Long  
 * @version V1.0  
 * @date 2022/11/16 15:06  
 */public class ThreadDemo {  
  
    public static void main(String[] args) throws InterruptedException {  
        long start = System.currentTimeMillis();  
        final Random random = new Random();  
        final List<Integer> list = new ArrayList<Integer>();  
        for (int i = 0; i < 100000; i++) {  
            Thread thread = new Thread(){  
                @Override  
                public void run() {  
                    list.add(random.nextInt());  
                }  
            };  
            thread.start();  
            thread.join();  
        }  
        System.out.println("总共用时:"+(System.currentTimeMillis()-start)/1000 + "秒");  
        System.out.println("大小:"+list.size());  
    }  
}
复制代码

运行结果:

  • 线程池
package com.yqj.threadpool;  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.TimeUnit;  
  
/**  
 * 线程池测试  
 * @author Zhao Yun Long  
 * @version V1.0  
 * @date 2022/11/16 17:06  
 */public class ThreadPoolDemo {  
  
    public static void main(String[] args) throws InterruptedException {  
        long start = System.currentTimeMillis();  
        final Random random = new Random();  
        final List<Integer> list = new ArrayList<Integer>();  
        ExecutorService executorService = Executors.newSingleThreadExecutor();  
        for (int i = 0; i < 100000; i++) {  
            executorService.execute(new Runnable() {  
                @Override  
                public void run() {  
                    list.add(random.nextInt());  
                }  
            });  
        }  
        executorService.shutdown();  
        executorService.awaitTermination(1, TimeUnit.DAYS);  
        System.out.println("总共用时:" + (System.currentTimeMillis()-start) + "毫秒");  
        System.out.println("大小:"+list.size());  
    }  
}
复制代码

运行结果:

使用线程:总耗时17秒多, 使用线程池:总耗时40毫秒

为什么差别这么大?

首先我们在代码中打印出当前线程的名称: 两个demo增加如下代码:

@Override  
public void run() {  
    list.add(random.nextInt());  
    //===========增加的代码=============  
    System.out.println("当前线程名称:"+Thread.currentThread().getName());  
}
复制代码

我们再次运行代码发现ThreadDemo中每次执行的线程都不一样,最终执行完总共创建了100000个线程,而ThreadPoolDemo始终只有一个线程。说明线程池对thread-1线程进行了复用,这样的好处就是大大减少了线程的创建以及销毁的次数,而我们之前的文章提到过,线程的创建与销毁耗时耗资源。

线程

线程池:

线程池的优点:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 可以对线程做统一管理。

线程池实现原理

在讲线程池原理之前,有几个核心的参数一定要知道:

  1. corePoolSize:核心线程数 线程池中有核心线程跟非核心线程两种,核心线程默认会一直在线程池中,即使他当前并没有在处理任务。
  2. maximumPoolSize:允许创建的最大线程数 maximumPoolSize=核心线程数+非核心线程数
  3. keepAliveTime:非核心线程闲置超时时长 非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。
  4. TimeUnit unit:keepAliveTime的时间单位
  5. workQueue:等待队列
  6. ThreadFactory: 创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂
  7. handler:拒绝处理策略

整个线程池相关的源码结构大致如下:

线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。 线程池也有自己的状态:

// runState is stored in the high-order bits 
private static final int RUNNING = -1 << COUNT_BITS; 
private static final int SHUTDOWN = 0 << COUNT_BITS; 
private static final int STOP = 1 << COUNT_BITS; 
private static final int TIDYING = 2 << COUNT_BITS; 
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码

其中COUNT_BITS是 int 位数
private static final int COUNT_BITS = Integer.SIZE - 3; //Integer.SIZE=32
所以实际 COUNT_BITS = 29,
用上面的5个常量表示线程池的状态,实际上是使用32位中的高3位表示;

工作原理:

int c = ctl.get();
1、判断当前的线程数是否小于corePoolSize如果是,
使用入参任务通过addWord方法创建一个新的线程,
如果能完成新线程创建exexute方法结束,成功提交任务;
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
2、在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,
再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了)
非运行状态下当然是需要reject;
然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false); 判断当前工作线程池数是否为0  
        如果是创建一个null任务,任务在堵塞队列存在了就会从队列中取出 这样做的意义是
        保证线程池在running状态必须有一个任务在执行
        
        
        
}
3、如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,
则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject;
从上面新增任务的execute方法也可以看出,拒绝策略不仅仅是在饱和状态下使用,
在线程池进入到关闭阶段同样需要使用到;
上面的几行代码还不能完全清楚这个新增任务的过程,
肯定还需要清楚addWorker方法才行:
else if (!addWorker(command, false))
    reject(command);
复制代码
  1. 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,如果能完成新线程创建exexute方法结束,成功提交任务;
  2. 在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
  3. 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject;
    从上面新增任务的execute方法也可以看出,拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;

我们再通过一张图来再梳理下:

四种常见的线程池

Executors类中提供的几个静态方法来创建线程池。大家到了这一步,如果看懂了前面讲的ThreadPoolExecutor构造方法中各种参数的意义,那么一看到Executors类中提供的线程池的源码就应该知道这个线程池是干嘛的。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
复制代码

CacheThreadPool的运行流程如下:

  1. 提交任务进线程池。
  2. 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
  3. 尝试将任务添加到SynchronousQueue队列。
  4. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
  5. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。

当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著地提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
复制代码

核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。

与CachedThreadPool的区别

  • 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
  • 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
  • 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多
  • 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
复制代码

有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
复制代码

实际使用

四种常见的线程池基本够我们使用了,但是《阿里巴巴开发手册》不建议我们直接使用Executors类中的线程池,而是通过ThreadPoolExecutor的方式,按照实际场景进行线程池的参数设置,这样需要我们更加明确线程池的运行规则,规避资源耗尽的风险。

原文:https://juejin.cn/post/7167361998496530469

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表