java 池化_溯本求源: JAVA线程池工作原理

 2023-09-11 阅读 14 评论 0

摘要:1. 前言线程池是JAVA开发中最常使用的池化技术之一,可以减少线程资源的重复创建与销毁造成的开销。2. 灵魂拷问:怎么做到线程重复利用?很多同学会联想到连接池,理所当然的说:需要的时候从池中取出线程,执行完任务再放回去。如何用代码实现

1. 前言

线程池是JAVA开发中最常使用的池化技术之一,可以减少线程资源的重复创建与销毁造成的开销。

2. 灵魂拷问:怎么做到线程重复利用?

很多同学会联想到连接池,理所当然的说:需要的时候从池中取出线程,执行完任务再放回去。

如何用代码实现呢?

java线程池使用实例,此时就会发现,调用线程的start方法后,生命周期就不由父线程直接控制了。线程的run方法执行完成就销毁了,所谓的“取出”和“放回”只不过是想当然的操作。

这里先说答案:生产者消费者模型

3. ThreadPoolExecutor的实现

648102a1d255

image

3.1 结构

首先看下ThreadPoolExecutor的继承结构

线程工作原理、顶级接口是Executor,定义execute方法

ExecutorService添加了submit方法,支持返回future获取执行结果,以及线程池运行状态的相关方法

本文着重讲线程池的执行流程,因此将暂时忽略线程池的状态相关的代码,也建议新手看源码时从核心流程看起。

3.2 核心方法:execute()

public void execute(Runnable command) {

if (command == null)

线程池面试原理,throw new NullPointerException();

int c = ctl.get();

// 判断是否小于核心线程数

if (workerCountOf(c) < corePoolSize) {

//添加worker,添加成功则退出

if (addWorker(command, true))

线程池实现原理、return;

c = ctl.get();

}

// 核心线程数已用完则放入队列

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

java线程池会出现哪些问题,// 双重检查,避免入队完成后,所有线程已销毁,导致没有消费者消费当前任务

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

java线程池工作流程、// 队列已满则开启非核心线程,达到最大线程数则使用拒绝策略

else if (!addWorker(command, false))

reject(command);

}

execute方法就是一个生产的过程,主要分为开启线程和入队

开启线程会传入command(即当前任务),开启的线程会立即消费该任务

java四种线程池?入队的任务则会由Worker消费

主要关注corePoolSize,maximumPoolSize,queueSize(任务队列长度),workerCount(当前worker数量)这几个参数,可以总结为以下:

已满

未满

操作

corePoolSize

java线程池怎么使用。开启核心线程

corePoolSize

queueSize

入队

queueSize

maximumPoolSize

java多线程和线程池?开启非核心线程

maximumPoolSize

拒绝

3.3 消费者:Worker

648102a1d255

image

Worker类实现Runnable接口,继承AQS,主要先关注thread和firstTask两个属性和run方法

