JAVA线程池源码解析与使用

频繁的创建与销毁线程是非常浪费系统资源的行为,多线程编程中是必要考虑到线程复用,线程池就是实现线程复用的一种方式,看看JAVA的线程池如何让使用都有哪些关键参数

所属分类 JAVA

相关标签 任务多线程线程池

ThreadPoolExecutor

在JAVA中,线程池最核心的类是 java.uitl.concurrent.ThreadPoolExecutor 。

线程池的主要作用就是存放线程,当需要使用线程时,优先从池中取用一个空闲的线程。

线程池主要实现了线程的复用,无需频繁的创建和销毁线程。

因此,线程池也要具备动态扩容、超时销毁、排队等待等功能。

ThreadPoolExecutor 类图如下:

/static/upload/post/1647306208730.png

  1. Executor接口:顶层,仅定义了一个方法(用于执行任务)
  2. ExecutorService接口:继承并拓展Executor接口,添加一些新的方法,如操控线程池生命周期的shutDown()、shutDownNow()等,可异步跟踪执行任务生成返回值Future方法、submit()等
  3. AbstractExecutorService抽象类:实现了ExecutorService中声明的方法,除了execute(Runnable command)

ThreadPoolExecutor 提供了4个构造函数,如下图:

/static/upload/post/1647306846011.png

前三个构造函数,基本上是调用了第四个构造函数,通过构造函数基本上知道了数据结构和各个核心树形的意义。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize 核心线程数量
     * @param maximumPoolSize 最大线程数量
     * @param keepAliveTime 非核心线程的最大存活时间,仅对超出核心线程之外生效
     * @param unit 用于keepAliveTime的时间单位,如秒、小时、天等
     * @param workQueue 等待执行的任务队列,提供了三种队列
     * @param threadFactory 线程工厂类,主要用来创建线程
     * @param handler 拒绝策略,线程和队列都满了的情况触发
     * @throws IllegalArgumentException 抛出参数异常
     * @throws NullPointerException 抛出空指针异常
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

工作原理

上文大致介绍了,线程池核心类的实现,本节主要深入学习下线程池的工作原理。

线程池的状态主要使用原子操作类AtomicInteger ctl 来标识。

ctl 其中高3位表示线程池状态,低29位表示目前的线程数量。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits(高3位)
    // 高3位为1,可以接收新任务,处理队列中等待的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高3位为0,不再接收新任务,处理队列中等待的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高3位为001,不再接收新任务,不处理队列中等待的任务,尝试终端运行中任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 高3位为010,所有任务被终止,workerCount为0,此状态将调用terminated()方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高3位为100,terminated()方法调用完成后变成此状态
    private static final int TERMINATED =  3 << COUNT_BITS;

相关方法:

  • runStateOf(int c) :获取高3位保存的线程池状态
  • workerCountOf(int c):获取低29位的线程数量
  • ctlOf(int rs, int wc):rs表示runState,wc表示workerCount,打包合并成ctl

线程池默认情况下是不进行线程的初始化的(当然提供了预创建的初始化方法)。

  • prestartCoreThread():初始化一个核心线程
  • prestartAllCoreThreads():初始化所有核心线程

线程初始化一般发生在提交一个任务的时候,集中在execute()方法。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果当前线程小于核心线程,创建一个新线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程池是否在运行
        // 如果在运行,队列是否允许插入,插入成功,再次验证线程池是否运行
        // 如果不在运行,移除插入的任务,然后抛出拒绝策略
        // 如果在运行,没有线程了,创建一个新线程(注意是空线程)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 添加非核心线程失败,就直接拒绝
        else if (!addWorker(command, false))
            reject(command);
    }

