博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池-你可能需要知道这些
阅读量:6175 次
发布时间:2019-06-21

本文共 16465 字,大约阅读时间需要 54 分钟。

线程池无论是在Java开发还是Android开发中,都是一个很常见也很重要的知识点,在面试中也常常被考官问到,那么本博文就带大家一探线程池的究竟。

为什么要使用线程池

在我们日常开发中,当遇到异步或者线程问题时,我们首先想到的是使用Thread或者Runnable来处理,在Android中我们还会结合Handler来实现线程的切换,Thread的使用也比较简单,在处理一些简单的异步任务时也是我们的不二之选。

但是,如果出现需要处理大量异步任务的情况时,使用new Thread的方式就会给我们带来很多问题:

  1. 频繁的创建线程,线程执行完之后又被回收,又会导致频繁的GC,无法重用线程。
  2. 如果线程的创建耗时或者需要消耗资源,频繁的创建和回收会对系统造成负担。
  3. 线程缺乏统一管理,各线程之间互相竞争,降低程序的运行效率。

如果我们使用线程池,那么就能有效的解决以上的问题,线程池主要有以下优点:

  1. 可以重用已经创建好的线程,避免频繁创建进而导致的频繁GC。
  2. 可以有效控制和管理线程并发,提高性能。
  3. 可以有效控制线程的执行,比如:定时执行,取消执行等等。

如何创建、使用线程池

相信大家已经对如何创建、使用线程池了如指掌了,我们举几个简单的例子,以方便后续的博文讲解。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);ExecutorService cachedThreadPool = Executors.newCachedThreadPool();ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();复制代码

一探究竟

在平时的开发中,我们通常通过如下的代码构造一个固定数量线程的线程池:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);复制代码

我们来看下固定线程数的线程池是如何构造出来的。

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                 0L, TimeUnit.MILLISECONDS,                                 new LinkedBlockingQueue
()); }复制代码

原来我们常用的线程池都是通过对ThreadPoolExecutor进行不同配置来实现的,那么我们就从这个ThreadPoolExecutor来开始分析吧!

1.ThreadPoolExecutor

ThreadPoolExecutor有四个构造函数,我们来看一些最复杂的构造函数:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)复制代码

这个构造函数有7个参数,而其他构造函数也是复用了该构造函数,我们来分析下这几个参数的意义:

  1. corePoolSize

线程池中核心线程数。

线程池在新建线程时,如果当前线程池总数小于核心线程数(corePoolSize),那么新建的线程是核心线程;否则,新建的线程则为非核心线程。

除非调用了allowCoreThreadTimeOut方法设置了空闲等待时间,否则这些线程一直存在于线程池中,即使他们处于空闲状态。

  1. maximumPoolSize

线程池中允许存在的最大线程数量。

maximumPoolSize = 核心线程数(corePoolSize)+非核心线程数。

  1. keepAliveTime

非核心线程空闲等待时间(超时时长),当系统中非核心线程闲置时间超过keepAliveTime后,非核心线程就会被回收。

如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。

  1. unit

keepAliveTime的单位,有纳秒、微秒、毫秒、秒、分、时、天等。

TimeUnit是一个枚举类型,包括:

NANOSECONDS : 1微毫秒 = 1微秒 / 1000

MICROSECONDS : 1微秒 = 1毫秒 / 1000

MILLISECONDS : 1毫秒 = 1秒 /1000

SECONDS : 秒

MINUTES : 分

HOURS : 小时

DAYS : 天

  1. workQueue

线程池中待执行任务队列。该队列用于存储由execute提交的Runnable任务。 常用的任务队列主要有以下几种:

  1. SynchronousQueue 这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。

  2. LinkedBlockingQueue 这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize.

  3. ArrayBlockingQueue 可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。

  4. DelayQueue 队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

  1. threadFactory

为线程池创建新线程的工厂类,这个我们一般使用默认即可。

  1. handler

拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到 最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会 抛出一个RejectedExecutionException。

这就是是ThreadPoolExecutor的构造方法参数的解释,我们的线程提交到线程池之后又是按照什么样的策略去运行呢?

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get(); // 获取线程池中的线程数目        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true)) // 启动一个核心线程,true代表的是核心线程                return;            c = ctl.get();        }        // 向队列中添加任务,注意offer返回结果(队列已满时返回false,add直接抛出异常)        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }复制代码

从上面的代码中我们可以发现,当一个任务被加入线程池时,ThreadPoolExecutor执行策略如下:

  1. 如果当前线程池中的线程数少于核心线程数(corePoolSize),那么就启动一个新的线程(核心线程)执行任务。

  2. 线程池中的线程数已经达到核心线程数(corePoolSize),而且workQueue未满,则将任务放入workQueue中等待执行。

  3. 如果线程池中的线程数已经达到核心线程数(corePoolSize)但未超过最大线程数(maximumPoolSize),而且workQueue已满,则开启一个非核心线程来执行任务。

  4. 如果线程池中的线程数已经超过最大线程数(maximumPoolSize),则拒绝执行该任务。