java线程池工具类?Worker(Runnable firstTask) {

setState(-1);

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

从Worker的构造方法可以看出,thread就是线程池中真正消费任务的线程,创建时会传入this(即Worker对象),而Worker实现了Runnable,因此线程运行时就是执行了Worker的run方法。

简述线程池的原理?final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock();

boolean completedAbruptly = true;

linux线程池的实现原理、try {

// getTask会阻塞,因此不会造成cpu飙高

while (task != null || (task = getTask()) != null) {

// ···

try {

beforeExecute(wt, task);

java线程池源码深度解析?Throwable thrown = null;

try {

// 执行传入的Runnable

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

java线程池实现原理、} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

java池化技术。}

} finally {

// 修改为null,否则下次循环不会调用getTask

task = null;

// ···

}

hashmap工作原理、}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

再来看run方法,直接调用了ThreadPoolExecutor的runWorker方法,runWorker中有一个while循环,循环体执行了task.run方法

task首先会获取Worker中的firstTask属性,并将其置为null,因此firstTask只会执行一次,后续task将通过getTask方法获取。

因此Worker的工作流程可以概括为:消费完Worker的firstTask后,循环执行getTask获取任务并消费,获取不到task时,就退出循环,线程销毁。

此处便可以看出生产者消费者模型了。

private Runnable getTask() {

boolean timedOut = false;

for (;;) {

int c = ctl.get();

// ···

int wc = workerCountOf(c);

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

// 尝试减少计数,失败则会continue循环重试

if (compareAndDecrementWorkerCount(c))

// 此处返回null,runWorker将退出循环

return null;

continue;

}

try {

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

runWorker方法退出循环的条件是getTask返回null。

观察getTask,只有同时满足以下情况时才会返回null

条件

解读

1

wc > maximumPoolSize || (timed && timedOut)

workQueue.poll方法超时

2

wc > 1 || workQueue.isEmpty()

队列任务全部执行完

3

compareAndDecrementWorkerCount(c)

cas减少workerCount成功

返回的task是通过workQueue.poll和workQueue.take得到的

两者执行时线程均会挂起,直至workQueue中有新的任务

不同之处在于poll方法阻塞keepAliveTime时间后会自动唤醒并返回null,此时timeOut置为true,即满足条件1,随后继续循环,重复检查是否大于核心线程数且队列为空,是则尝试减少workerCount并退出循环

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

// ···

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get();

// ···

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

// ···

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

// ···

}

return workerStarted;

}

了解了Worker之后,再来看execute中调用的addWorker方法

方法有两个参数,firstTask即为Worker的firstTask,core则标记需要新增的是否是核心线程

retry循环与线程池状态相关,内层for循环则是重复尝试cas增加线程,若大于corePoolSize或者maximumPoolSize则新增失败,cas成功后,new一个Worker并start

3.4 总结

648102a1d255

image

回到最初的问题,线程是如何做到重复利用的?

并不存在取出线程使用完再归还的操作,线程启动后进入循环,主动获取任务执行,退出循环则线程销毁。

execute方法控制新增Worker和任务入队

附:手写简易线程池

public class MyThreadPool implements Executor {

private int corePoolSize;

private int maximumPoolSize;

private BlockingQueue queue;

//记录当前工作线程数

private AtomicInteger count;

private long keepAliveTime;

private RejectHandler rejectHandler;

public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue queue, long keepAliveTime, RejectHandler rejectHandler) {

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.queue = queue;

this.keepAliveTime = keepAliveTime;

this.rejectHandler = rejectHandler;

count = new AtomicInteger(0);

}

@Override

public void execute(Runnable task) {

int ct = count.get();

//核心线程数未满,尝试增加核心线程

if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) {

new Worker(task).start();

return;

}

//入队

if (queue.offer(task)) {

return;

}

//重新获取一遍count,否则如果在core分支cas失败,此处必然也失败

ct = count.get();

//队列已满,尝试增加非核心线程

if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) {

new Worker(task).start();

return;

}

//已达最大线程数,拒绝

rejectHandler.reject(task);

}

class Worker extends Thread {

Runnable firstTask;

public Worker(Runnable firstTask) {

this.firstTask = firstTask;

}

@Override

public void run() {

Runnable task = firstTask;

firstTask = null;

while (true) {

try {

//getTask会阻塞

if (task != null || (task = getTask()) != null) {

task.run();

} else {

//getTask超时才会进入,直接退出,线程销毁

break;

}

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

//置空,否则不能getTask

task = null;

}

}

}

}

Runnable getTask() throws InterruptedException {

//标记是否超时过

boolean timedOut = false;

while (true) {

int ct = count.get();

//超出核心线程数才进入超时逻辑,即使timeOut由于线程poll超时过一次变成true,执行到这里如果不超出corePoolSize,可以再次进入take分支

if (ct > corePoolSize) {

//超出核心线程数

if (timedOut) {

//已超时过,尝试减少工作线程数,失败会continue,然后重新比较corePoolSize,重试减少线程数

if (count.compareAndSet(ct, ct - 1)) {

return null;

} else {

continue;

}

}

Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);

if (task == null) {

//poll超时才进入

timedOut = true;

continue;

}

return task;

} else {

//必然能获取到task

return queue.take();

}

}

}

public static interface RejectHandler {

void reject(Runnable r);

}

public static void main(String[] args) {

MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> {

System.out.println(r + ": reject");

});

for (int i = 0; i < 3; i++) {

final int x = i;

new Thread(() -> {

for (int j = 0; j < 5; j++) {

final int y = j;

pool.execute(() -> {

try {

Thread.sleep(3000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

LocalDateTime now = LocalDateTime.now();

System.out.println(String.format("线程i=%s, j=%s,执行结束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME)));

});

}

}).start();

}

}

}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/43416.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息