线程池是如何实现的?
# 使用线程池有哪些好处
- 降低资源消耗。通过重复利用已创建的线程,降低线程创建和销毁的消耗。
- 提高相应速度。当任务到达时,任务可以不需要线程创建就可以执行。
- 提高线程的可管理性。线程是稀缺资源、如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以统一分配、调优和监控。
# 线程池的创建
我们可以通过 ThreadPoolExecutor
来创建一个线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {//...}
2
3
4
5
6
7
创建一个线程池需要传入如下几个参数:
corePoolSize: 线程池中的常驻核心线程数。
当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建线程。当线程池中的线程数量到达 corePoolSize 时就不再创建。如果调用了线程池的prestartAllCoreThread()
方法,线程池会在启动后就创建所有核心线程。workQueue: 存放任务的阻塞队列,被提交但尚未被执行的任务。
可以选择如下几个阻塞队列: 1.ArrayBlockingQueue
2.LinkedBlockingQueue
:吞吐量高于LinkedBlockingQueue
,是Executors.newFixedThreadPool()
创建的线程池的默认队列。 3.SynchronousQueue
:一个不存储元素的队列。每个提交的任务都必须等到线程来执行,否则阻塞提交。是Executors.newCachedThreadPool()
创建的线程池的默认队列。 4.PriorityBlockingQueue
:一个具有优先级的无限阻塞队列。maximumPoolSize: 线程池最大线程数量。
如果队列满了并且已经创建的线程数量小于最大线程数量,则线程池会再继续创建新的线程执行任务。值得注意的是如果使用无界队列 (比如LinkedBlockingQueue
)这个参数就没有意义了。threadFactory: 用于设置创建线程的工厂
可以给创建的线程设置有意义的名字,可方便排查问题handler: 拒绝策略,表示当队列满了且工作线程都满了如何来拒绝请求执行的线程的策略,默认是AbortPolicy策略,主要有四种类型。
AbortPolicy 直接抛出RegectedExcutionException 异常阻止系统正常进行,默认策略
DiscardPolicy 直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种方案
DiscardOldestPolicy 抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
CallerRunsPolicy 交给线程池调用所在的线程进行处理,“调用者运行”的一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
以上内置拒绝策略均实现了 RejectExcutionHandler 接口,我们也可以实现接口RejectExcutionHandler
来自定义策略,如记录日志或者持久化不能拒绝的任务。
- keepAliveTime: 非核心线程存活时间
线程池中非核心线程空闲的存活时间 当前线程池数量超过 corePoolSize 时,当空闲时间达到 keepAliveTime 值时,非核心线程会被销毁直到只剩下 corePoolSize 个线程为止。
tips:如果任务很多并且任务执行时间比较短,可以调大 keepAliveTime ,提高线程利用率。
- unit: keepAliveTime 的时间单位
# 向线程池提交一个任务后,线程池是如何处理这个任务的呢?
- 如果当前运行的线程数<corePoolSize,则创建新线程来执行任务(这一步骤需要获取全局锁)。
- 如果当前运行的线程数>=corePoolSize,则将任务加入阻塞队列。
- 如果队列已满或者不能加入,接下来的处理分两种情况:
- 线程池中的线程是否都处于工作状态。创建新的线程来执行任务(这一步骤需要获取全局锁)。
- 线程池中有空闲的线程,则用其来执行任务。
- 如果创建新线程会导致线程数量>maximumPoolSize,则执行拒绝策略。
我们再从源码的角度来理解下这个过程:
ThreadPoolExecutor#execute(Runnble command) 方法
public void execute(Runnable command) {
// 参数校验
if (command == null)
throw new NullPointerException();
// 从 c 的值可以判断出线程池的状态, 以及线程池中线程的数量
int c = ctl.get();
// 1. 如果线程池的线程数量 小于 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加一个核心线程 command 表示一个具体的任务, true 表示为核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果线程池处于 RUNNING 状态(只有处于此状态,才能接受新的任务)
// 并且线程池的数量大于核心线程数, 就把任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果队列也满了,就创建一个非核心线程(core==false)
else if (!addWorker(command, false))
// 如果创建失败,就执行拒绝策略
reject(command);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 线程池是如何工作的呢?
线程池中有一个 workers 集合,里面记录了所有的工作线程,只有拿到 mainLock 锁的线程才能访问。 ThreadPoolExecutor#workers 属性
private final HashSet<Worker> workers = new HashSet<Worker>();
线程池创建线程时,会将线程包装成 Worker
,Worker
在执行任务后还会循环获取队里里的任务。我们可以从 Worker
类的 run()
方法看到这一点。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
上面代码关键点是while循环和getTask()方法,通过循环不断的调用getTask()从阻塞队列中获取任务,通过这个方法,它与阻塞队列建立桥梁。
# 向线程池提交任务
有两个方法提交任务,分别是execute()
,submit()
execute()
:execute只能提交Runnable类型的任务,用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。如果遇到异常会直接抛出。使用方法如下:
public static void main(String[] args) throws Exception{
ExecutorService es = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("Runnable线程处理开始...");
}
};
es.execute(runnable);
es.shutdown();
}
2
3
4
5
6
7
8
9
10
11
submit()
:submit既可以提交Runnable类型的任务,也可以提交Callable类型的任务,会有一个类型为Future的返回值,但当任务类型为Runnable时,返回值为null。如果遇到异常不会直接抛出,只有在使用Future的get方法获取返回值时,才会抛出异常。使用方法:
public static void main(String[] args) throws Exception{
ExecutorService es = Executors.newSingleThreadExecutor();
Callable callable = new Callable() {
@Override
public String call() throws Exception {
System.out.println("线程处理开始...");
return "hello world";
}
};
Future<String> future = es.submit(callable);
while(true) {
//idDone:如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
if(future.isDone()) {
System.out.println("任务执行完成:" + future.get());
break;
}
}
es.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 关闭线程池
可以通过shutdown
和shutdownNow
方法来关闭线程池。他们的原理是遍历线程池的中工作线程,调用其interrupt()
方法来中断线程,所以,无法相应中断的任务永远无法被终止。但是他们也存在一定的区别:
shutdown
:将线程的状态设置为 SHUTDOWN 状态,然后中断没有执行任务的线程。shutdownNow
:首先将线程的状态设置为 STOP ,然后尝试停止所有正在执行或者暂停任务的线程,并返回队列中的待执行任务。
只要调用了这两个方法中的任意一个,isShutdown()
方法就会返回true
,当所有任务都关闭后,才表示线程池关闭成功,这是调用isTerminaed()
方法才会返回 true
。
通常调用 shutdown
来关闭线程池,如果不需要等任务执行完可以调用 shutdownNow
。
# 合理的配置线程池
- 最大线程数 maximumPoolSize 的是指可参考如下规则:
- CPU 密集型任务应该配置尽可能小的线程,如cpu数量+1。
- IO密集型任务线程并不是一直占用cpu,则应该适度更多的配置线程,如2*cpu数量
- 可以通过
Runtime.getRuntime().availableProcessors()
获得当前设备的 cpu 个数。
- 建议使用有界队列。如果任务执行速度变慢,线程池不断向队列中 add 元素,会有内存溢出的风险。