2. FixedThreadPool

FixedThreadPool是一个核心线程数量固定的线程池,创建以及构造如下:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
()); }复制代码

从构造上分析,我们得出FixedThreadPool有以下特点:

  1. 所有线程均为核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。

  2. 不存在线程超时的情况。(keepAliveTime==0)

  3. 线程队列(LinkedBlockingQueue)的默认大小为Integer.MAX_VALUE(2的31次方减1),因此我们可以往队列里添加多个任务。

我们来验证下分析是否正确:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);        for (int i = 0; i < 30; i++) {            final int finalI = i;            Runnable runnable = new Runnable() {                @Override                public void run() {                    try {                        Thread.sleep(3000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());                }            };            fixedThreadPool.execute(runnable);        }复制代码

运行结果如下:

The runnable num is :0 and  thread name :pool-1-thread-1The runnable num is :1 and  thread name :pool-1-thread-2The runnable num is :2 and  thread name :pool-1-thread-3The runnable num is :3 and  thread name :pool-1-thread-1The runnable num is :4 and  thread name :pool-1-thread-2The runnable num is :5 and  thread name :pool-1-thread-3The runnable num is :6 and  thread name :pool-1-thread-1The runnable num is :7 and  thread name :pool-1-thread-2The runnable num is :8 and  thread name :pool-1-thread-3The runnable num is :9 and  thread name :pool-1-thread-1The runnable num is :10 and  thread name :pool-1-thread-2The runnable num is :11 and  thread name :pool-1-thread-3The runnable num is :13 and  thread name :pool-1-thread-2The runnable num is :12 and  thread name :pool-1-thread-1The runnable num is :14 and  thread name :pool-1-thread-3复制代码

运行结果与我们的分析一致,先往核心线程中添加三个任务,剩余任务进入到workQueue中等待,当有空闲的核心线程时就执行任务队列中的任务。

3. SingleThreadExecutor

SingleThreadExecutor与FixedThreadPool类似,不同的是SingleThreadExecutor线程池中只有一个固定的核心线程(可以将其看成是一个特殊的FixedThreadPool)。创建及构造如下:

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue
())); }复制代码

从构造上分析,我们得出SingleThreadExecutor有以下特点:

  1. 只存在一个核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。

  2. 不存在线程超时的情况。(keepAliveTime==0)

  3. 线程队列的默认大小为Integer.MAX_VALUE(2的31次方减1)。

  4. 任务按照FIFO原则执行,不存在线程同步的问题(只有一个线程啊)。

我们来验证下我们的分析是否正确:

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();        for (int i = 0; i < 5; i++) {            final int finalI = i;            Runnable runnable = new Runnable() {                @Override                public void run() {                    try {                        Thread.sleep(3000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());                }            };            singleThreadPool.execute(runnable);        }复制代码

运行结果如下:

The runnable num is :0 and  thread name :pool-1-thread-1The runnable num is :1 and  thread name :pool-1-thread-1The runnable num is :2 and  thread name :pool-1-thread-1The runnable num is :3 and  thread name :pool-1-thread-1The runnable num is :4 and  thread name :pool-1-thread-1复制代码

4. CachedThreadPool

CachedThreadPool的创建及构造如下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue
());}复制代码

从构造上分析,我们得出SingleThreadExecutor有以下特点:

1.不存在核心线程,只有非核心线程,而且线程数目非常大(Integer.MAX_VALUE)。

  1. 存在线程超时机制,超时时间为60s。(keepAliveTime==60s)

  2. 使用SynchronousQueue来管理任务队列。

从上面的特点我们可以分析出CachedThreadPool特别适合执行大量并发任务的场景,因为线程数目比较大,所以每当我们添加一个新任务进来的时候,如果线程池中有空闲的线程,则由该空闲的线程执行新任务,如果没有空闲线程,则创建新线程来执行任务。

我们来验证下我们的分析是否正确:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();        for (int i = 0; i < 10; i++) {            final int finalI = i;            Runnable runnable = new Runnable() {                @Override                public void run() {                    try {                        Thread.sleep(2000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());                }            };            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            cachedThreadPool.execute(runnable);        }复制代码

运行结果如下:

The runnable num is :0 and  thread name :pool-1-thread-1The runnable num is :1 and  thread name :pool-1-thread-2The runnable num is :2 and  thread name :pool-1-thread-3The runnable num is :3 and  thread name :pool-1-thread-2The runnable num is :4 and  thread name :pool-1-thread-3The runnable num is :5 and  thread name :pool-1-thread-1The runnable num is :6 and  thread name :pool-1-thread-2The runnable num is :7 and  thread name :pool-1-thread-1The runnable num is :8 and  thread name :pool-1-thread-3The runnable num is :9 and  thread name :pool-1-thread-2复制代码

5. ScheduledThreadPool

ScheduledThreadPool具备定时执行任务的特点,ScheduledThreadPool的创建及构造如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {      return new ScheduledThreadPoolExecutor(corePoolSize);}    public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE,          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,            new DelayedWorkQueue());}复制代码

