aqs源碼分析,AbstractQueuedSynchronizer AQS源碼分析

 2023-10-18 阅读 30 评论 0

摘要:申明:jdk版本為1.8 AbstractQueuedSynchronizer是jdk中實現鎖的一個抽象類,有排他和共享兩種模式。 我們這里先看排他模式,共享模式后面結合java.util.concurrent.locks.ReentrantReadWriteLock單獨寫一篇隨筆。 后面還會分析可重入鎖java.util.concurre

申明:jdk版本為1.8

AbstractQueuedSynchronizer是jdk中實現鎖的一個抽象類,有排他和共享兩種模式。

我們這里先看排他模式,共享模式后面結合java.util.concurrent.locks.ReentrantReadWriteLock單獨寫一篇隨筆。

后面還會分析可重入鎖java.util.concurrent.locks.ReentrantLock。

aqs源碼分析。言歸正傳,一起來看看吧。

AQS中主要是通過state變量來控制鎖狀態的,子類通過重寫下面的某些方法并控制state值來達到獲取鎖或者解鎖效果:

一般情況下要實現自己的鎖,會實現java.util.concurrent.locks.Lock接口,并通過內部類繼承自java.util.concurrent.locks.AbstractQueuedSynchronizer實現。形如下面這樣:

AbstractQueuedSynchronizer,好了,AQS的擴展就先到這里,后面講具體的鎖時再詳細分析。

下面分析AQS本身的代碼實現。

核心方法是java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(int)獲取指定數量的鎖(實際就是給state加多少的問題)以及java.util.concurrent.locks.AbstractQueuedSynchronizer.release(int)釋放指定數量的鎖(實際就是給state減多少的問題)

acquire

代碼以及注釋說明如下;

linux源碼分析,看注釋就明白了,這是以排他模式獲取鎖,并忽略線程中斷,與acquireInterruptibly相對應。acquireInterruptibly在遇到線程被中斷時會拋出InterruptedException異常。

這里會先調用一次tryAcquire嘗試獲取鎖,如果獲取失敗,就會新建一個Node并加到雙向鏈表尾部,該Node會在自旋里面重復地嘗試直到獲得鎖為止,過程中線程可能會阻塞或者不阻塞,這點完全取決于Node中的waitStatus狀態值,這一點后面會詳細分析。

tryAcquire是子類實現的,負責操作state值,通過state值的控制,子類可以實現各種各樣的鎖,父類的主要邏輯就是控制好Node的鏈表,線程的阻塞、喚醒等。

下面看下addWaiter方法,方法的邏輯就是用一個Node對象包裝當前線程,并將Node加入到雙向鏈表的尾部:

thinkphp源碼分析、acquire采用的是排他模式,這里的入參是Node.EXCLUSIVE,創建一個排他模式的Node。大部分情況下tail是不為null的,看注釋就知道了,

這里外部加了層判斷嘗試以最快方式將新建的Node掛到鏈表尾部。

因為是并發環境,所以compareAndSetTail有可能失敗,失敗的話就進入enq方法,以自旋方式往鏈表尾部添加。

addWaiter完之后將剛剛新建的Node傳入acquireQueued,自旋嘗試獲得鎖:

netty源碼分析、

通過上面代碼可知,如果node的前任node是head,并且tryAcquire獲得鎖成功,就將當前node設為head并返回是否是因為線程中斷而從阻塞狀態喚醒的。

否則,shouldParkAfterFailedAcquire,先檢查是否要阻塞當前線程,如果需要阻塞,則調用parkAndCheckInterrupt,阻塞線程,

并在喚醒后檢查是否是終端導致的線程喚醒,同時設置interrupted = true

下面分別看下這兩個方法:

grbl源碼分析?shouldParkAfterFailedAcquire:

根據前任節點的狀態決定是否應該阻塞,如果前任節點狀態為Node.SIGNAL就直接阻塞當前節點對應的線程,否則檢查前任節點狀態是否為cancelled,

如果是的話,就將前面所有狀態為cancelled的節點剔除,剩下的邏輯就是前任狀態設置為Node.SIGNAL,總結一句話,在自旋過程中如果沒有成功獲得鎖

的情況下,兜兜轉轉最終會返回true,阻塞當前線程。

ceph源碼分析。node狀態描述如下:

返回true后就會執行parkAndCheckInterrupt方法,如下:

調用LockSupport.park阻塞當前線程,LockSupport的分析見我的另外一篇隨筆。

qt源碼分析?這里喚醒一般通過LockSupport.unpark喚醒,注意,線程的interrupt方法也會喚醒該線程,所以這里調用了Thread.interrupted()靜態方法

