Zer0e's Blog

浅谈Java线程池

字数统计: 3.1k阅读时长: 13 min
2020/07/24 Share

前言

本篇来讲讲Java中线程池的使用与分析。

正文

阿里巴巴Java开发手册中强调,线程资源必须通过线程池来提供,不允许显式创建线程。
首先我们讲讲普通的线程创建方式。

Java线程创建方式

在Java语言中,我们通过两种方式创建一个新线程,分别是继承Thread类或者实现Runnable接口。

继承Thread类

Thread类本质上是实现了Runnable接口的一个实例,启动线程的方法是通过Thread类的start方法,这个方法是native方法,它将启动线程,并在获得时间片后执行run方法。

1
2
3
4
5
6
7
8
9
10
11
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread run");
}

public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
}
}

实现Runnable接口

如果该类已经继承其他类,由于java是单继承,所以只能实现runnable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyThread implements Runnable {
@Override
public void run() {
System.out.println("MyThread run");
}

public static void main(String[] args) {
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
thread.start();
}
}

这里值得一提的是,start方法是启动一个线程,线程处于就绪状态,而run方法则是直接执行,线程进入了运行状态。start方法之后会直接执行完毕,等待后续cpu的调度来执行这个线程,如果是run方法则直接在当前线程运行了方法,影响接下来的代码。因此只有start方法才能是多线程执行。

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}).run();
System.out.println(Thread.currentThread().getName());

}
// 输出两个main 如果是start方法,输出不同线程。

通常情况下如果需要一个异步操作,那直接在类里new Thread实例并传入Runnable实例,重写run方法就可以简单实现一个多线程操作,但这种方法并不提倡。

1
2
3
4
5
6
7
new Thread(new Runnable() {

@Override
public void run() {
// TODO
}
}).start();

为什么使用线程池

前面说了创建线程的方式,那为什么阿里不让程序员这样创建线程呢?原因在于以下几点:

  • 每次new Thread实例性能会逐渐变差。
  • 线程没法统一管理,容易出现线程无限制创建,严重会有线程间相互竞争导致死锁,再严重些会导致占用过多资源而导致OOM。

而相比于new Thread,Java中提供的四种线程池有几点好处:

  • 在创建和销毁线程时所消耗时间与系统资源的开销大幅度减小。
  • 可以提供定时执行,并发控制等高级功能。

Java线程池

Java中通过Executors提供四种线程池,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

Executors源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class Executors {

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
}

我们查看源码可以知道,Executors类使用了ThreadPoolExecutor类创建了一个简单线程池。我们接着跟进ThreadPoolExecutor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}

我们得知,ThreadPoolExecutor提供了四种构造器,但前三种构造器其实是调用了第四种构造方法而已。接下来看看几个参数的作用:

  • corePoolSize: 线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
  • handler: 线城池的饱和策略事件,主要有四种类型。

我们可以看看源码中线程池的执行流程,即execute方法,源码中的注释已经十分清楚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

翻译一下:

  • 提交一个任务,如果线程池中存活的核心线程小于线程数corePoolSize,线程池会创建一个核心线程去处理提交的任务。
  • 如果核心线程池已满,一个新的任务会放到任务队列workQueue中排队等待执行。
  • 当corePoolSize已满,并且workQueue也满,判断线程数是否达到maximumPoolSize,如果没达到,就创建一个非核心线程来执行任务。
  • 如果线程数达到maximumPoolSize,直接采取拒绝策略。

顺带一提四种拒绝策略:

  • AbortPolicy(抛出一个异常,默认的)
  • DiscardPolicy(直接丢弃任务)
  • DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
  • CallerRunsPolicy(交给线程池调用所在的线程进行处理)

至此,线程池的工作原理基本上讲完了。但在阿里巴巴Java开发手册中有这么一条:

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下: 1) FixedThreadPool和SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 2) CachedThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

仔细看源码,确实Executors在创建线程池时,workQueue并没有指定最大大小,可能会导致堆积大量的请求,即没有实行拒绝策略。