ScheduledThreadPool具有如下特点:

  1. 核心线程数量固定,而且非核心线程数目非常大(Integer.MAX_VALUE)。

  2. 存在线程超时机制,超时时间为10s。(keepAliveTime==10s)

  3. 使用DelayedWorkQueue来管理任务队列。

我们看下ScheduledThreadPool几个常用的API:

  1. 延迟执行任务
public ScheduledFuture
schedule(Runnable command, long delay, TimeUnit unit);复制代码
  1. 延迟定时执行任务
public ScheduledFuture
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);复制代码

首次启动任务的时间间隔为initialDelay+period,然后每隔period执行一次任务。 3. 延迟执行定时执行任务

public ScheduledFuture
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);复制代码

首次启动任务的时间间隔为initialDelay,然后每隔delay执行一次任务。

Android中的线程池

在我们日常的Android开发中,我们经常使用的AsyncTask的实现就采用了线程池,我们来看下AsyncTask是如何实现的。

首先来看AsyncTask的一些属性声明:

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();    // We want at least 2 threads and at most 4 threads in the core pool,    // preferring to have 1 less than the CPU count to avoid saturating    // the CPU with background work    // 核心线程数最少为2个,最多为4个。为了不影响系统运行,一般取CPU-1个。    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));    // 最多存在的线程数为CPU*2-1    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;    // 线程超时时间为30s    private static final int KEEP_ALIVE_SECONDS = 30;    // 线程生成工厂(没什么卵用,只是为了标识AsyncTask)    private static final ThreadFactory sThreadFactory = new ThreadFactory() {        private final AtomicInteger mCount = new AtomicInteger(1);        public Thread newThread(Runnable r) {            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());        }    };    // 使用LinkedBlockingQueue作为任务管理队列    private static final BlockingQueue
sPoolWorkQueue = new LinkedBlockingQueue
(128); /** * An {
@link Executor} that can be used to execute tasks in parallel. */ public static final Executor THREAD_POOL_EXECUTOR; static { // 根据上面的参数构造线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); // 允许核心线程超时存在 threadPoolExecutor.allowCoreThreadTimeOut(true); THREAD_POOL_EXECUTOR = threadPoolExecutor; }复制代码

当我们调用了AsyncTask的execute方法后,执行了那些操作呢?

@MainThread    public final AsyncTask
execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); }@MainThread public final AsyncTask
executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; // 调用了该接口执行任务,exec为SerialExecutor类的实例 exec.execute(mFuture); return this; }复制代码

接着来看SerialExecutor类。

private static class SerialExecutor implements Executor {        // 将其当做栈使用即可        final ArrayDeque
mTasks = new ArrayDeque
(); Runnable mActive; // 执行任务的接口 public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { // 一个任务执行完,才会执行下一个任务(AsyncTask是可能会顺序任务的) scheduleNext(); } } }); // 首个任务执行时,mActive为空调用scheduleNext进行执行 if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { // 使用了我们创建的线程池完成了任务的执行 if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }复制代码

通过上面的简单分析,相信大家对AsyncTask的线程池机制有了一个更清晰的认识,其实很多我们看起来很牛逼的东西实现原理也很简单,只要我们多学习多看源码,就能了解其机制。

结束语

线程池在我们的实际开发中重要性不言而喻,在面试中也会被经常问到,希望大家通过本博文对线程池有一个清晰明了的认识,并在实际开发中使用它,谢谢。

转载于:https://juejin.im/post/5b32e85be51d4558cb101c32

你可能感兴趣的文章
NSURLCache内存缓存
查看>>
jquery click嵌套 事件重复注册 多次执行的问题
查看>>
Dev GridControl导出
查看>>
开始翻译Windows Phone 8 Development for Absolute Beginners教程
查看>>
Python tablib模块
查看>>
站立会议02
查看>>
Windows和Linux如何使用Java代码实现关闭进程
查看>>
0428继承性 const static
查看>>
第一课:从一个简单的平方根运算学习平方根---【重温数学】
查看>>
NET反射系统
查看>>
Oracle12C本地用户的创建和登录
查看>>
使用JS制作一个鼠标可拖的DIV(一)——鼠标拖动
查看>>
HDU problem 5635 LCP Array【思维】
查看>>
leetcode10. 正则表达式匹配
查看>>
redis常用命令--zsets
查看>>
springcloud--Feign(WebService客户端)
查看>>
网络攻击
查看>>
sorting, two pointers(cf div.3 1113)
查看>>
Scala并发编程【消息机制】
查看>>
win10下安装Oracle 11g 32位客户端遇到INS-13001环境不满足最低要求
查看>>