前言 本篇来讲讲Java中线程池的使用与分析。
正文 阿里巴巴Java开发手册中强调,线程资源必须通过线程池来提供,不允许显式创建线程。 首先我们讲讲普通的线程创建方式。
Java线程创建方式 在Java语言中,我们通过两种方式创建一个新线程,分别是继承Thread类或者实现Runnable接口。
继承Thread类 Thread类本质上是实现了Runnable接口的一个实例,启动线程的方法是通过Thread类的start方法,这个方法是native方法,它将启动线程,并在获得时间片后执行run方法。
1 2 3 4 5 6 7 8 9 10 11 public class MyThread extends Thread { @Override public void run () { System.out.println("MyThread run" ); } public static void main (String[] args) { MyThread myThread = new MyThread(); myThread.start(); } }
实现Runnable接口 如果该类已经继承其他类,由于java是单继承,所以只能实现runnable接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class MyThread implements Runnable { @Override public void run () { System.out.println("MyThread run" ); } public static void main (String[] args) { MyThread myThread = new MyThread(); Thread thread = new Thread(myThread); thread.start(); } }
这里值得一提的是,start方法是启动一个线程,线程处于就绪状态,而run方法则是直接执行,线程进入了运行状态。start方法之后会直接执行完毕,等待后续cpu的调度来执行这个线程,如果是run方法则直接在当前线程运行了方法,影响接下来的代码。因此只有start方法才能是多线程执行。
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) { new Thread(new Runnable() { @Override public void run () { System.out.println(Thread.currentThread().getName()); } }).run(); System.out.println(Thread.currentThread().getName()); }
通常情况下如果需要一个异步操作,那直接在类里new Thread实例并传入Runnable实例,重写run方法就可以简单实现一个多线程操作,但这种方法并不提倡。
1 2 3 4 5 6 7 new Thread(new Runnable() { @Override public void run () { } }).start();
为什么使用线程池 前面说了创建线程的方式,那为什么阿里不让程序员这样创建线程呢?原因在于以下几点:
每次new Thread实例性能会逐渐变差。
线程没法统一管理,容易出现线程无限制创建,严重会有线程间相互竞争导致死锁,再严重些会导致占用过多资源而导致OOM。
而相比于new Thread,Java中提供的四种线程池有几点好处:
在创建和销毁线程时所消耗时间与系统资源的开销大幅度减小。
可以提供定时执行,并发控制等高级功能。
Java线程池 Java中通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
Executors源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public class Executors { public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool (int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null , true ); } public static ExecutorService newWorkStealingPool () { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null , true ); } public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor (ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool (ThreadFactory threadFactory) { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor () { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1 )); } public static ScheduledExecutorService newSingleThreadScheduledExecutor (ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1 , threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool ( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } }
我们查看源码可以知道,Executors类使用了ThreadPoolExecutor类创建了一个简单线程池。我们接着跟进ThreadPoolExecutor类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue) ; public ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) ; public ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) ; public ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) ; ... }
我们得知,ThreadPoolExecutor提供了四种构造器,但前三种构造器其实是调用了第四种构造方法而已。接下来看看几个参数的作用:
corePoolSize: 线程池核心线程数最大值
maximumPoolSize: 线程池最大线程数大小
keepAliveTime: 线程池中非核心线程空闲的存活时间大小
unit: 线程空闲存活时间单位
workQueue: 存放任务的阻塞队列
threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
handler: 线城池的饱和策略事件,主要有四种类型。
我们可以看看源码中线程池的执行流程,即execute方法,源码中的注释已经十分清楚。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
翻译一下:
提交一个任务,如果线程池中存活的核心线程小于线程数corePoolSize,线程池会创建一个核心线程去处理提交的任务。
如果核心线程池已满,一个新的任务会放到任务队列workQueue中排队等待执行。
当corePoolSize已满,并且workQueue也满,判断线程数是否达到maximumPoolSize,如果没达到,就创建一个非核心线程来执行任务。
如果线程数达到maximumPoolSize,直接采取拒绝策略。
顺带一提四种拒绝策略:
AbortPolicy(抛出一个异常,默认的)
DiscardPolicy(直接丢弃任务)
DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
CallerRunsPolicy(交给线程池调用所在的线程进行处理)
至此,线程池的工作原理基本上讲完了。但在阿里巴巴Java开发手册中有这么一条:
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下: 1) FixedThreadPool和SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 2) CachedThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
仔细看源码,确实Executors在创建线程池时,workQueue并没有指定最大大小,可能会导致堆积大量的请求,即没有实行拒绝策略。
线程池的工作队列
ArrayBlockingQueue:有界队列,使用数组实现,FIFO。
LinkedBlockingQueue:基于链表的可设置容量的阻塞队列,默认最大长度为Integer.MAX_VALUE。
DelayQueue:延迟队列,是一个任务定时周期的延迟执行的队列。根据指定的时间从小到大排列,否则否则根据插入到队列的时间先后排序。
PriorityBlockingQueue:优先级队列。
SynchronousQueue:同步队列,不存储元素,每个插入操作必须等到另一个线程进行移除操作,否则一直阻塞。
几种常见线程池 前面只是稍微提到几种线程池,这边详细说说每种线程池。
newFixedThreadPool 源码在上头,这里我就不贴了。可以知道,这个线程池特点是核心线程数与最大线程数相同,并且没有非空闲时间,即keepAliveTime为0,再有就是阻塞队列使用的是LinkedBlockingQueue。 在Executors中newFixedThreadPool的阻塞队列是无界的,也就是如果核心线程执行的时间过长,会导致大量的任务插入到队列中,最终导致OOM。
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10 ); for (int i = 0 ;i< Integer.MAX_VALUE;i++){ executorService.execute(()->{ try { Thread.sleep(100000 ); }catch (InterruptedException e){ } }); } }
因此fixedThreadPool适用于cpu密集的任务,cpu长期被使用的情况下,尽可能少分配线程,适合执行长期的任务。
newCachedThreadPool 这个线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,阻塞队列为SynchronousQueue,非核心线程的空闲存活时间为60秒。 因为没有核心线程,所以添加任务后直接添加进阻塞队列,判断是够有空闲进程,如果有,取出去执行任务,如果没有,就创建一个线程执行,执行完任务的线程有60秒的存活时间,如果再次接到任务,则可以活下去,否则被销毁。 这种线程池适合并发执行短期的小任务。
1 2 3 4 5 6 7 8 9 public static void main (String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ;i<5 ;i++){ executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now" ); }); } }
如果提交任务速度小于处理任务的速度,则只会使用一个线程重复使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ;i<5 ;i++){ executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now" ); }); try { Thread.sleep(10000 ); }catch (Exception e){ } } }
newSingleThreadExecutor 特点为核心线程数为1,最大线程数也为1,阻塞队列是LinkedBlockingQueue,keepAliveTime为0。 这个线程池的特点就是只有一个线程在工作,如果有任务到来,线程空闲就执行任务,线程繁忙就把任务加到阻塞队列中,直到线程从队列中取出任务执行。 适合串行执行任务的场景。
1 2 3 4 5 6 7 8 public static void main (String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0 ;i<5 ;i++){ executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now" ); }); } }
newScheduledThreadPool 特点是最大线程数为Integer.MAX_VALUE,阻塞队列是DelayedWorkQueue,keepAliveTime为0。 工作机制为:添加一个任务到阻塞队列中,线程池中的线程从阻塞队列中取任务,取time大于当前时间的任务,执行完将任务的time修改为下次执行的时间,并将这个任务放到阻塞队列中。 这种线程池适合周期执行定时任务的场景。
1 2 3 4 5 6 7 public static void main (String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1 ); executorService.scheduleWithFixedDelay(() -> { System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " running now" ); },1 ,3 ,TimeUnit.SECONDS); }
后文 本文讲了Java中四种常见的线程池,并学习相应的工作原理与应用常见,尽管阿里开发手册中不允许直接使用Executors中的线程池,但理解这几种线程池十分重要,如果需要创建线程池的话,可以通过ThreadPoolExecutor自定义所有参数来创建,让代码阅读者明白为什么要这样创建。通过这次的学习,对java线程池的理解确实有学到不少。