上文可知,创建线程主要调用了 addWorker()。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判断线程池的状态,如果大于或等SHUTDOWN,不处理提交的任务,直接返回
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            // 判断当前需要创建的线程是否为核心线程,入参core为true,且当前线程数小于corePoolSize,跳出循环,开始创建新线程
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // 自旋方法,goto语法
            }
        }
        // 创建线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        ThreadPoolExecutor.Worker w = null;
        try {
            w = new ThreadPoolExecutor.Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 获取线程池主锁
                // 工作线程通过Woker类实现,ReentrantLock锁保证线程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加线程到workers中(线程池中)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动新建的线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

为什么 worker 可以承载线程?

/static/upload/post/1647324929186.png

Worker 类继承了AQS,并实现了Runnable接口。

它有两个重要的成员变量:firstTask和thread。

  • firstTask 用于保存第一次新建的任务
  • thread 是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程

上文提到将 worker 添加到 workers 。

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

workers 是一个 hashSet。

所以,线程池底层的存储结构其实就是一个 HashSet。

workQueue,它用来存放等待执行的任务。

workQueue的类型为BlockingQueue,通常可以取下面三种类型:

  1. ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
  2. LinkedBlockingQueue:基于链表的先进先出队列,如创建没有指定大小,默认为Integer.MAX_VALUE
  3. synchronousQueue:不会保存提交的任务,直接新建一个线程来执行新来的任务

线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize。

如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  • AbortPolicy:丢弃任务,抛出RejectedExecutionException异常。
  • DiscardPolicy:丢弃任务,不抛出异常。
  • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • CallerRunsPolicy:由调用线程处理该任务

使用示例

public static void main(String[] args) {
    // 完整构造
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,  new ArrayBlockingQueue<Runnable>(5));
    // 常见快速构造
    // 容量固定为1
    ExecutorService justOne = Executors.newSingleThreadExecutor();
    // 容量最大
    ExecutorService intMax =  Executors.newCachedThreadPool();
    // 指定容量
    ExecutorService inputNo = Executors.newFixedThreadPool(10);
    // 使用示例
    for(int i=0;i<15;i++){
        MyTask myTask = new MyTask(i);
        executor.execute(myTask);
        System.out.println("PoolSize:"+executor.getPoolSize()+",WaitQueue:"+ executor.getQueue().size()+",Down:"+executor.getCompletedTaskCount());
    }
    executor.shutdown();
}

线程池大小建议

需要根据任务的类型来配置线程池大小:

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

当然,这只是一个参考值。

小结

线程池本质是一个hashSet。

多余的任务会放在阻塞队列中。

阻塞队列满了后,才会触发非核心线程的创建。

非核心线程只是临时过来打杂的,空闲了,自己关闭。

线程池提供了两个钩子(beforeExecute,afterExecute),继承线程池,在执行任务前后做一些事情。

线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)

米虫

做一个有理想的米虫,伪全栈程序猿,乐观主义者,坚信一切都是最好的安排!

本站由个人原创、收集或整理,如涉及侵权请联系删除

本站内容支持转发,希望贵方携带转载信息和原文链接

本站具有时效性,不提供有效、可用和准确等相关保证

本站不提供免费技术支持,暂不推荐您使用案例商业化

发表观点

提示

昵称

邮箱

QQ

网址

当前还没有观点发布,欢迎您留下足迹!

同类其他

JAVA

Spring、SpringMVC和SpringBoot

Spring就像一个大家族,里面包含了很多衍生产品,其中最为出名的就是SpringMVC和SpringBoot,那么这三者之间是什么关系呢?SpringMVC和SpringBoot又专门用来做什么呢?

自定义filter过滤器拦截未登录(非法)请求

在JAVA的WEB工程中我们可以将JSP页面文件放在WEB-INFO中限制用户进行URL直接访问,但静态资源如js、css文件却是需要被外部直接访问的,直接对外暴露又不太安全,可以通过自定义过滤器处理

一文理解SpringMVC框架核心

SpringMVC框架是围绕DispatcherServlet(前端控制器)展开的,本文描述SpringMVC的优点、各个核心类(角色)作用,并说明用户请求数据到最终视图返回完整的数据传输过程

SpringBoot配置druid(德鲁伊)数据库连接池

SpringBoot 的 MyBatis 默认采用 hikari 连接池,druid (德鲁伊) 连接池由阿里开源,它不仅仅是一个连接池,更是代理、过滤器、解析器、插件、监控、优化等实用功能组件库,更在阿里生产环境得到验证,所以 Lets Do It

Struts2 + Spring框架融合配置

Struts2框架以WebWork优秀的设计思想为核心,吸收了 Struts框架的部分优点,提供了一个更加整洁的MVC设计模式实现的Web应用程序框架,本文主要是与Spring整合关键配置和实例

Maven的pom配置文件的scope属性

scope属性主要用于控制依赖范围,主要分为编译、打包、运行、测试、依赖传递等各个常见,scope不同于optional提供了更多可选择的配置参数用于应对不同的依赖场景。

选择个人头像

昵称

邮箱

QQ

网址

评论提示

  • 头像:系统为您提供了12个头像自由选择,初次打开随机为你选择一个
  • 邮箱:可选提交邮箱,该信息不会外泄,或将上线管理员回复邮件通知
  • 网址:可选提交网址,评论区该地址将以外链的形式展示在您的昵称上
  • 记忆:浏览器将记忆您已选择或填写过得信息,下次评论无需重复输入
  • 审核:提供一个和谐友善的评论环境,本站所有评论需要经过人工审核