二. 线程管理之线程池

 2023-09-16 阅读 17 评论 0

摘要:不忘初心 砥砺前行, Tomorrow Is Another Day ! 相关文章 一. 线程管理之Thread基础二. 线程管理之线程池三. 线程管理之ThreadLocal四. 线程管理之Android中的多线程线程池的线程数怎么确定。本文概要: 认识Executor与ExecutorService理解Future与FuturTaskThreadPoolExecut

不忘初心 砥砺前行, Tomorrow Is Another Day !

相关文章
  • 一. 线程管理之Thread基础
  • 二. 线程管理之线程池
  • 三. 线程管理之ThreadLocal
  • 四. 线程管理之Android中的多线程

线程池的线程数怎么确定。本文概要:

  1. 认识Executor与ExecutorService
  2. 理解Future与FuturTask
  3. ThreadPoolExecutor介绍
  4. Executors工厂类

线程池的优点:

  • 重用线程,避免不必要的对象创建和销毁.
  • 可有效控制最大并发线程数,提高系统资源的使用率.避免因线程过多强占系统资源导致阻塞.

一. 认识Executor与ExecutorService

  • 认识Executor

java 线程池详解、Executor作为线程池的顶级接口. 在Java的设计中,Runnable负责任务的提交. Executor负责任务的执行.将任务进行了解耦.

public interface Executor {void execute(Runnable command);//执行已提交的 Runnable 任务对象
}
复制代码
  • 认识ExecutorService

ExecutorService接口继承了Executor接口,定义了一些生命周期的方法

public interface ExecutorService extends Executor {void shutdown();//顺次地关闭ExecutorService,停止接收新的任务,等待所有已经提交的任务执行完毕之后,关闭ExecutorServiceList<Runnable> shutdownNow();//阻止等待任务启动并试图停止当前正在执行的任务,停止接收新的任务,返回处于等待的任务列表boolean isShutdown();//判断线程池是否已经关闭boolean isTerminated();//如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。boolean awaitTermination(long timeout, TimeUnit unit)//等待(阻塞)直到关闭或最长等待时间或发生中断,timeout - 最长等待时间 ,unit - timeout 参数的时间单位  如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false <T> Future<T> submit(Callable<T> task);//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。<T> Future<T> submit(Runnable task, T result);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。Future<?> submit(Runnable task);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

在ExecutorService中运用到了Future相关知识,下面对Futue做一个简单了解.

二. 理解Future与FutureTask

  • 理解Future

Future简单理解就是对异步任务的统计类,包含进行取消、查询是否完成、获取结果等操作.

Future类位于java.util.concurrent包下,对应源码.

public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);//表示如果在任务完成前被取消成功,则返回trueboolean isCancelled();表示任务执行结束,无论是正常结束/中断/发生异常,都返回trueboolean isDone();//用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回V get() throws InterruptedException, ExecutionException;//用来获取执行结果,有超时机制,如果阻塞时间超过了指定时间,会抛出异常V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
  • 理解FutureTask

FutureTask既实现了Future接口,又实现了Runnable接口.

对应源码

public class FutureTask<V> implements RunnableFuture<V>public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}//构造方法
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable
}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}public void run() {if (state != NEW ||!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//回调callable的Call方法,获取异步任务返回值.result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {//...省略部分代码}}
复制代码

所以我们可以通过Runnable接口实现线程,又可以通过Future接口获取线程执行完后的结果.

使用示例 上一篇文章Thread基础中已经使用过Future了,这里直接换成FutureTask的使用.

public class MyCallable implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("子线程正在干活");Thread.sleep(3000);return "实现Callable接口,重写Call方法";}public static void main(String[] args) throws Exception {useFutureTask();}private static void  useFutureTask() throws InterruptedException, ExecutionException {MyCallable myCallable = new MyCallable();ExecutorService executorService = Executors.newSingleThreadExecutor();//定义一个Task,再提交任务.FutureTask<String> futureTask = new FutureTask<>(myCallable);executorService.submit(futureTask);executorService.shutdown();Thread.sleep(1000);//模拟正在干活System.out.println("主线程正在干活");//阻塞当前线程,等待返回结果.System.out.println("等待返回结果:" + futureTask.get());System.out.println("主线程所有的活都干完了");}
}//调用输出
子线程正在干活
主线程正在干活
等待返回结果:实现Callable接口,重写Call方法
主线程所有的活都干完了
复制代码

说到这里,可能有些人和我一样迷糊,ExecutorService可以通过submit和execute进行执行任务.那么这两者区别是啥.我们来简单的总结一下.

对应源码

//submit方法
<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);//execute方法
void execute(Runnable command);
复制代码

