一般的线程池主要分为以下 4 个组成部分:
- 线程池管理器:用于创建并管理线程池
- 工作线程:线程池中的线程
- 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
- 任务队列:用于存放待处理的任务,提供一种缓冲机制
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor, xecutors,ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。
- 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
- 提高响应速度:任务到达时不需要等待线程创建就可以立即执行
- 提高线程的可管理性:线程池可以统一管理、分配、调优和监控
Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService。
- newCachedThreadPool:创建一个可缓存线程池 ,优先使用线程池中的空闲线程,没有可用的线程时,创建一个新线程并添加到池中
- newFixedThreadPool:创建一个 固定线程数的线程池,没有可用的线程时,新的任务需要在队列中等待待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
- newScheduledThreadPool:创建一个定时线程池,可延后或者定期执行。
- newSingleThreadExecutor:创建一个单线程的线程池,这个线程池只有一个线程。这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
主要特点为:线程复用;控制最大并发数;管理线程
Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作。
这里示范一下 newScheduledThreadPool 的使用,如下:
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3);
scheduledThreadPool.schedule(() -> System.out.println("延迟三秒"), 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("延迟 1 秒后每三秒执行一次"),1,3,TimeUnit.SECONDS);
}
//输出:
延迟 1 秒后每三秒执行一次
延迟三秒
延迟 1 秒后每三秒执行一次
延迟 1 秒后每三秒执行一次
注意:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
线程池创建线程的过程如下:
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
||
\/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
||
\/
public ThreadPoolExecutor(
int corePoolSize,//核心线程数 ,线程池的核心池大小,在创建线程池之后,线程池默认没有任何线程。
int maximumPoolSize,//允许的 最大线程数 。maximumPoolSize肯定是大于等于corePoolSize。
long keepAliveTime,//当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多少时间内会被销毁。
TimeUnit unit,//keepAliveTime 的单位。
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,//线程工厂,用于创建线程,一般用默认的即可。
RejectedExecutionHandler handler) //拒绝策略,当任务太多来不及处理,如何拒绝任务。
线程池实现原理
了解线程池具体的执行过程,可以通过ThreadPoolExecutor.execute() 分析一下
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.当前池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
//新建一个线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.核心池已满,但任务队列未满,添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果这时被关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)
//如果之前的线程已被销毁完,新建一个线程
addWorker(null, false);
}
//核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
//如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
reject(command);
}
从上面代码可以等,线程池工作流程如下:
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow()
- shutdown() : 不会立即终止线程池 ,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow() : 立即终止线程池 ,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
BlockingQueue接口有:ArrayBlockingQueue , DelayQueue , LinkedBlockingDeque , LinkedBlockingQueue , LinkedTransferQueue , PriorityBlockingQueue , SynchronousQueue
ArrayBlockingQueue (有界队列) :是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue (无界队列) :一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue(同步队列) : 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
DelayQueue(延迟队列) :一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。
PriorityBlockingQueue(优先级队列) : 一个具有优先级得无限阻塞队列。
怎么理解无界队列和有界队列
- 有界队列即长度有限,满了以后ArrayBlockingQueue会插入阻塞。
- 无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,但是可能会出现OOM异常。
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
- AbortPolicy:直接抛出异常,阻止系统正常运行。,它将抛出RejectedExecutionException。
- CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
- DiscardOldestPolicy:丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。