在JAVA中,线程池最核心的类是 java.uitl.concurrent.ThreadPoolExecutor 。
线程池的主要作用就是存放线程,当需要使用线程时,优先从池中取用一个空闲的线程。
线程池主要实现了线程的复用,无需频繁的创建和销毁线程。
因此,线程池也要具备动态扩容、超时销毁、排队等待等功能。
ThreadPoolExecutor 类图如下:
ThreadPoolExecutor 提供了4个构造函数,如下图:
前三个构造函数,基本上是调用了第四个构造函数,通过构造函数基本上知道了数据结构和各个核心树形的意义。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize 核心线程数量
* @param maximumPoolSize 最大线程数量
* @param keepAliveTime 非核心线程的最大存活时间,仅对超出核心线程之外生效
* @param unit 用于keepAliveTime的时间单位,如秒、小时、天等
* @param workQueue 等待执行的任务队列,提供了三种队列
* @param threadFactory 线程工厂类,主要用来创建线程
* @param handler 拒绝策略,线程和队列都满了的情况触发
* @throws IllegalArgumentException 抛出参数异常
* @throws NullPointerException 抛出空指针异常
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
上文大致介绍了,线程池核心类的实现,本节主要深入学习下线程池的工作原理。
线程池的状态主要使用原子操作类AtomicInteger ctl 来标识。
ctl 其中高3位表示线程池状态,低29位表示目前的线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits(高3位)
// 高3位为1,可以接收新任务,处理队列中等待的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位为0,不再接收新任务,处理队列中等待的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位为001,不再接收新任务,不处理队列中等待的任务,尝试终端运行中任务
private static final int STOP = 1 << COUNT_BITS;
// 高3位为010,所有任务被终止,workerCount为0,此状态将调用terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位为100,terminated()方法调用完成后变成此状态
private static final int TERMINATED = 3 << COUNT_BITS;
相关方法:
线程池默认情况下是不进行线程的初始化的(当然提供了预创建的初始化方法)。
线程初始化一般发生在提交一个任务的时候,集中在execute()方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程小于核心线程,创建一个新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池是否在运行
// 如果在运行,队列是否允许插入,插入成功,再次验证线程池是否运行
// 如果不在运行,移除插入的任务,然后抛出拒绝策略
// 如果在运行,没有线程了,创建一个新线程(注意是空线程)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 添加非核心线程失败,就直接拒绝
else if (!addWorker(command, false))
reject(command);
}
上文可知,创建线程主要调用了 addWorker()。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判断线程池的状态,如果大于或等SHUTDOWN,不处理提交的任务,直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 判断当前需要创建的线程是否为核心线程,入参core为true,且当前线程数小于corePoolSize,跳出循环,开始创建新线程
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// 自旋方法,goto语法
}
}
// 创建线程
boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
try {
w = new ThreadPoolExecutor.Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 获取线程池主锁
// 工作线程通过Woker类实现,ReentrantLock锁保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加线程到workers中(线程池中)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
为什么 worker 可以承载线程?
Worker 类继承了AQS,并实现了Runnable接口。
它有两个重要的成员变量:firstTask和thread。
上文提到将 worker 添加到 workers 。
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
workers 是一个 hashSet。
所以,线程池底层的存储结构其实就是一个 HashSet。
workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue
线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize。
如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
public static void main(String[] args) {
// 完整构造
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
// 常见快速构造
// 容量固定为1
ExecutorService justOne = Executors.newSingleThreadExecutor();
// 容量最大
ExecutorService intMax = Executors.newCachedThreadPool();
// 指定容量
ExecutorService inputNo = Executors.newFixedThreadPool(10);
// 使用示例
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("PoolSize:"+executor.getPoolSize()+",WaitQueue:"+ executor.getQueue().size()+",Down:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
需要根据任务的类型来配置线程池大小:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
如果是IO密集型任务,参考值可以设置为2*NCPU
当然,这只是一个参考值。
线程池本质是一个hashSet。
多余的任务会放在阻塞队列中。
阻塞队列满了后,才会触发非核心线程的创建。
非核心线程只是临时过来打杂的,空闲了,自己关闭。
线程池提供了两个钩子(beforeExecute,afterExecute),继承线程池,在执行任务前后做一些事情。
线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)
当前还没有观点发布,欢迎您留下足迹!
Spring就像一个大家族,里面包含了很多衍生产品,其中最为出名的就是SpringMVC和SpringBoot,那么这三者之间是什么关系呢?SpringMVC和SpringBoot又专门用来做什么呢?
在JAVA的WEB工程中我们可以将JSP页面文件放在WEB-INFO中限制用户进行URL直接访问,但静态资源如js、css文件却是需要被外部直接访问的,直接对外暴露又不太安全,可以通过自定义过滤器处理
SpringMVC框架是围绕DispatcherServlet(前端控制器)展开的,本文描述SpringMVC的优点、各个核心类(角色)作用,并说明用户请求数据到最终视图返回完整的数据传输过程
SpringBoot 的 MyBatis 默认采用 hikari 连接池,druid (德鲁伊) 连接池由阿里开源,它不仅仅是一个连接池,更是代理、过滤器、解析器、插件、监控、优化等实用功能组件库,更在阿里生产环境得到验证,所以 Lets Do It
Struts2框架以WebWork优秀的设计思想为核心,吸收了 Struts框架的部分优点,提供了一个更加整洁的MVC设计模式实现的Web应用程序框架,本文主要是与Spring整合关键配置和实例
scope属性主要用于控制依赖范围,主要分为编译、打包、运行、测试、依赖传递等各个常见,scope不同于optional提供了更多可选择的配置参数用于应对不同的依赖场景。