线程池无论是在Java开发还是Android开发中,都是一个很常见也很重要的知识点,在面试中也常常被考官问到,那么本博文就带大家一探线程池的究竟。
为什么要使用线程池
在我们日常开发中,当遇到异步或者线程问题时,我们首先想到的是使用Thread或者Runnable来处理,在Android中我们还会结合Handler来实现线程的切换,Thread的使用也比较简单,在处理一些简单的异步任务时也是我们的不二之选。
但是,如果出现需要处理大量异步任务的情况时,使用new Thread的方式就会给我们带来很多问题:
- 频繁的创建线程,线程执行完之后又被回收,又会导致频繁的GC,无法重用线程。
- 如果线程的创建耗时或者需要消耗资源,频繁的创建和回收会对系统造成负担。
- 线程缺乏统一管理,各线程之间互相竞争,降低程序的运行效率。
如果我们使用线程池,那么就能有效的解决以上的问题,线程池主要有以下优点:
- 可以重用已经创建好的线程,避免频繁创建进而导致的频繁GC。
- 可以有效控制和管理线程并发,提高性能。
- 可以有效控制线程的执行,比如:定时执行,取消执行等等。
如何创建、使用线程池
相信大家已经对如何创建、使用线程池了如指掌了,我们举几个简单的例子,以方便后续的博文讲解。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);ExecutorService cachedThreadPool = Executors.newCachedThreadPool();ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();复制代码
一探究竟
在平时的开发中,我们通常通过如下的代码构造一个固定数量线程的线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);复制代码
我们来看下固定线程数的线程池是如何构造出来的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }复制代码
原来我们常用的线程池都是通过对ThreadPoolExecutor进行不同配置来实现的,那么我们就从这个ThreadPoolExecutor来开始分析吧!
1.ThreadPoolExecutor
ThreadPoolExecutor有四个构造函数,我们来看一些最复杂的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)复制代码
这个构造函数有7个参数,而其他构造函数也是复用了该构造函数,我们来分析下这几个参数的意义:
- corePoolSize
线程池中核心线程数。
线程池在新建线程时,如果当前线程池总数小于核心线程数(corePoolSize),那么新建的线程是核心线程;否则,新建的线程则为非核心线程。
除非调用了allowCoreThreadTimeOut方法设置了空闲等待时间,否则这些线程一直存在于线程池中,即使他们处于空闲状态。
- maximumPoolSize
线程池中允许存在的最大线程数量。
maximumPoolSize = 核心线程数(corePoolSize)+非核心线程数。
- keepAliveTime
非核心线程空闲等待时间(超时时长),当系统中非核心线程闲置时间超过keepAliveTime后,非核心线程就会被回收。
如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。
- unit
keepAliveTime的单位,有纳秒、微秒、毫秒、秒、分、时、天等。
TimeUnit是一个枚举类型,包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天
- workQueue
线程池中待执行任务队列。该队列用于存储由execute提交的Runnable任务。 常用的任务队列主要有以下几种:
SynchronousQueue 这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。
LinkedBlockingQueue 这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize.
ArrayBlockingQueue 可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。
DelayQueue 队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
- threadFactory
为线程池创建新线程的工厂类,这个我们一般使用默认即可。
- handler
拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到 最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会 抛出一个RejectedExecutionException。
这就是是ThreadPoolExecutor的构造方法参数的解释,我们的线程提交到线程池之后又是按照什么样的策略去运行呢?
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); // 获取线程池中的线程数目 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 启动一个核心线程,true代表的是核心线程 return; c = ctl.get(); } // 向队列中添加任务,注意offer返回结果(队列已满时返回false,add直接抛出异常) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }复制代码
从上面的代码中我们可以发现,当一个任务被加入线程池时,ThreadPoolExecutor执行策略如下:
-
如果当前线程池中的线程数少于核心线程数(corePoolSize),那么就启动一个新的线程(核心线程)执行任务。
-
线程池中的线程数已经达到核心线程数(corePoolSize),而且workQueue未满,则将任务放入workQueue中等待执行。
-
如果线程池中的线程数已经达到核心线程数(corePoolSize)但未超过最大线程数(maximumPoolSize),而且workQueue已满,则开启一个非核心线程来执行任务。
-
如果线程池中的线程数已经超过最大线程数(maximumPoolSize),则拒绝执行该任务。
2. FixedThreadPool
FixedThreadPool是一个核心线程数量固定的线程池,创建以及构造如下:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }复制代码
从构造上分析,我们得出FixedThreadPool有以下特点:
所有线程均为核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。
不存在线程超时的情况。(keepAliveTime==0)
线程队列(LinkedBlockingQueue)的默认大小为Integer.MAX_VALUE(2的31次方减1),因此我们可以往队列里添加多个任务。
我们来验证下分析是否正确:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 30; i++) { final int finalI = i; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName()); } }; fixedThreadPool.execute(runnable); }复制代码
运行结果如下:
The runnable num is :0 and thread name :pool-1-thread-1The runnable num is :1 and thread name :pool-1-thread-2The runnable num is :2 and thread name :pool-1-thread-3The runnable num is :3 and thread name :pool-1-thread-1The runnable num is :4 and thread name :pool-1-thread-2The runnable num is :5 and thread name :pool-1-thread-3The runnable num is :6 and thread name :pool-1-thread-1The runnable num is :7 and thread name :pool-1-thread-2The runnable num is :8 and thread name :pool-1-thread-3The runnable num is :9 and thread name :pool-1-thread-1The runnable num is :10 and thread name :pool-1-thread-2The runnable num is :11 and thread name :pool-1-thread-3The runnable num is :13 and thread name :pool-1-thread-2The runnable num is :12 and thread name :pool-1-thread-1The runnable num is :14 and thread name :pool-1-thread-3复制代码
运行结果与我们的分析一致,先往核心线程中添加三个任务,剩余任务进入到workQueue中等待,当有空闲的核心线程时就执行任务队列中的任务。
3. SingleThreadExecutor
SingleThreadExecutor与FixedThreadPool类似,不同的是SingleThreadExecutor线程池中只有一个固定的核心线程(可以将其看成是一个特殊的FixedThreadPool)。创建及构造如下:
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }复制代码
从构造上分析,我们得出SingleThreadExecutor有以下特点:
只存在一个核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。
不存在线程超时的情况。(keepAliveTime==0)
线程队列的默认大小为Integer.MAX_VALUE(2的31次方减1)。
任务按照FIFO原则执行,不存在线程同步的问题(只有一个线程啊)。
我们来验证下我们的分析是否正确:
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int finalI = i; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName()); } }; singleThreadPool.execute(runnable); }复制代码
运行结果如下:
The runnable num is :0 and thread name :pool-1-thread-1The runnable num is :1 and thread name :pool-1-thread-1The runnable num is :2 and thread name :pool-1-thread-1The runnable num is :3 and thread name :pool-1-thread-1The runnable num is :4 and thread name :pool-1-thread-1复制代码
4. CachedThreadPool
CachedThreadPool的创建及构造如下:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());}复制代码
从构造上分析,我们得出SingleThreadExecutor有以下特点:
1.不存在核心线程,只有非核心线程,而且线程数目非常大(Integer.MAX_VALUE)。
存在线程超时机制,超时时间为60s。(keepAliveTime==60s)
使用SynchronousQueue来管理任务队列。
从上面的特点我们可以分析出CachedThreadPool特别适合执行大量并发任务的场景,因为线程数目比较大,所以每当我们添加一个新任务进来的时候,如果线程池中有空闲的线程,则由该空闲的线程执行新任务,如果没有空闲线程,则创建新线程来执行任务。
我们来验证下我们的分析是否正确:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int finalI = i; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName()); } }; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(runnable); }复制代码
运行结果如下:
The runnable num is :0 and thread name :pool-1-thread-1The runnable num is :1 and thread name :pool-1-thread-2The runnable num is :2 and thread name :pool-1-thread-3The runnable num is :3 and thread name :pool-1-thread-2The runnable num is :4 and thread name :pool-1-thread-3The runnable num is :5 and thread name :pool-1-thread-1The runnable num is :6 and thread name :pool-1-thread-2The runnable num is :7 and thread name :pool-1-thread-1The runnable num is :8 and thread name :pool-1-thread-3The runnable num is :9 and thread name :pool-1-thread-2复制代码
5. ScheduledThreadPool
ScheduledThreadPool具备定时执行任务的特点,ScheduledThreadPool的创建及构造如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);} public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());}复制代码
ScheduledThreadPool具有如下特点:
核心线程数量固定,而且非核心线程数目非常大(Integer.MAX_VALUE)。
存在线程超时机制,超时时间为10s。(keepAliveTime==10s)
使用DelayedWorkQueue来管理任务队列。
我们看下ScheduledThreadPool几个常用的API:
- 延迟执行任务
public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit);复制代码
- 延迟定时执行任务
public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);复制代码
首次启动任务的时间间隔为initialDelay+period,然后每隔period执行一次任务。 3. 延迟执行定时执行任务
public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);复制代码
首次启动任务的时间间隔为initialDelay,然后每隔delay执行一次任务。
Android中的线程池
在我们日常的Android开发中,我们经常使用的AsyncTask的实现就采用了线程池,我们来看下AsyncTask是如何实现的。
首先来看AsyncTask的一些属性声明:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); // We want at least 2 threads and at most 4 threads in the core pool, // preferring to have 1 less than the CPU count to avoid saturating // the CPU with background work // 核心线程数最少为2个,最多为4个。为了不影响系统运行,一般取CPU-1个。 private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4)); // 最多存在的线程数为CPU*2-1 private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; // 线程超时时间为30s private static final int KEEP_ALIVE_SECONDS = 30; // 线程生成工厂(没什么卵用,只是为了标识AsyncTask) private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; // 使用LinkedBlockingQueue作为任务管理队列 private static final BlockingQueuesPoolWorkQueue = new LinkedBlockingQueue (128); /** * An { @link Executor} that can be used to execute tasks in parallel. */ public static final Executor THREAD_POOL_EXECUTOR; static { // 根据上面的参数构造线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); // 允许核心线程超时存在 threadPoolExecutor.allowCoreThreadTimeOut(true); THREAD_POOL_EXECUTOR = threadPoolExecutor; }复制代码
当我们调用了AsyncTask的execute方法后,执行了那些操作呢?
@MainThread public final AsyncTaskexecute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); }@MainThread public final AsyncTask executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; // 调用了该接口执行任务,exec为SerialExecutor类的实例 exec.execute(mFuture); return this; }复制代码
接着来看SerialExecutor类。
private static class SerialExecutor implements Executor { // 将其当做栈使用即可 final ArrayDequemTasks = new ArrayDeque (); Runnable mActive; // 执行任务的接口 public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { // 一个任务执行完,才会执行下一个任务(AsyncTask是可能会顺序任务的) scheduleNext(); } } }); // 首个任务执行时,mActive为空调用scheduleNext进行执行 if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { // 使用了我们创建的线程池完成了任务的执行 if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }复制代码
通过上面的简单分析,相信大家对AsyncTask的线程池机制有了一个更清晰的认识,其实很多我们看起来很牛逼的东西实现原理也很简单,只要我们多学习多看源码,就能了解其机制。
结束语
线程池在我们的实际开发中重要性不言而喻,在面试中也会被经常问到,希望大家通过本博文对线程池有一个清晰明了的认识,并在实际开发中使用它,谢谢。