国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

并發容器——BlockingQueue相關類

開發 后端

java.util.concurrent提供了多種并發容器,總體上來說有4類

Queue類:BlockingQueue ConcurrentLinkedQueue

Map類:ConcurrentMap

Set類:ConcurrentSkipListSet CopyOnWriteArraySet

List類:CopyOnWriteArrayList

接下來一系列文章,我會對每一類的源碼進行分析,試圖讓它們的實現機制完全暴露在大家面前。這篇主要是BlockingQueue及其相關類。

先給出結構圖:

 

 

下面我按這樣的順序來展開:

1、BlockingQueue

2、ArrayBlockingQueue 2.1 添加新元素的方法:add/put/offer

2.2 該類的幾個實例變量:takeIndex/putIndex/count/

2.3 Condition實現

3、LinkedBlockingQueue

4、PriorityBlockingQueue

5、DelayQueue

6、BlockingDque+LinkedBlockingQueue

其中前兩個分析的盡量詳細,為了方便大家看,基本貼出了所有相關源碼。后面幾個就用盡量用文字論述,如果看得吃力,建議對著jdk的源碼看。

1、BlockingQueue

BlockingQueue繼承了Queue,Queu是先入先出(FIFO),BlockingQueue是JDK 5.0新引入的。

根據隊列null/full時的表現,BlockingQueue的方法分為以下幾類:

 

 

至于為什么要使用并發容器,一個典型的例子就是生產者-消費者的例子,為了精簡本文篇幅,放到附件中見附件:“生產者-消費者 測試.rar”。

另外,BlockingQueue接口定義的所有方法實現都是線程安全的,它的實現類里面都會用鎖和其他控制并發的手段保證這種線程安全,但是這些類同時也實現了Collection接口(主要是AbstractQueue實現),所以會出現BlockingQueue的實現類也能同時使用Conllection接口方法,而這時會出現的問題就是像addAll,containsAll,retainAll和removeAll這類批量方法的實現不保證線程安全,舉個例子就是addAll 10個items到一個ArrayBlockingQueue,可能中途失敗但是卻有幾個item已經被放進這個隊列里面了。

2、ArrayBlockingQueue

ArrayBlockingQueue創建的時候需要指定容量capacity(可以存儲的最大的元素個數,因為它不會自動擴容)以及是否為公平鎖(fair參數)。

在創建ArrayBlockingQueue的時候默認創建的是非公平鎖,不過我們可以在它的構造函數里指定。這里調用ReentrantLock的構造函數創建鎖的時候,調用了:

public ReentrantLock(boolean fair) {

sync = (fair)? new FairSync() : new NonfairSync();

}

FairSync/ NonfairSync是ReentrantLock的內部類:

線程按順序請求獲得公平鎖,而一個非公平鎖可以闖入,如果鎖的狀態可用,請求非公平鎖的線程可在等待隊列中向前跳躍,獲得該鎖。內部鎖synchronized沒有提供確定的公平性保證。

分三點來講這個類:

2.1 添加新元素的方法:add/put/offer

2.2 該類的幾個實例變量:takeIndex/putIndex/count/

2.3 Condition實現

2.1 添加新元素的方法:add/put/offer

首先,談到添加元素的方法,首先得分析以下該類同步機制中用到的鎖:

Java代碼

  1. lock = new ReentrantLock(fair);     
  2. notEmpty = lock.newCondition();//Condition Variable 1     
  3. notFull =  lock.newCondition();//Condition Variable 2    

 

這三個都是該類的實例變量,只有一個鎖lock,然后lock實例化出兩個Condition,notEmpty/noFull分別用來協調多線程的讀寫操作。

