java blockingqueue,LinkedBlockingQueue1.8源碼

 2023-10-18 阅读 28 评论 0

摘要:使用 private int i;LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);class Producer implements Runnable {@Overridepublic void run() {while (true) {try {queue.put(i++);System.out.println("生產,,,,,,剩余容量&#

使用

private int i;LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);class Producer implements Runnable {@Overridepublic void run() {while (true) {try {queue.put(i++);System.out.println("生產,,,,,,剩余容量:" + queue.remainingCapacity());System.out.println("生產,,,,,,剩余容量:" + queue);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {while (true) {try {queue.take();System.out.println("消費,,,,剩余容量:" + queue.remainingCapacity());System.out.println("消費,,,,剩余容量:" + queue);} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) {Test linkedBlockingQueueTest = new Test();new Thread(linkedBlockingQueueTest.new Producer()).start();new Thread(linkedBlockingQueueTest.new Consumer()).start();}

生產,剩余容量:2
生產,剩余容量:[0]
生產,剩余容量:1
生產,剩余容量:[0, 1]
生產,剩余容量:0
生產,剩余容量:[0, 1, 2]
消費,剩余容量:1
生產,剩余容量:0
消費,剩余容量:[1, 2, 3]
生產,剩余容量:[1, 2, 3]
消費,剩余容量:1
生產,剩余容量:0
消費,剩余容量:[2, 3, 4]
生產,剩余容量:[2, 3, 4]
消費,剩余容量:1
生產,剩余容量:0
消費,剩余容量:[3, 4, 5]
生產,剩余容量:[3, 4, 5]
消費,剩余容量:1

是一個可變范圍的鏈表,隊列的元素是先進先出(FIFO),首元素在隊列中待的時間最長,尾元素在隊列中待的時間最短。新元素添加在隊列的尾部,檢索操作獲得隊列的頭部。 有更大的吞吐量比起 ,但是更少的可預見性操作在
并發應用中。該隊列的容量大小是可變的,如果在構造函數中沒有指定,默認的容量是 ,鏈表節點會被動態的創建,除非此操作會導致超出容量。此類的它的迭代器實現了 和 接口的所有方法。
從源碼中可以看到,該類繼承了AbstractQueue,實現了BlockingQueue,
一般情況下,在處理多線程的并發問題時,常常用到此類。

構造器

    public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}

java blockingqueue,節點

    static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}

容量大小
private final int capacity;
如果沒有指定,則為Integer.MAX_VALUE,支持原子操作

當前元素數量
private final AtomicInteget count = new AtomicInteget();

鏈表頭節點,其前驅節點為null
transient Node head;

linkedblockingqueue使用、鏈表尾節點,其后繼為null
private transient Node last;

針對取和添加操作的兩把鎖及其上的條件

/*Lock held by tack,poll,etc/
private final ReentrantLook takeLock=new ReentrantLock();
/*Wait queue for waiting takes/
private final Condition notEmpty=takeLock.newCondition();
/*Lock held by put,offer,etc/
private final ReentrantLock putLock=new ReentrantLock();
/*Wait queue for waiting puts/
private final Condition notFull=putLock.newCondition();

主要方法

/**添加一個元素到隊尾,如果隊列已滿,則一直處于阻塞狀態,直到有可用空間
*/
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//如果當前線程未被中斷,則獲取鎖putLock.lockInterruptibly();try {/*當容量已滿時,等待notfull條件釋放鎖,陷入等待狀態有兩種方法會激活該線程:1.  某個put方法添加元素后,發現隊列有空余,就調用  notFull.signal()方法激活阻塞線程;2.  Take線程取元素時,發現隊列已滿,取出元素后,也會調用notFull.signal()方法激活阻塞線程*/while (count.get() == capacity) {notFull.await();}//把元素node添加到隊尾enqueue(node);c = count.getAndIncrement();//發現隊列未滿,調用notFull.signal()激活阻塞線程的put線程if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();
}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {//若容量為空,等待非空條件while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();//再次激活notEmptyif (c > 1)notEmpty.signal();} finally {takeLock.unlock();}//如果容量已滿,調用signalNotFull(),put線程處于阻塞狀態。if (c == capacity)signalNotFull();return x;
}

其生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的操作下生產者和消費者可以并行的操作隊列中的數據,依次來提高整個隊列的并發性能。在這里,我們需要注意的是:如果構造一個LinkedBlockingQueue對象,而沒有指定其大小,LinkedBlockingQueue會默認為Integer.MAX_VALUE,這樣的話,如果生產者的速度大于消費者的速度,也許還沒有等隊列阻塞產生,系統內存就已經被消耗殆盡了。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/148898.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息