ArrayBlockingQueue与LinkedBlockingQueue

 2023-09-05 阅读 377 评论 0

摘要:序 本文主要简单介绍下ArrayBlockingQueue与LinkedBlockingQueue。 对比 queue阻塞与否是否有界线程安全保障适用场景注意事项ArrayBlockingQueue阻塞有界一把全局锁生产消费模型,平衡两边处理速度用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较

本文主要简单介绍下ArrayBlockingQueue与LinkedBlockingQueue。

对比

queue阻塞与否是否有界线程安全保障适用场景注意事项
ArrayBlockingQueue阻塞有界一把全局锁生产消费模型,平衡两边处理速度用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
LinkedBlockingQueue阻塞可配置存取采用2把锁生产消费模型,平衡两边处理速度无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。
ConcurrentLinkedQueue非阻塞无界CAS对全局的集合进行操作的场景size() 是要遍历一遍集合,慎用

内存方面

  • ArrayBlockingQueue
    用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
  • LinkedBlockingQueue
    用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。

有界无界

  • ArrayBlockingQueue
    有界,适合已知最大存储容量的场景
  • LinkedBlockingQueue
    可有界可以无界

吞吐量

LinkedBlockingQueue在大多数并发的场景下吞吐量比ArrayBlockingQueue,但是性能不稳定。

Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

图片描述

测试结果表明,LinkedBlockingQueue的可伸缩性要高于ArrayBlockingQueue。初看起来,这个结果有些奇怪:链表队列在每次插入元素时,都必须分配一个链表节点对象,这似乎比基于数组的队列执行了更多的工作。然而,虽然它拥有更好的内存分配与GC等开销,但与基于数组的队列相比,链表队列的put和take等方法支持并发性更高的访问,因为一些优化后的链接队列算法能将队列头节点的更新操作与尾节点的更新操作分离开来。由于内存分配操作通常是线程本地的,因此如果算法能通过多执行一些内存分配操作来降低竞争程度,那么这种算法通常具有更高的可伸缩性。

并发方面

  • ArrayBlockingQueue
    采用一把锁,两个condition
    /** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}

此外还支持公平锁

/*** Creates an {@code ArrayBlockingQueue} with the given (fixed)* capacity and the specified access policy.** @param capacity the capacity of this queue* @param fair if {@code true} then queue accesses for threads blocked*        on insertion or removal, are processed in FIFO order;*        if {@code false} the access order is unspecified.* @throws IllegalArgumentException if {@code capacity < 1}*/public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}
  • LinkedBlockingQueue
    头尾各1把锁
    /** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();/*** Inserts the specified element at the tail of this queue if it is* possible to do so immediately without exceeding the queue's capacity,* returning {@code true} upon success and {@code false} if this queue* is full.* When using a capacity-restricted queue, this method is generally* preferable to method {@link BlockingQueue#add add}, which can fail to* insert an element only by throwing an exception.** @throws NullPointerException if the specified element is null*/public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

应用实例

Executors

里头用了LinkedBlockingQueue

    public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}

使用LinkedBlockingQueue实现logger

public class BungeeLogger extends Logger {private final ColouredWriter writer;private final Formatter formatter = new ConciseFormatter();
//    private final LogDispatcher dispatcher = new LogDispatcher(this);private final BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>();volatile boolean running = true;Thread recvThread = new Thread(){@Overridepublic void run() {while (!isInterrupted() && running) {LogRecord record;try {record = queue.take();} catch (InterruptedException ex) {continue;}doLog(record);}for (LogRecord record : queue) {doLog(record);}}};public BungeeLogger() throws IOException {super("BungeeCord", null);this.writer = new ColouredWriter(new ConsoleReader());try {FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true);handler.setFormatter(formatter);addHandler(handler);} catch (IOException ex) {System.err.println("Could not register logger!");ex.printStackTrace();}recvThread.start();Runtime.getRuntime().addShutdownHook(new Thread(){@Overridepublic void run() {running = false;}});}@Overridepublic void log(LogRecord record) {if (running) {queue.add(record);}}void doLog(LogRecord record) {super.log(record);writer.print(formatter.format(record));}
}

doc

  • BungeeCord

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/450.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息