Java代碼

  1. 1、     
  2. public boolean offer(E e) {     
  3.         if (e == nullthrow new NullPointerException();     
  4.         final ReentrantLock lock = this.lock;//每個對象對應一個顯示的鎖     
  5.         lock.lock();//請求鎖直到獲得鎖(不可以被interrupte)     
  6.         try {     
  7.             if (count == items.length)//如果隊列已經滿了     
  8.                 return false;     
  9.             else {     
  10.                 insert(e);     
  11.                 return true;     
  12.             }     
  13.         } finally {     
  14.             lock.unlock();//     
  15.         }     
  16. }     
  17. 看insert方法:     
  18. private void insert(E x) {     
  19.         items[putIndex] = x;     
  20.         //增加全局index的值。     
  21.         /*    
  22.         Inc方法體內部:    
  23.         final int inc(int i) {    
  24.         return (++i == items.length)? 0 : i;    
  25.             }    
  26.         這里可以看出ArrayBlockingQueue采用從前到后向內部數組插入的方式插入新元素的。如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中隊列為滿)    
  27.         */    
  28.         putIndex = inc(putIndex);      
  29.         ++count;     
  30.         notEmpty.signal();//wake up one waiting thread     
  31. }    

 

Java代碼

  1. public void put(E e) throws InterruptedException {     
  2.         if (e == nullthrow new NullPointerException();     
  3.         final E[] items = this.items;     
  4.         final ReentrantLock lock = this.lock;     
  5.         lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted     
  6.         try {     
  7.             try {     
  8.                 while (count == items.length)//如果滿了,當前線程進入noFull對應的等waiting狀態     
  9.                     notFull.await();     
  10.             } catch (InterruptedException ie) {     
  11.                 notFull.signal(); // propagate to non-interrupted thread     
  12.                 throw ie;     
  13.             }     
  14.             insert(e);     
  15.         } finally {     
  16.             lock.unlock();     
  17.         }     
  18. }    

 

Java代碼

  1. public boolean offer(E e, long timeout, TimeUnit unit)     
  2.         throws InterruptedException {     
  3.     
  4.         if (e == nullthrow new NullPointerException();     
  5.     long nanos = unit.toNanos(timeout);     
  6.         final ReentrantLock lock = this.lock;     
  7.         lock.lockInterruptibly();     
  8.         try {     
  9.             for (;;) {     
  10.                 if (count != items.length) {     
  11.                     insert(e);     
  12.                     return true;     
  13.                 }     
  14.                 if (nanos <= 0)     
  15.                     return false;     
  16.                 try {     
  17.                 //如果沒有被 signal/interruptes,需要等待nanos時間才返回     
  18.                     nanos = notFull.awaitNanos(nanos);     
  19.                 } catch (InterruptedException ie) {     
  20.                     notFull.signal(); // propagate to non-interrupted thread     
  21.                     throw ie;     
  22.                 }     
  23.             }     
  24.         } finally {     
  25.             lock.unlock();     
  26.         }     
  27.     }    

 

Java代碼

  1. public boolean add(E e) {     
  2.     return super.add(e);     
  3.     }     
  4. 父類:     
  5. public boolean add(E e) {     
  6.         if (offer(e))     
  7.             return true;     
  8.         else    
  9.             throw new IllegalStateException("Queue full");     
  10.     }    

 

2.2 該類的幾個實例變量:takeIndex/putIndex/count

Java代碼

  1. 用三個數字來維護這個隊列中的數據變更:     
  2. /** items index for next take, poll or remove */    
  3.     private int takeIndex;     
  4.     /** items index for next put, offer, or add. */    
  5.     private int putIndex;     
  6.     /** Number of items in the queue */    
  7.     private int count;    

 

提取元素的三個方法take/poll/remove內部都調用了這個方法:

Java代碼

  1. private E extract() {     
  2.         final E[] items = this.items;     
  3.         E x = items[takeIndex];     
  4.         items[takeIndex] = null;//移除已經被提取出的元素     
  5.         takeIndex = inc(takeIndex);//策略和添加元素時相同     
  6.         --count;     
  7.         notFull.signal();//提醒其他在notFull這個Condition上waiting的線程可以嘗試工作了     
  8.         return x;     
  9.     }   

 

從這個方法里可見,tabkeIndex維護一個可以提取/移除元素的索引位置,因為takeIndex是從0遞增的,所以這個類是FIFO隊列。

putIndex維護一個可以插入的元素的位置索引。

count顯然是維護隊列中已經存在的元素總數。

2.3 Condition實現

Condition現在的實現只有java.util.concurrent.locks.AbstractQueueSynchoronizer內部的ConditionObject,并且通過ReentranLock的newCondition()方法暴露出來,這是因為Condition的await()/sinal()一般在lock.lock()與lock.unlock()之間執行,當執行condition.await()方法時,它會首先釋放掉本線程持有的鎖,然后自己進入等待隊列。直到sinal(),喚醒后又會重新試圖去拿到鎖,拿到后執行await()下的代碼,其中釋放當前鎖和得到當前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定,并且享受ReentranLock的重進入特性。

Java代碼

  1. public final void await() throws InterruptedException {     
  2.             if (Thread.interrupted())     
  3.                 throw new InterruptedException();     
  4.            //加一個新的condition等待節點     
  5.  Node node = addConditionWaiter();     
  6. //釋放自己的鎖     
  7.             int savedState = fullyRelease(node);      
  8.             int interruptMode = 0;     
  9.             while (!isOnSyncQueue(node)) {     
  10.             //如果當前線程 等待狀態時CONDITION,park住當前線程,等待condition的signal來解除     
  11.                 LockSupport.park(this);     
  12.                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)     
  13.                     break;     
  14.             }     
  15.             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)     
  16.                 interruptMode = REINTERRUPT;     
  17.             if (node.nextWaiter != null)     
  18.                 unlinkCancelledWaiters();     
  19.             if (interruptMode != 0)     
  20.                 reportInterruptAfterWait(interruptMode);     
  21.         }   

 

3、LinkedBlockingQueue

單向鏈表結構的隊列。如果不指定容量默認為Integer.MAX_VALUE。通過putLock和takeLock兩個鎖進行同步,兩個鎖分別實例化notFull和notEmpty兩個Condtion,用來協調多線程的存取動作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同時獲得這兩個鎖,并且總是先putLock.lock緊接著takeLock.lock(在同一方法fullyLock中),這樣的順序是為了避免可能出現的死鎖情況(我也想不明白為什么會是這樣?)

4、PriorityBlockingQueue

看它的三個屬性,就基本能看懂這個類了:

Java代碼

  1. private final PriorityQueue q;     
  2.     private final ReentrantLock lock = new ReentrantLock(true);     
  3.     private final Condition notEmpty = lock.newCondition();   

 

q說明,本類內部數據結構是PriorityQueue,至于PriorityQueue怎么排序看我之前一篇文章:http://jiadongkai-sina-com.iteye.com/blog/825683

lock說明本類使用一個lock來同步讀寫等操作。

notEmpty協調隊列是否有新元素提供,而隊列滿了以后會調用PriorityQueue的grow方法來擴容。

5、DelayQueue

Delayed接口繼承自Comparable,我們插入的E元素都要實現這個接口。

DelayQueue的設計目的間API文檔:

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will returnnull. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.

因為DelayQueue構造函數了里限定死不允許傳入comparator(之前的PriorityBlockingQueue中沒有限定死),即只能在compare方法里定義優先級的比較規則。再看上面這段英文,“The head of the queue is that Delayed element whose delay expired furthest in the past.”說明compare方法實現的時候要保證最先加入的元素最早結束延時。而 “Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.”說明getDelay方法的實現必須保證延時到了返回的值變為<=0的int。

上面這段英文中,還說明了:在poll/take的時候,隊列中元素會判定這個elment有沒有達到超時時間,如果沒有達到,poll返回null,而take進入等待狀態。但是,除了這兩個方法,隊列中的元素會被當做正常的元素來對待。例如,size方法返回所有元素的數量,而不管它們有沒有達到超時時間。而協調的Condition available只對take和poll是有意義的。

另外需要補充的是,在ScheduledThreadPoolExecutor中工作隊列類型是它的內部類DelayedWorkQueue,而DelayedWorkQueue的Task容器是DelayQueue類型,而ScheduledFutureTask作為Delay的實現類作為Runnable的封裝后的Task類。也就是說ScheduledThreadPoolExecutor是通過DelayQueue優先級判定規則來執行任務的。

6、BlockingDque+LinkedBlockingQueue

BlockingDque為阻塞雙端隊列接口,實現類有LinkedBlockingDque。雙端隊列特別之處是它首尾都可以操作。LinkedBlockingDque不同于LinkedBlockingQueue,它只用一個lock來維護讀寫操作,并由這個lock實例化出兩個Condition notEmpty及notFull,而LinkedBlockingQueue讀和寫分別維護一個lock。

【編輯推薦】

  1. Java Web應用開發中的一些概念
  2. Tomcat 7 應用實測:聲明式Servlet 3.0
  3. 探秘Servlet 3.0中的Web安全改進
  4. 簡化Web應用開發 Servlet 3.0特性詳解
  5. Servlet 3.0的異步處理
責任編輯:金賀 來源: ITEYE博客
相關推薦

2023-07-03 09:59:00

并發編程并發容器

2025-06-10 10:15:00

Java容器并發

2020-06-29 07:52:17

Java工具類開發

2023-06-30 08:27:20

2025-01-14 00:00:00

Blocking隊列元素

2022-07-04 11:39:21

并發容器同步容器機制

2009-08-05 18:39:54

C#異常類

2023-12-07 08:13:58

Java開發

2010-02-06 15:49:31

刪除C++容器值

2020-07-01 07:52:07

Java并發容器

2024-05-29 08:49:45

2010-01-05 16:15:05

.NET Framew

2011-07-13 14:58:53

STL容器

2024-12-26 07:49:57

Java隊列線程

2022-10-12 07:53:46

并發編程同步工具

2009-09-01 16:14:08

C# Socket類

2010-02-01 17:31:06

C++類成員

2009-12-24 15:42:01

ADO類庫

2011-06-24 14:17:58

Qt 容器類 QVector

2009-12-21 16:24:24

WCF新到工廠
點贊
收藏

51CTO技術棧公眾號

欧美成人在线直播| 久久综合伊人| 国产午夜精品全部视频在线播放| 国产香蕉在线| 亚洲激情自拍偷拍| 日韩有码免费视频| 九九视频精品免费| 日韩在线第一区| 另类av一区二区| 久久伊人资源站| 免费亚洲一区| 日本欧洲国产一区二区| 丝袜亚洲另类欧美| 亚洲欧美国产精品桃花| 免费观看在线色综合| 午夜精品美女久久久久av福利| 午夜在线视频一区二区区别| 国产精品露出视频| 午夜精品一区在线观看| 中文字幕有码在线视频| 亚洲成av人片在线观看无码| 欧美xxxx综合视频| 久久69成人| 久久精品国产一区二区电影| 91tv亚洲精品香蕉国产一区| 国产一区二区免费| 国产日韩综合一区二区性色av| 精品一区二区三区在线观看视频| 自拍亚洲一区欧美另类| 一区在线不卡| 久久久国产精彩视频美女艺术照福利| 色综合视频一区二区三区44| 久久久www成人免费精品| 国产美女亚洲精品7777| 91av福利视频| 91综合在线| 理论片午夜视频在线观看| 成人爽a毛片一区二区免费| 亚洲欧美中文日韩在线| 亚洲欧洲日产国码无码久久99| 亚洲美女久久精品| 免费看黄色91| 久久视频国产精品免费视频在线| 亚洲成人套图| 欧美精品一二三| 东京一区二区| 国产精品久久久久高潮| 2019中文字幕在线电影免费 | 日韩二区三区在线观看| 青少年xxxxx性开放hg| 狠狠色丁香婷婷综合久久片| 日韩精品一区在线视频| 国产视频911| 免费观看v片在线观看| 欧美性生活一区| 综合久久2023| 欧美一区二粉嫩精品国产一线天| 日韩中文首页| 天堂v在线视频| 91婷婷韩国欧美一区二区| 高清av影院| 欧美日韩一区高清| 国外成人福利视频| 亚洲va电影大全| 成人免费黄色在线| 深夜福利在线视频| 少妇高潮 亚洲精品| jlzzjlzz亚洲女人| 一区在线电影| 亚洲成av人**亚洲成av**| 蜜桃视频动漫在线播放| 欧美专区在线观看| 日韩精品乱码av一区二区| 99热手机在线| 精品美女一区二区| 在线一级成人| 成人午夜视频免费观看| 午夜精品福利一区二区三区av| 成人免费看黄| 成人免费视频观看视频| 久久综合九色综合欧美98| 波多野结衣在线影院| 精品国产一区二区三区久久久狼 | 欧美精品亚州精品| 亚洲人成人一区二区三区| 国产精品理伦片| 日本韩国欧美| 国产精品久久97| 日韩福利电影在线观看| www.视频在线.com| 午夜免费电影一区在线观看| 91精品国产91久久久久久最新毛片 | 首页国产欧美日韩丝袜| www.成人爱| 黄页网站视频在线观看| 久久五月天婷婷| 成人免费精品视频| 精品这里只有精品| 久久久精品免费免费| 午夜电影福利| 日韩小视频在线| 欧美激情第8页| www在线观看免费| 粉嫩嫩av羞羞动漫久久久| 香蕉视频禁止18| 午夜精品成人在线视频| 久艹视频在线免费观看| 亚洲国产日韩a在线播放性色| 国产 日韩 亚洲 欧美| 视频一区二区不卡| 在线视频资源站| 偷拍盗摄高潮叫床对白清晰| 亚洲视频在线一区| 少妇精品视频一区二区免费看| 99在线视频播放| 一区二区视频免费在线观看| 99久热在线精品视频观看| 亚洲开发第一视频在线播放| 欧美视频一区二区在线观看| 亚洲v天堂v手机在线| 国产在线观看福利| 伊人精品在线观看| 国产成人一区在线| 在线毛片观看| 欧美性受xxxx黑人猛交88| 精品久久久三级丝袜| 狠狠久久婷婷| avtt亚洲| 国产在线精品一区二区中文| 色婷婷综合久久久中文一区二区 | 亚洲综合激情另类小说区| 豆花视频一区二区| 亚洲这里只有精品| 欧美国产视频日韩| 国产精品污网站| 成人精品毛片| 色婷五月综激情亚洲综合| 国产做受高潮69| 亚洲欧美一区二区三区久本道91| 欧美调教视频| 在线国产福利网站| 99超碰麻豆| 精品国产一区二区三区av性色| 美女视频网站黄色亚洲| 亚洲女同av| 成年人视频在线免费| 国内伊人久久久久久网站视频 | 欧美精品1区2区| 美女精品在线| 成人直播视频| 国产91对白刺激露脸在线观看| 欧美夫妻性生活xx| 亚洲不卡在线观看| 亚洲综合二区| 国产成人免费| 男人天堂2020| 精品在线一区| 在线播放亚洲激情| 亚洲欧洲美洲综合色网| 欧美xxxx中国| 国产丝袜在线播放| 欧美一级片中文字幕| 日韩美女主播视频| 在线播放一区二区三区| 国产成人在线看| 亚洲区小说区图片区qvod按摩| 深夜福利在线观看直播| 亚洲欧美国产不卡| 久久男人av资源网站| 欧美午夜久久久| 国产精品一区久久久久| 丝袜久久网站| 女女色综合影院| 欧美国产激情视频| 亚洲一区免费网站| 亚洲一级免费视频| 亚洲午夜国产一区99re久久| 一区二区三区四区五区精品视频 | 日韩在线激情视频| 一区二区高清视频在线观看| 伊人久久大香线蕉综合热线| 欧美成人精品一区二区男人小说| 一道本在线免费视频| 久久亚洲午夜电影| 欧美精品videosex极品1| 欧美天天综合网| 国产三区在线成人av| 一个色综合网| 亚洲精品一级| 欧美日韩高清| 97欧美成人| 男人的天堂免费在线视频| 国产精品视频黄色| 99久久99久久| 精品国产网站地址| av电影一区二区| 国产经典三级在线| 男女人搞j网站| 日韩中文不卡| 日韩女优人人人人射在线视频|