判斷喚醒方式。喚醒后就會繼續進入自旋嘗試獲得鎖。

release

釋放指定數量的鎖,調用子類tryRelease,將State減去入參對應的數值,如果為0,則表示鎖徹底釋放,進入unparkSuccessor方法,喚醒head后面節點對應的線程。

aqs詳解。?正常情況下喚醒head后面一個節點的thread,但是,如果nextNode為cancel狀態,就從tail往前找最前面符合條件的node。

至此,鎖的獲取、釋放都講完了,還有一個涉及到共享模式的那一路邏輯后面結合java.util.concurrent.locks.ReentrantReadWriteLock再分析。

ConditionObject

首先,一把AQS鎖可以new多個ConditionObject對象,調用await和signal必須在鎖的lock和unlock中間,也就是必須獲得鎖的情況下才能調用,否則會拋出異常。

類似synchronized中調用監視器的wait和notify,不同的是AQS同一把鎖可以有多個conditionObject。

下面詳細分析下AQS的await和signal。

await

/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final void await() throws InterruptedException {
       // 檢查線程是否已經中斷
if (Thread.interrupted())throw new InterruptedException();
       // 將當前線程包裝進Node,狀態為Node.CONDITION,并加到條件隊列末尾Node node
= addConditionWaiter();
       // 釋放掉該線程獲取的所有鎖狀態
int savedState = fullyRelease(node);int interruptMode = 0;
       // 循環等待當前node直到該node從條件隊列轉入等待隊列
       // 或者該線程被中斷,也就是checkInterruptWhileWaiting返回0
while (!isOnSyncQueue(node)) {
         // 如果還沒有轉移到等待隊列則阻塞當前線程
         
// 這里的線程喚醒有一下幾種情況:
         // 1.調用signal后轉移到等待隊列并排隊到該node正常喚醒
         // 2.線程中斷喚醒
         // 3.signal時調用transferForSignal入隊后檢查發現前驅節點取消了,或者對前驅節點設置狀態為Node.SIGNAL時CAS失敗
         LockSupport.park(this);
         // 喚醒后檢查狀態,是否被中斷喚醒,如果不是返回0,如果是,再看是在signal之前還是之后,
         // 之前的話返回THROW_IE,await結束后拋出InterruptedException異常
         // 之后的話返回REINTERRUPT,需要重新設置中斷標志
         // 值得說明的是,線程被中斷喚醒后,不管處在條件隊列的什么位置,都會直接將對應node轉移到等待隊列
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
}
       // 分析checkInterruptWhileWaiting可知,即使線程中斷也會轉移到等待隊列,所以這里acquireQueued自旋排隊獲取鎖
       // 如果acquireQueued返回true則說明等待過程中發生了中斷,如果不是signal之前發生的中斷,需要重新設置中斷狀態以便外部邏輯感知到
       // 因為checkInterruptWhileWaiting和parkAndCheckInterrupt中都是調用的Thread.interrupted靜態方法,這個方法會清除中斷狀態
       if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
       // 正常signal的node會doSignal的時候設置nextWaiter = null,
       //所以這里的條件滿足情況就是在直到獲得鎖之前都沒有調用signal(因為signal開始執行就設置了
nextWaiter = null)
       if (node.nextWaiter != null) // clean up if cancelled
         // 在上述情況下,將node從條件隊列中移除
          unlinkCancelledWaiters();
      // 如果發生中斷,則根據中斷情況向外反饋
      if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
   }

jquery源碼分析,整個主線邏輯就是上面分析的那樣,下面詳解下關鍵的方法:

checkInterruptWhileWaiting,檢查是否是中斷導致的線程喚醒并返回后續如何處理的標志:

THROW_IE:拋出異常

REINTERRUPT:重新設置中斷標志

有了源碼該如何使用?從上面代碼可知,如果不是中斷,則返回0,重新進入while循環判斷是否在等待隊列中,如果是中斷,則判斷中斷時機:

這里的中斷時機主要針對的signal之前還是之后,因為在signal中會先將node狀態設置為0再將node轉移到等待隊列,如下圖所示:

因為都是CAS操作,所以這里通過判斷能否將狀態由CONDITION設置為0判斷是在signal之前還是之后,如果設置成功,說明在signal操作之前,則將node加入到等待隊列并返回true,對應的狀態是THROW_IE,如果設置失敗,說明signal中的CAS操作搶先了,那就while循環等待調用signal的線程將該node轉移到等待隊列,對應的狀態是REINTERRUPT。

arraylist源碼分析,備注:ConditionObject在jdk的BlockingQueue中有很好應用,可以結合一起看下效果更佳。

轉載于:https://www.cnblogs.com/restart30/p/10918255.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/149286.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息