从源码对比可以知道.

  • 接收的参数不一样.execute仅能接收Runnable类型.
  • submit()有返回值,而execute()没有
  • submit()可以进行Exception处理.

通过总结会发现这其实就与前面介绍的实现Runable与Callable接口的特点类似.

三. ThreadPoolExecutor介绍

所有线程池都是通过ThreadPoolExecutor来创建.

对应源码

 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
复制代码

参数解析

  • corePoolSize : 核心线程数
  • maximumPoolSize : 最大线程数
  • keepAliveTime : 非核心线程闲置时的超时时间.如果设置了allowCoreThreadTimeOut为true,那么也将作用于核心线程.
  • unit : 时间单位
  • workQueue : 任务队列.存放Runnable任务.

另外还有2个参数,一般我们不需要去手动设定.

  • ThreadFactory : 线程工厂,创建线程用.
  • RejectedExecutionHandler : 任务队列已满或者无法成功执行任务时,会调用此handler的rejectedExecution方法抛出异常通知调用者,俗称饱和策略.

理解ThreadPoolExecutor处理的流程


线程池工作流程
  1. 在提交任务时,检查是否达到核心线程数量.
    • 未达到,则启动核心线程去执行任务.
    • 已达到,进行下一步.
  2. 检查任务队列是否已满.
    • 队列未满,加入任务队列.
    • 队列已满,进行下一步.
  3. 检查是否到达最大线程数.
    • 未达到,启动非核心线程去执行任务.
    • 已达到,执行饱和策略.
  • 一旦有线程处于闲置时,就会去队列中获取任务执行.

四. Executors工厂类

通过Executors提供四种线程池,newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool、ewCachedThreadPool.

  • FixedThreadPool

可重用固定线程数的线程池.

对应源码

 public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}复制代码
  • 只有核心线程.数量固定
  • 无超时机制,队列大小无限制.

使用示例

private static void executeFixedThreadPool() {ExecutorService executorService = Executors.newFixedThreadPool(5);for (int i = 0; i < 20; i++) {executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("线程:" + Thread.currentThread().getName());}});}}
复制代码
  • SingleThreadExecutor

单线程的线程池

对应源码

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
复制代码

同FixedThreadPool类似.

  • 固定只有一个核心线程. 因此它能确保所有任务在一个线程中按顺序执行.

使用示例

private static void executeSingleThreadExecutor() {ExecutorService executorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 20; i++) {executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("线程:" + Thread.currentThread().getName());}});}}
复制代码
  • ScheduledThreadPool

支持定时和周期性任务的线程池

对应源码

ScheduledExecutorService.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}ScheduledThreadPoolExecutor.java    
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());}
复制代码

同FixedThreadPool类似.

  • 核心线程数量固定,非核心线程不定.

使用示例

private static void executeScheduledThreadPool(int flag) {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("后"+System.currentTimeMillis());
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }System.out.println("线程:" + Thread.currentThread().getName());}};System.out.println("前"+System.currentTimeMillis());switch (flag) {case 0://延迟1000毫秒后开始执行executorService.schedule(runnable, 1000, TimeUnit.MILLISECONDS);break;case 1://延迟1000毫秒后开始执行,后面每隔2000毫秒执行一次.强调任务的执行频率,不受任务执行时间影响,过时不候.executorService.scheduleAtFixedRate(runnable, 1000, 2000, TimeUnit.MILLISECONDS);break;case 2://延迟1000毫秒后开始执行,后面每次延迟3000毫秒执行一次.强调任务执行的间隔.executorService.scheduleWithFixedDelay(runnable, 1000, 3000, TimeUnit.MILLISECONDS);break;default:break;}}复制代码
  • CachedThreadPool

对应源码

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
复制代码
  • 只有非核心线程.
  • 有超时机制,队列无法存储,被立即执行.

因此适合大量的耗时较少的任务.

使用示例

private static void executeCachedThreadPool() {ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < 20; i++) {executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("线程:" + Thread.currentThread().getName());}});}}
复制代码

关于线程池相关就介绍到这里了.

由于本人技术有限,如有错误的地方,麻烦大家给我提出来,本人不胜感激,大家一起学习进步.

参考链接:

  • www.cnblogs.com/handsomeye/…
  • www.cnblogs.com/dolphin0520…
  • blog.csdn.net/yuzjang/art…
  • blog.csdn.net/liushengbao…
  • www.cnblogs.com/whoislcj/p/…
  • item.jd.com/12125491.ht…

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/4/68276.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息