线程池的工作队列

  • ArrayBlockingQueue:有界队列,使用数组实现,FIFO。
  • LinkedBlockingQueue:基于链表的可设置容量的阻塞队列,默认最大长度为Integer.MAX_VALUE。
  • DelayQueue:延迟队列,是一个任务定时周期的延迟执行的队列。根据指定的时间从小到大排列,否则否则根据插入到队列的时间先后排序。
  • PriorityBlockingQueue:优先级队列。
  • SynchronousQueue:同步队列,不存储元素,每个插入操作必须等到另一个线程进行移除操作,否则一直阻塞。

几种常见线程池

前面只是稍微提到几种线程池,这边详细说说每种线程池。

newFixedThreadPool

源码在上头,这里我就不贴了。可以知道,这个线程池特点是核心线程数与最大线程数相同,并且没有非空闲时间,即keepAliveTime为0,再有就是阻塞队列使用的是LinkedBlockingQueue。
在Executors中newFixedThreadPool的阻塞队列是无界的,也就是如果核心线程执行的时间过长,会导致大量的任务插入到队列中,最终导致OOM。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0;i< Integer.MAX_VALUE;i++){
executorService.execute(()->{
try {
Thread.sleep(100000);
}catch (InterruptedException e){

}
});
}
}
//Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

因此fixedThreadPool适用于cpu密集的任务,cpu长期被使用的情况下,尽可能少分配线程,适合执行长期的任务。

newCachedThreadPool

这个线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,阻塞队列为SynchronousQueue,非核心线程的空闲存活时间为60秒。
因为没有核心线程,所以添加任务后直接添加进阻塞队列,判断是够有空闲进程,如果有,取出去执行任务,如果没有,就创建一个线程执行,执行完任务的线程有60秒的存活时间,如果再次接到任务,则可以活下去,否则被销毁。
这种线程池适合并发执行短期的小任务。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0 ;i<5;i++){
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now");
});
}

}

如果提交任务速度小于处理任务的速度,则只会使用一个线程重复使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0 ;i<5;i++){
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now");
});
try{
Thread.sleep(10000);
}catch (Exception e){

}
}
}

newSingleThreadExecutor

特点为核心线程数为1,最大线程数也为1,阻塞队列是LinkedBlockingQueue,keepAliveTime为0。
这个线程池的特点就是只有一个线程在工作,如果有任务到来,线程空闲就执行任务,线程繁忙就把任务加到阻塞队列中,直到线程从队列中取出任务执行。
适合串行执行任务的场景。

1
2
3
4
5
6
7
8
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0 ;i<5;i++){
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now");
});
}
}

newScheduledThreadPool

特点是最大线程数为Integer.MAX_VALUE,阻塞队列是DelayedWorkQueue,keepAliveTime为0。
工作机制为:添加一个任务到阻塞队列中,线程池中的线程从阻塞队列中取任务,取time大于当前时间的任务,执行完将任务的time修改为下次执行的时间,并将这个任务放到阻塞队列中。
这种线程池适合周期执行定时任务的场景。

1
2
3
4
5
6
7
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now");
},1,3,TimeUnit.SECONDS);

}

后文

本文讲了Java中四种常见的线程池,并学习相应的工作原理与应用常见,尽管阿里开发手册中不允许直接使用Executors中的线程池,但理解这几种线程池十分重要,如果需要创建线程池的话,可以通过ThreadPoolExecutor自定义所有参数来创建,让代码阅读者明白为什么要这样创建。通过这次的学习,对java线程池的理解确实有学到不少。

CATALOG
  1. 1. 前言
  2. 2. 正文
    1. 2.1. Java线程创建方式
      1. 2.1.1. 继承Thread类
    2. 2.2. 实现Runnable接口
    3. 2.3. 为什么使用线程池
    4. 2.4. Java线程池
    5. 2.5. 线程池的工作队列
    6. 2.6. 几种常见线程池
      1. 2.6.1. newFixedThreadPool
      2. 2.6.2. newCachedThreadPool
      3. 2.6.3. newSingleThreadExecutor
      4. 2.6.4. newScheduledThreadPool
  3. 3. 后文