Java线程池

虽然之前学习了不少相关知识,但是只有在实践中踩坑才能印象深刻。今天看了半天的java对线程池的处理,额外兴致来,总结一份java线程池相关。

线程池的后果

Java提供的工具类-Executors

Executors是一个Java中的工具类,提供工厂方法来创建不同类型的线程池。 提供方法如下:

然后阿里巴巴Java开发手册中这样提到:

  1. 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让同学更加明确线程池的运行规则,规避资源耗尽的风险。

    说明:Executors 返回的线程池对象的弊端如下:
          
    1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 
                      
    2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。 
    

第一个关键词:OOM。怎么导致OOM的呢,那就show codes,一起来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @describe: Executors线程池创建,证明使用Executors会造成oom
* @author:彭爽pross
* @date: 2018/12/21
*/
public class ExecutorsDemo {
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for(int i=1;i<Integer.MAX_VALUE;i++){
executor.execute(new subThread());
System.out.println("Thread count:"+i);
}
}
}
class subThread implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}

代码里刻意调整线程数量,启动时故意设置内存大小(模拟上限内存):-Xmx8m -Xms8m。运行结果:

1
2
3
4
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at org.pross.threadPool.ExecutorsDemo.main(ExecutorsDemo.java:18)

错误就是很明显的,OOM问题。再出现OOM之前,会一直有打印输出,一直达到内存上限为止。为什么使用Executors创建线程池就会错误,那我们就来追溯Java创建线程池造成OOM的原因。

Executors为什么存在缺陷

往上面结果看一眼,最终执行错误代码到了这一行:java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)。敲黑板划一下第二个关键词:LinkedBlockingQueue,点进newFixedThreadPool 可以发现关键词的踪影:

1
2
3
4
5
6
7
8
9
10
 /*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

这里就需要继续说下Java的堵塞队列。Java中的BlockingQueue主要的实现方式是ArrayBlockingQueueLinkedBlockingQueue。ArrayBlockingQueue是一个用数组实现的有界的阻塞队列,必须设置容量。但是我们默认的LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择行进行设置,不设置的话,就是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE

1
2
3
4
5
6
7
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

所以就很简单的找到了问题所在,不设置LinkedBlockingQueue容量大小的话就是默认是Integer.MAX_VALUE。而最开始创建newFixedThreadPool时候,并没有可以指定的字段。所以出现OOM是正常的,阿里巴巴Java开发手册中不允许使用 Executors 去创建是正常的。 那我们想使用线程池怎么去创建呢?

创建线程池的正确姿势

我们继续往前看newFixedThreadPool是怎么实现的那段源码,继续敲第三个关键词:ThreadPoolExecutor,是直接返回ThreadPoolExecutor对象,创建包含各个字段信息,其中就有LinkedBlockingQueue。那我们能不能直接调用ThreadPoolExecutor的构造函数来自己创建线程池,在创建的同时,给LinkedBlockingQueue指定容量呢?这个问答必须是Yes。

1
2
private static ExecutorService executor = new ThreadPoolExecutor(5, 200, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024));

如上代码直接调用ThreadPoolExecutor来自己创建线程池。我们替换掉前面写的Demo中创建的方式,启动运行,结果如下:

1
2
3
4
5
6
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task org.pross.threadPool.subThreadPool@2e5d6d97 rejected from java.util.concurrent.ThreadPoolExecutor@238e0d81[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.pross.threadPool.ThreadPoolExecutorDemo.main(ThreadPoolExecutorDemo.java:18)

结果还是有问题,和上次不同的是,这次是抛出的异常信息,但是异常(Exception)总比发生错误(Error)要好,敲黑板划一下第四个关键词:ThreadPoolExecutor$AbortPolicy.rejectedExecution。这个关键词代表的信息很简单,是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求,所以就会抛出rejectedExecution,所以这次打印输出只到Thread count:1224,1224即队列最大数加上最大线程池数之和。

线程池的前因

如果只看结果和使用,那么到这里就结束了。如果我们深入思考一步,发现这个BlockingQueue队列似乎最开始就默认需要了,但是为什么需要队列,Have you thought about it?这个就要道道线程池的工作原理。

我们需要了解的Java线程池工作原理

线程池内的线程数的大小相关的概念有两个,一个是核心池大小(corePoolSize),还有最大线程池大小(maximumPoolSize)。如果当前的线程个数比核心池个数小,当线程任务到来,会优先创建一个新的线程并执行任务。当已经到达核心池大小,则把任务放入队列,为了资源不被耗尽,队列的最大容量可能也是有上限的,如果达到队列上限则考虑继续创建新线程执行任务,如果此刻线程的个数已经到达最大线程池的上限话,则考虑把任务丢弃。

然后我从网上随便找了张图,放到这方便理解。

所以这就比较方便理解上面调用ThreadPoolExecutor来自己创建线程池中的一些参数了。当然,ThreadPoolExecutor的构造函数有四种,我选取一个参数最完整的构造方法。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

那一起来看一下这些参数的具体意思。

corePoolSize:核心池大小,需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。

maximumPoolSize:池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。

keepAliveTime:当线程数大于核心时,多于的空闲线程最多存活时间。

unit :keepAliveTime 参数的时间单位。

workQueue :当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:无界队列,有界队列和同步移交。(重要)

threadFactory :执行程序创建新线程时使用的工厂。

handler :阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。将在下文中详细阐述。(重要)

我们选取比较复杂和重要的两个参数来介绍一下。(我也是翻阅的资料)

阻塞队列BlockingQueue

如果运行的线程少于corePoolSize,则Executor会首先添加新的线程直接去运行,不会进入BlockingQueue排队等候;如果运行的线程大于等于corePoolSize,则Executor就会将新任务请求加入BlockingQueue排队等候。而BlockingQueue主要有三种类型:无界队列有界队列同步移交队列

无界队列

队列的大小无限制,就是前面提到过不指定容量默认使用的LinkedBlockingQueue,如果不指定容量大小的话,当任务线程池中耗时较长就会导致大量新任务在队列中堆积导致OOM。

有界队列

有界队列也存在两类:遵循FIFO原则的队列(ArrayBlockingQueue,LinkedBlockingQueue)和优先级队列(PriorityBlockingQueue),优先级由任务的Comparator决定。使用有界队列时队列大小需要和线程池大小相配合,线程池较小,有界队列较大时可以减少内存消耗,降低CPU使用率,但是会限制QPS。

同步移交队列

如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

饱和策略RejectedExecutionHandler

JDK提供四种饱和策略,都作为静态内部类在ThreadPoolExcutor中进行实现。

AbortPolicy终止策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

通过代码可以看出,该策略是默认饱和策略。使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

DiscardPolicy抛弃策略

1
2
3
4
5
6
7
8
9
10
11
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

不作任何处理,相当于直接抛弃任务。

DiscardOldestPolicy抛弃旧任务策略

如代码,先将阻塞队列中的头元素出队抛弃(poll),再尝试提交任务(execute)。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

CallerRunsPolicy调用者运行策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

Java线程池就介绍到这里。

完。