线程池复用原理

线程池参数

在ThreadPoolExecutor中共有七个参数:

  • corePoolSize:核心线程数,核心线程会一直存活,即使没有任务需要执行(除非allowCoreThreadTimeOut参数设置为true,这样的话即使是核心线程也会被超时销毁);
  • maximumPoolSize:线程池中允许的最大线程数;
  • keepAliveTime:维护工作线程所允许的空闲时间,如果工作线程等待的时间超过了keepAliveTime,则会被销毁;
  • unit:指定keepAliveTime的单位,如TimeUnit.SECONDS;
  • workQueue:用来保存等待被执行任务的阻塞队列。常用的有:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue等;
  • threadFactory:线程工厂,提供创建新线程的功能。默认的实现是Executors.defaultThreadFactory(),即通过new Thread的方式;
  • handler:如果当前阻塞队列已满,并且当前的线程数量已超过了最大线程数,则会执行相应的拒绝策略。具体有四种(也可以自己实现):
    • AbortPolicy:默认实现,会直接抛出RejectedExecutionException;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardPolicy:直接抛弃,任务不执行;
    • DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务。

我们知道线程池有核心线程和非核心线程之分,其中核心线程是一直存活在线程池中的,而非核心线程是在执行完任务之后超时销毁的。当Thread执行完Runnable任务之后就会销毁,而且就算执行完任务之后把线程挂起也没有办法再去执行其他任务,那线程池是如何做到核心线程复用的呢?

![image-20220309161728804](/Users/lindinghao/Library/Application Support/typora-user-images/image-20220309161728804.png)

总结上图的流程:有新的任务需要执行,并且当前线程池的线程数小于核心线程数,则创建一个核心线程来执行。如果当前线程数大于核心线程数,则会将除了核心线程处理的任务之外剩下的任务加入到阻塞队列中等待执行。如果队列已满,则在当前线程数不大于最大线程数的前提下,创建新的非核心线程,处理完毕后等到达keepAliveTime空闲时间后会被直接销毁。注意,不一定销毁的就是这些非核心线程,核心线程也可能被销毁,只要减到剩余线程数到达核心线程数就行。核心线程和非核心线程的区别仅在于判断是否到达阈值时有区别:核心线程判断的是核心线程数,而非核心线程判断的是最大线程数。仅此一个区别。

下面看代码。首先来看一下执行线程任务的方法,里面很简单,就是根据工作线程数量去执行不同的策略,里面分成了3种情况,但是都会执行addWoker()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*
* 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务;
* 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

//程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败

/*
* 如果当前线程池是运行状态,会把任务添加到队列
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

//程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了

//调用addWorker方法去创建非核心线程,
//如果当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

这个方法就和名字一样,添加一个工人来完成任务;而这个工人就是Thread,任务就是Runnable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private boolean addWorker(Runnable firstTask, boolean core) {
...省略一些不重要的

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//Worker是实现了Runnable接口的包装类
w = new Worker(firstTask);
//Thread是在Worker构造方法创建的
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

//检查线程池状态,分为2种情况
//1、线程池处于RUNNING
//2、线程池处于SHUTDOWN并且firstTask==null
//这2种情况都会创建Worker来执行队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//重新设置标识位
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

到这里,我们大概可以知道是通过创建Worker来执行任务的,而且线程是在Worker内部创建的,我们也能猜到Thread需要的Runnable应该也在Worker内部,所有我们继续看一下Worker类。

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{

final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

Worker居然是一个Runnable任务,而且Worker的构造方法中创建了Thread对象。这样的话,在之前的addWorker()方法中调用t.start();就会执行到Worker的run()方法。

继续来看看runWorker()方法,这里很关键,一定要仔细看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//这个就是addWorker传进来的Runnable
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task不为null或从workQueue中获取任务不为null
//就会一直执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt

//检查线程池状态,如果线程池处于中断状态,将调用interrupt将线程中断。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断线程
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//线程任务执行啦
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

这里的关键在于这个while()条件判断,当第一次创建Worker时就有任务,当执行完这个任务后,这个方法并没有结束,而是不断地调用getTask()方法从阻塞队列中获取任务然后调用task.run()执行任务。

getTask()获取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 1.allowCoreThreadTimeOut表示是否允许核心线程超时销毁,默认是false,也就是说核心线程即使空闲也不会被销毁
//当然,如果设置为true,核心线程是会销毁的
//这样的话,只有正在工作的线程数大于核心线程数才会为true,否则返回false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//2.如果timed为true,通过poll取任务;如果为false,通过take取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

这个方法是通过一个死循环取任务,取任务的话是通过workQueue这个阻塞队列去完成的,在默认不改变allowCoreThreadTimeOut的前提下,如果工作线程数大于核心线程数,则通过poll()从队列取任务;如果工作线程数小于等于核心线程数,则通过take()从队列取任务;这2个方法等区别是take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程。这样的话,是不是已经搞清楚线程池中的核心线程复用的原因了。

线程的唤醒是在execute时,当调用workQueue.offer()方法,将任务放入阻塞队列时,会调用Condition.signal()方法唤醒一个之前阻塞的线程。这部分不细讲,感兴趣的同学自行查看。

总结
  • 1、当Thread的run方法执行完一个任务之后,会循环地从阻塞队列中取任务来执行,这样执行完一个任务之后就不会立即销毁了;
  • 2、当工作线程数小于等于核心线程数,那些空闲的核心线程再去队列取任务的时候,如果队列中的Runnable数量为0,就会阻塞当前线程,这样线程就不会回收了

线程池复用原理
http://example.com/2019/06/05/线程池复用原理/
Author
John Doe
Posted on
June 5, 2019
Licensed under