JDK里的线程池

 2023-09-05 阅读 223 评论 0

摘要:2019独角兽企业重金招聘Python工程师标准>>> 1. 由来 应用程序处理任务最简单的方式 —— 串行地执行任务。但这种方式一次只能处理一个任务,不能充分利用多核cpu资源,不能提供高吞率和快速响应。 线程池提出以下几点避免之前遇到地问题: 创建

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

1. 由来

应用程序处理任务最简单的方式 —— 串行地执行任务。但这种方式一次只能处理一个任务,不能充分利用多核cpu资源,不能提供高吞率和快速响应。

线程池提出以下几点避免之前遇到地问题:

  • 创建管理多个工作线程 来处理任务,来提供快速响应和高吞吐;
  • 复用线程,减小工作线程创建销毁所消耗的资源;
  • 控制工作线程个数,避免闲置的工作线程,避免大量工作线程的资源竞争,避免过多的工作线程耗尽计算机资源。

线程池就是一个用来快速接收并处理任务的一个箱子,箱子里面有一批工作线程一直在处理任务或等待任务,忙的时候会有新的工作线程出现一起处理任务,闲的时候会关闭掉一些工作线程。

2. JDK里的线程池

2.1 ThreadPoolExecutor类继承结构

Executor框架是JDK里任务执行框架,它把任务的提交和任务的执行解耦合。只需要定义好任务,然后提交给Executor,而不需要关心该任务是如何执行、被哪个线程执行以及什么时候执行。

而线程池作为处理任务的一种方案,jdk里的线程池自然是作为Executor框架下的一种实现出现。线程池类结构图如下: 线程池类结构图

  • java.util.concurrent.Executor:一个只包含一个方法的接口,它的抽象含义是:用来执行一个Runnable任务的执行器;
  • java.util.concurrent.ExecutorService:对Executor的一个扩展,增加一些任务提交和用于生命周期管理的接口;

2.2 ThreadPoolExecutor处理逻辑

ThreadPoolExecutor处理逻辑

任务处理的4个阶段

  1. 客户端调用submit或executor提交任务给线程池;
  2. 如果工作线程数还没达到corePoolSize,就创建新的工作线程,并绑定该任务;
  3. 如果工作线程数达到corePoolSize,新增的任务能够成功地offer放到任务队列里,而线程池里的线程则努力地使用take()或poll()阻塞地从任务队列里拉取任务进行处理。
  4. 如果新增的任务放进队列失败(比如:队列是个有界队列,而且线程池里的线程不能及时将任务取走,任务队列可能会满掉,插入任务就会失败),此时线程池就会紧急地创建新的临时线程来处理这个放不进队列的任务;
  5. 如果新增的任务放不进队列,而且工作线程已经达到最大值maximumPoolSize时,则把任务转送到RejectExecutionHanlder进行拒绝处理。

以上逻辑详见ThreadPoolExecutor.execute(Runnable)方法

额外的处理流程:

  • 如果临时线程使用使用poll(keepAliveTime,timeUnit)在keepAliveTime时间内不能从队列中获取到任务,则说明不忙了,可以关闭临时线程,节约资源。再如果allowCoreThreadTimeOut设置为true,则如果核心线程也使用poll(keepAliveTime,timeUnit)在keepAliveTime时间内不能从队列中获取到任务时,它同样也会被关闭;
  • 如果工作线程发生异常结束了,则会重新addWork创建一个工作线程;

以上逻辑详见ThreadPoolExecutor.runWorker(Worker)方法

代码

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}

2.3 任务处理组件

  1. 控制阶段1和阶段3的控制器:
    • AtomicInteger ctl: 工作线程个数和线程池状态
    • int corePoolSize:核心线程数大小
    • int maximumPoolSize:最大线程数大小
  2. 任务等待队列BlockingQueue<Runnable> workQueue :缓存来不及处理的任务,以及充当阶段2的控制器
  3. 工作线程:HashSet<Worker> workers
  4. 任务被拒绝处理器:RejectedExecutionHandler handler

上述每个组件都可以进行扩展,来定制适合业务场景的线程池。

2.3.1 控制阶段1和阶段3的控制器

  • ThreadPoolExecutor内部使用AtomicInteger ctl的低位29位来存储工作线程个数,高3位来存储线程池的状态。

  • 如果corePoolSize==maximumPoolSize, 则没有第三阶段,线程池火力全开地干活;

  • 如果corePoolSize==0时,缓存队列千万不能时无界是则会是只有一个线程在处理任务退化成串行处理,队列有界的时候,线程池里都是临时线程临时工;

  • corePoolSize和maximumPoolSize设置成多大最合适。计算公式:N核服务器,如果计算时间为x,io等待时间为y,则工作线程数应设置为 N*(x+y)/x。CPU核数可以通过Runtime.getRuntime().availableProcessors()获得

  • 线程池启动的时候默认没有工作线程,可以通过prestartCoreThread() 或 prestartAllCoreThreads() 来预先创建线程避免首次调用慢的问题

2.3.2 任务等待队列workQueue

