之前博客的所有内容是对单个线程的操作,例如有Thread和Runnable的使用以及ThreadGroup等的使用,但是对于在有些场景下我们需要管理很多的线程,而对于这些线程的管理有一个统一的管理工具叫做线程池,线程池就是管理很多的线程的一个集合。这篇分享中提出的一个就是关于线程池的概念。
从JDK1.5开始,utils包提供了一个类ExecutorService,这个类是对线程池的实现,关于线程Thread来说,它的整个生命周期中都是需要很好的管理,但是我们频繁的创建或者是销毁线程极大的浪费了系统的资源,那么就需要将线程进行重复的利用。基于这个需求设计出了线程池。
所谓的线程池,从字面上理解就是存放线程的一个容器。当某个任务需要执行的时候,在线程池里的线程就会主动调用这个任务。但是还需要知道这个线程池的资源也是有限的,什么时候对于线程池的资源进行回收,什么时候进行线程的补充,什么如果达到容量之后继续提交任务会是什么状态等等。
基于上面的描述可以知道一个线程池主要具备的要素有以下一些
通过上面的分析,我们知道了线程的几个重要的因素。下面就来自己实现一个线程池。
public interface ThreadPool {//提交执行任务到线程池中void execute(Runnable runnable);//关闭线程池void shutdown();//获取线程池初始化大小int getInitSize();//获取线程池最大链接数int getMaxSize();//获取线程池需要维护的核心线程数int getCoreSize();//获取线程池中用于缓存任务队列的大小int getQueueSize();//获取线程中活跃线程的大小int getActiveCount();//判断线程是否已经被shutdownboolean isShutdown();
}
public interface RunnableQueue {//当有新任务的时候首先进入到offer中void offer(Runnable runnable);//通过take方法获取到任务Runnable take();//获取任务队列中任务的数量int size();
}
多线程池的实现方式。定义了一个函数式接口
@FunctionalInterface
public interface ThreadFactory {Thread createThread(Runnable runnable);
}
@FunctionalInterface
public interface DenyPolicy {void reject(Runnable runnable,ThreadPool threadPool);class DiscardDenyPolicy implements DenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {//do nothing}}class AbortDenyPolicy implements DenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {throw new RunnableDenyException("The runnable "+runnable+" will be abort.");}}class RunnerDenyPolicy implements DenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {if (!threadPool.isShutdown()){runnable.run();}}}
}
public class RunnableDenyException extends RuntimeException{public RunnableDenyException(String message) {super(message);}
}
public class InternalTask implements Runnable {private final RunnableQueue runnableQueue;private volatile boolean running = true;public InternalTask(RunnableQueue runnableQueue) {this.runnableQueue = runnableQueue;}@Overridepublic void run() {//如果当前任务正在执行并且没有被中断,则需要不断的从Queue中获取任务到run方法中while (running&& !Thread.currentThread().isInterrupted()) {try{Runnable task = runnableQueue.take();task.run();}catch (Exception e){running = false;break;}}}public void stop(){this.running = false;}
}
线程队列的设计
做完基本设计之后剩下的就是对于基本功能的实现,首先是对外部线程队列的实现,对这个线程队列来说,有几个需要注意点,第一个就是队列的大小,第二个就是队列的拒绝策略。也就是是说当队列达到上限之后应该怎么处理。
public class LinkedRunableQueue implements RunnableQueue {//任务队列最大容量private final int limit;private final DenyPolicy denyPolicy;private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {this.limit = limit;this.denyPolicy = denyPolicy;this.threadPool = threadPool;}@Overridepublic void offer(Runnable runnable) {synchronized (runnableList){if (runnableList.size() >= limit){denyPolicy.reject(runnable,threadPool);}else {runnableList.addLast(runnable);runnableList.notifyAll();}}}@Overridepublic Runnable take() {synchronized (runnableList){while (runnableList.isEmpty()){try {runnableList.wait();} catch (InterruptedException e) {e.printStackTrace();}}return runnableList.removeFirst();}}@Overridepublic int size() {synchronized (runnableList){return runnableList.size();}}
}
其中两个比较关键的方法一个是offer一个take,在offer方法中如果队列达到上限会执行拒绝策略,否则将继续往队列中放入执行的任务,同时唤醒take的任务线程。take会不断的从队列中获取任务,当队列为空的时候会进入阻塞状态,这个有可能在阻塞的过程中会被中断,所以处理异常的时候要对异常进行thow处理。也就是说对异常进行抛出而不是catch。
线程池的设计
根据前面的设计来开一个线程池有很多的控制属性、例如线程池大小、核心线程数、最大线程数、等等。
public class BasicThreadPool extends Thread implements ThreadPool {private final int initSize;private final int maxSize;private final int coreSize;private int activeCount;private final ThreadFactory threadFactory;private final RunnableQueue runnableQueue;private volatile boolean isShutdown = false;private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final long keepAliveTime;private final TimeUnit timeUnit;public BasicThreadPool(int initSize,int maxSize,int coreSize,int queueSize){this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,queueSize,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);}public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,int queueSize, DenyPolicy denyPolicy,long keepAliveTime, TimeUnit timeUnit) {this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunableQueue(queueSize,denyPolicy,this);this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;this.init();}private void init(){start();for (int i = 0; i < initSize; i++) {newThread();}}private void newThread() {InternalTask internalTask = new InternalTask(runnableQueue);Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread,internalTask);threadQueue.offer(threadTask);this.activeCount++;thread.start();}private void removeThread(){ThreadTask threadTask = threadQueue.remove();threadTask.internalTask.stop();this.activeCount--;}@Overridepublic void run() {while (isShutdown&& !isInterrupted()){try {timeUnit.sleep(keepAliveTime);} catch (InterruptedException e) {isShutdown = true;break;}synchronized (this){if (isShutdown){break;}if (runnableQueue.size()>0&& activeCount<coreSize){for (int i = initSize; i < coreSize ; i++) {newThread();}continue;}if (runnableQueue.size()>0&& activeCount<maxSize){for (int i = coreSize; i < maxSize ; i++) {newThread();}}if (runnableQueue.size()==0&& activeCount>coreSize){for (int i = coreSize; i < activeCount ; i++) {removeThread();}}}}}private static class ThreadTask{Thread thread;InternalTask internalTask;public ThreadTask(Thread thread,InternalTask internalTask){this.thread = thread;this.internalTask = internalTask;}}@Overridepublic void execute(Runnable runnable) {if (this.isShutdown){throw new IllegalStateException("The thread pool is destory");}this.runnableQueue.offer(runnable);}@Overridepublic void shutdown() {synchronized (this){if (isShutdown){return;}isShutdown =true;threadQueue.forEach(threadTask -> {threadTask.internalTask.stop();threadTask.thread.interrupt();});this.interrupt();}}@Overridepublic int getInitSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.initSize;}@Overridepublic int getMaxSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.maxSize;}@Overridepublic int getCoreSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.coreSize;}@Overridepublic int getQueueSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.runnableQueue.size();}@Overridepublic int getActiveCount() {synchronized (this){return this.activeCount;}}@Overridepublic boolean isShutdown() {return this.isShutdown;}private static class DefaultThreadFactory implements ThreadFactory{private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);private static final ThreadGroup group = new ThreadGroup("TestGroup-"+GROUP_COUNTER.getAndDecrement());private static final AtomicInteger COUNTER = new AtomicInteger(0);@Overridepublic Thread createThread(Runnable runnable) {return new Thread(group,runnable,"thread-pool-"+COUNTER.getAndDecrement());}}
}
对于一个线程池来说,既要有对参数的控制,还要有对活动的控制,当线程执行的时候需要有一个执行线程的方法,将线程放入到线程队列中去执行。
线程池处理并发任务,在执行线程的过程中线程池还有自我维护的功能,也就是说,在执行操作的过程中的健壮性
线程池的销毁策略
线程池作为需要多个线程同时访问的对象,难免会出现线程安全问题,使用同步方式是为了防止线程池自我保护而导致数据不匹配的问题。而线程池的销毁主要是对整个线程池的停止工作。也就是说需要停止线程池中所有的线程执行操作。并且将开关变量设置为true。
由于所有的方法都需要多线程访问,所以为了线程安全,都是使用同步的方式进行操作。
public class ThreadPoolTest {public static void main(String[] args) throws InterruptedException {final ThreadPool threadPool = new BasicThreadPool(2,6,4,1000);for (int i = 0; i < 20; i++) {threadPool.execute(()->{try {TimeUnit.SECONDS.sleep(10);System.out.println(Thread.currentThread().getName() + "is running and done.");} catch (InterruptedException e) {e.printStackTrace();}});}for (;;){System.out.println("getActiveCount:"+threadPool.getActiveCount());System.out.println("getQueueSize:"+threadPool.getQueueSize());System.out.println("getCoreSize:"+threadPool.getCoreSize());System.out.println("getMaxSize:"+threadPool.getMaxSize());System.out.println("==============================");TimeUnit.SECONDS.sleep(1);}}
}
结果如下
结合自定义的线程池的实现,对线程池技术的基本原理有了一个更加深刻的认识。通过自定义的线程池,对于以后使用JDK自带的ExecutorService线程池就有了原理上的认识,当然这个线程池比我现在实现的这个线程池实现的功能强大,但是基本的原理都是一样的。希望可以对大家有所帮助。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态