任务队列用来传输和保持提交的任务,还用来控制任务处理流程走第二阶段处理还是走第三阶段的处理。所以常见的扩展有4种:

  1. 直接提交,典型的有SynchronousQueue,队列不缓存任务,如果没有对maximumPoolSize进行限制,则忙不过来时就会无限地增加临时线程,最后崩溃。
  2. 无界队列。如果所有corePoolSize线程都忙时,新任务都会在队列中等待。这样创建的线程数就不会超过corePoolSize,所以maximumPoolSize的设置就没意义。如果任务处理速度小于任务提交速度,队列会无限增长占用大量内存最后崩溃。
  3. 有界队列。当使用有限的 maximumPoolSizes时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
  4. 自定义队列,覆盖poll方法,自定义逻辑决定任务是进入队列,还是新建临时线程进行处理。《Tomcat线程池,更符合大家想象的可扩展线程池》 提到的tomcat线程池就是这么做的。

2.3.3 任务被拒绝处理器

ThreadPoolExecutor提供了4种拒绝策略,也可以自己实现:

  1. 中止策略(Abort Policy):默认的策略,队列满时,会抛出异常RejectedExecutionException,调用者在捕获异常之后自行判断如何处理该任务;
  2. 抛弃策略(Discard Policy):队列满时,进程池抛弃新任务,并不通知调用者;
  3. 抛弃最久策略(Discard-oldest Policy):队列满时,进程池将抛弃队列中被提交最久的任务;
  4. 调用者运行策略(Caller-Runs Policy):该策略不会抛弃任务,也不会抛出异常,而是将任务退还给调用者,即新任务将在调用ThreadPoolExecutor的线程中执行。

2.3.4 工作线程

ThreadPoolExecutor使用Worker来封装实际工作的线程,部分源码如下:

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* [@param](https://my.oschina.net/u/2303379) firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}
  • Woker代理的内部Thread是通过ThreadFactory线程工厂来创建线程,一般是在创建线程的时候设置线程的名称,线程组,优先级,守护进程,异常处理逻辑等。可以通过继承NameThreadFactory来实现自定义线程工厂类;
  • Worker继承AbstractQueuedSynchronizer实现了一个独占不可重入锁,用于中断空闲的Worker时,根据w.tryLock()是否能获取锁来判断一个Worker是否空闲。此结论来自乒乓狂魔的《线程池源码分析-ThreadPoolExecutor》;
  • 实现Runnable接口,实现的run方法时调用ThreadPoolExecutor的runWorker方法,runWorker方法里面设置钩子方法:beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable)方法,这两种方法分别在执行每个任务之前和之后调用。它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息、添加日志条目或打印异常。此外,还可以重写 terminated()方法来执行线程池完全终止后需要完成的所有特殊处理。重新onShutdown()方法来执行调用线程池shutdown()方法执行后的一些清理工作。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

2.3.5 关闭线程池

ThreadPoolExecutor提供了2个方法:

  • shutdown():停止接收任务,但是会把队列里的任务执行完;
  • shutdownNow():停止接收任务,停止队列中任务的处理,停止正在处理的任务。

2.3.6 线程池的监控

ThreadPoolExecutor提供get前缀的方法获取线程池的状态信息: getPoolSize(), getCorePoolSize(), getMaximumPoolSize(), getLargestPoolSize(), getActiveCount(), getTaskCount(), getCompletedTaskCount(), getKeepAliveTime(), toString()

2.4 提供常用组合

Executor可以创建的线程池共有四种:
线程池 | 任务队列 | corePoolSize | maxinumPoolSize
------------------------|----------------|------------------|---------|-----
newSingleThreadExecutor| LinkedBlockingQueue | 1 |1
newFixedThreadPool| LinkedBlockingQueue不限容量 | n |n
newCachedThreadPool|SynchronousQueue容量为0|0|Integer.MAX_VALUE
newScheduledThreadPool|DelayedWorkQueue|n|Integer.MAX_VALUE

2.5 异常处理

private ExecutorService getExecutorService() {ThreadFactory threadFactory = new NamedThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = super.newThread(r);t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {@Overridepublic void uncaughtException(Thread t, Throwable e) {e.printStackTrace();}});return t;}};return new ThreadPoolExecutor(WORKER_COUNT, WORKER_COUNT, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(), threadFactory) {@Overrideprotected void afterExecute(Runnable r, Throwable t) {    //异常处理super.afterExecute(r, t);if (t == null && r instanceof Future<?>) {try {//get这里会首先检查任务的状态,然后将上面的异常包装成ExecutionExceptionObject result = ((Future<?>) r).get();} catch (CancellationException ce) {t = ce;} catch (ExecutionException ee) {t = ee.getCause();} catch (InterruptedException ie) {Thread.currentThread().interrupt(); // ignore/reset}}if (t != null) {//异常处理t.printStackTrace();}}};}

参考

  1. 【从0到1学习Java线程池】Java线程池的简介以及使用
  2. 【从0到1学习Java线程池】Java线程池原理
  3. 深度解读 java 线程池设计思想及源码实现
  4. 线程池源码分析-ThreadPoolExecutor
  5. Java ThreadPool的正确打开方式
  6. Tomcat线程池,更符合大家想象的可扩展线程池
  7. 深度解析Java线程池的异常处理机制

转载于:https://my.oschina.net/braveCS/blog/1587351

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/117.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息