国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

孔乙己:Kotlin生產者消費者問題的八種解法

開發(fā) 后端
生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個緩沖區(qū)(Buffer),生產者往 Buffer 中添加產品,消費者從 Buffer 中取走產品,當 Buffer 為空時,消費者阻塞,當 Buffer 滿時,生產者阻塞。

 

[[420560]]

本文轉載自微信公眾號「AndroidPub」,作者fundroid。轉載本文請聯(lián)系AndroidPub公眾號。

生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個緩沖區(qū)(Buffer),生產者往 Buffer 中添加產品,消費者從 Buffer 中取走產品,當 Buffer 為空時,消費者阻塞,當 Buffer 滿時,生產者阻塞。

Kotlin 中有多種方法可以實現多線程的生產/消費模型(大多也適用于Java)

  1. Synchronized
  2. ReentrantLock
  3. BlockingQueue
  4. Semaphore
  5. PipedXXXStream
  6. RxJava
  7. Coroutine
  8. Flow

1. Synchronized

Synchronized 是最最基本的線程同步工具,配合 wait/notify 可以實現實現生產消費問題。

  1. val buffer = LinkedList<Data>() 
  2. val MAX = 5 //buffer最大size 
  3.  
  4. val lock = Object() 
  5.  
  6. fun produce(data: Data) { 
  7.     sleep(2000) // mock produce 
  8.     synchronized(lock) { 
  9.         while (buffer.size >= MAX) { 
  10.            // 當buffer滿時,停止生產 
  11.            // 注意此處使用while不能使用if,因為有可能是被另一個生產線程而非消費線程喚醒,所以要再次檢查buffer狀態(tài) 
  12.            // 如果生產消費兩把鎖,則不必擔心此問題 
  13.            lock.wait() 
  14.         } 
  15.  
  16.       buffer.push(data) 
  17.         // notify方法只喚醒其中一個線程,選擇哪個線程取決于操作系統(tǒng)對多線程管理的實現。 
  18.         // notifyAll會喚醒所有等待中線程,哪一個線程將會第一個處理取決于操作系統(tǒng)的實現,但是都有機會處理。 
  19.         // 此處使用notify有可能喚醒的是另一個生產線程從而造成死鎖,所以必須使用notifyAll 
  20.         lock.notifyAll() 
  21.     } 
  22.  
  23. fun consume() { 
  24.     synchronized(lock) { 
  25.         while (buffer.isEmpty()) 
  26.             lock.wait() // 暫停消費 
  27.         buffer.removeFirst() 
  28.         lock.notifyAll() 
  29.     } 
  30.     sleep(2000) // mock consume 
  31.  
  32.  
  33.  
  34. @Test 
  35. fun test() { 
  36.     // 同時啟動多個生產、消費線程 
  37.     repeat(10) { 
  38.         Thread { produce(Data()) }.start() 
  39.     } 
  40.     repeat(10) { 
  41.         Thread { consume() }.start() 
  42.     } 

2. ReentrantLock

Lock 相對于 Synchronized 好處是當有多個生產線/消費線程時,我們可以通過定義多個 condition 精確指定喚醒哪一個。下面的例子展示 Lock 配合 await/single 替換前面 Synchronized 寫法。

  1. val buffer = LinkedList<Data>() 
  2. val MAX = 5 //buffer最大size 
  3.               
  4. val lock = ReentrantLock()                      
  5. val condition = lock.newCondition()           
  6.                                                 
  7. fun produce(data: Data) {                       
  8.     sleep(2000) // mock produce                 
  9.     lock.lock()                                 
  10.                                                 
  11.     while (buffer.size >= 5)                       
  12.         condition.await()                       
  13.                                                 
  14.     buffer.push(data)                           
  15.     condition.signalAll()                       
  16.     lock.unlock()                               
  17. }                                               
  18.                                                 
  19. fun consume() {                                 
  20.     lock.lock()                                 
  21.     while (buffer.isEmpty())                       
  22.         condition.await()                       
  23.                                                 
  24.     buffer.removeFirst() 
  25.     condition.singleAll()                         
  26.     lock.unlock()                               
  27.     sleep(2000) // mock consume                 
  28. }                                             

3. BlockingQueue (阻塞隊列)

BlockingQueue在達到臨界條件時,再進行讀寫會自動阻塞當前線程等待鎖的釋放,天然適合這種生產/消費場景。

  1. val buffer = LinkedBlockingQueue<Data>(5)                
  2.                                                          
  3. fun produce(data: Data) {                                
  4.     sleep(2000) // mock produce                          
  5.     buffer.put(data) //buffer滿時自動阻塞                        
  6.                                                          
  7. fun consume() {                                          
  8.     buffer.take() // buffer空時自動阻塞 
  9.     sleep(2000) // mock consume                          
  10. }                                                        
  11.                                       

注意 BlockingQueue 的有三組讀/寫方法,只有一組有阻塞效果,不要用錯。

方法 說明
add(o)/remove(o) add 方法在添加元素的時候,若超出了隊列的長度會直接拋出異常
offer(o)/poll(o) offer 在添加元素時,如果發(fā)現隊列已滿無法添加的話,會直接返回false
put(o)/take(o) put 向隊尾添加元素的時候發(fā)現隊列已經滿了會發(fā)生阻塞一直等待空間,以加入元素
 

4. Semaphore(信號量)

Semaphore 是 JUC 提供的一種共享鎖機制,可以進行擁塞控制,此特性可用來控制 buffer 的大小。

  1. // canProduce: 可以生產數量(即buffer可用的數量),生產者調用acquire,減少permit數目     
  2. val canProduce = Semaphore(5)                                                                                            
  3. // canConsumer:可以消費數量,生產者調用release,增加permit數目                   
  4. val canConsume = Semaphore(5)                                                                                       
  5. // 控制buffer訪問互斥                                                 
  6. val mutex = Semaphore(0)                                        
  7.                                                                 
  8. val buffer = LinkedList<Data>()                                 
  9.                                                                 
  10. fun produce(data: Data) {                                       
  11.     if (canProduce.tryAcquire()) {                              
  12.         sleep(2000) // mock produce                             
  13.                                                                 
  14.         mutex.acquire()                                         
  15.         buffer.push(data)                                       
  16.         mutex.release()                                         
  17.                                                                 
  18.         canConsume.release() //通知消費端新增加了一個產品                    
  19.     }                                                           
  20. }                                                               
  21.                                                                 
  22. fun consume() {                                                 
  23.     if (canConsume.tryAcquire()) {                              
  24.         sleep(2000) // mock consume                             
  25.                                                                 
  26.         mutex.acquire()                                         
  27.         buffer.removeFirst()                                    
  28.         mutex.release()                                         
  29.                                                                 
  30.         canProduce.release() //通知生產端可以再追加生產                     
  31.     }                                                           
  32.                                                                 
  33. }                                         

5. PipedXXXStream (管道)

Java 里的管道輸入/輸出流 PipedInputStream / PipedOutputStream 實現了類似管道的功能,用于不同線程之間的相互通信,輸入流中有一個緩沖數組,當緩沖數組為空的時候,輸入流 PipedInputStream 所在的線程將阻塞。

  1. val pis: PipedInputStream = PipedInputStream() 
  2. val pos: PipedOutputStream by lazy { 
  3.     PipedOutputStream().apply { 
  4.         pis.connect(this) //輸入輸出流之間建立連接 
  5.     } 
  6.  
  7. fun produce(data: ContactsContract.Data) { 
  8.     while (true) { 
  9.         sleep(2000) 
  10.         pos.use { // Kotlin 使用 use 方便的進行資源釋放 
  11.             it.write(data.getBytes()) 
  12.             it.flush() 
  13.         } 
  14.     } 
  15.  
  16. fun consume() { 
  17.     while (true) { 
  18.         sleep(2000) 
  19.         pis.use { 
  20.             val byteArray = ByteArray(1024) 
  21.             it.read(byteArray) 
  22.         } 
  23.     } 
  24.  
  25. @Test 
  26. fun Test() { 
  27.     repeat(10) { 
  28.         Thread { produce(Data()) }.start() 
  29.     } 
  30.  
  31.     repeat(10) { 
  32.         Thread { consume() }.start() 
  33.     } 

6. RxJava

RxJava 從概念上,可以將 Observable/Subject 作為生產者, Subscriber 作為消費者, 但是無論 Subject 或是 Observable 都缺少 Buffer 溢出時的阻塞機制,難以獨立實現生產者/消費者模型。

Flowable 的背壓機制,可以用來控制 buffer 數量,并在上下游之間建立通信, 配合 Atomic 可以變向實現單生產者/單消費者場景,(不適用于多生產者/多消費者場景)。

  1. class Producer : Flowable<Data>() { 
  2.  
  3.     override fun subscribeActual(subscriber: org.reactivestreams.Subscriber<in Data>) { 
  4.         subscriber.onSubscribe(object : Subscription { 
  5.             override fun cancel() { 
  6.                 //... 
  7.             } 
  8.  
  9.             private val outStandingRequests = AtomicLong(0) 
  10.  
  11.             override fun request(n: Long) { //收到下游通知,開始生產 
  12.                 outStandingRequests.addAndGet(n) 
  13.  
  14.                 while (outStandingRequests.get() > 0) { 
  15.                     sleep(2000) 
  16.                     subscriber.onNext(Data()) 
  17.                     outStandingRequests.decrementAndGet() 
  18.                 } 
  19.             } 
  20.  
  21.         }) 
  22.     } 
  23.  
  24.  
  25.  
  26.  
  27. class Consumer : DefaultSubscriber<Data>() { 
  28.  
  29.     override fun onStart() { 
  30.         request(1) 
  31.     } 
  32.  
  33.     override fun onNext(i: Data?) { 
  34.         sleep(2000) //mock consume 
  35.         request(1) //通知上游可以增加生產 
  36.     } 
  37.  
  38.     override fun onError(throwable: Throwable) { 
  39.         //... 
  40.     } 
  41.  
  42.     override fun onComplete() { 
  43.         //... 
  44.     } 
  45.  
  46.  
  47.  
  48. @Test 
  49. fun test_rxjava() { 
  50.     try { 
  51.         val testProducer = Producer) 
  52.         val testConsumer = Consumer() 
  53.  
  54.         testProducer 
  55.             .subscribeOn(Schedulers.computation()) 
  56.             .observeOn(Schedulers.single()) 
  57.             .blockingSubscribe(testConsumer) 
  58.  
  59.     } catch (t: Throwable) { 
  60.         t.printStackTrace() 
  61.     } 
  62.  

7. Coroutine Channel

協(xié)程中的 Channel 具有擁塞控制機制,可以實現生產者消費者之間的通信。可以把 Channel 理解為一個協(xié)程版本的阻塞隊列,capacity 指定隊列容量。

  1. val channel = Channel<Data>(capacity = 5) 
  2.  
  3. suspend fun produce(data: ContactsContract.Contacts.Data) = run { 
  4.     delay(2000) //mock produce 
  5.     channel.send(data) 
  6.  
  7.  
  8. suspend fun consume() = run { 
  9.     delay(2000)//mock consume 
  10.     channel.receive() 
  11.  
  12. @Test 
  13. fun test_channel() { 
  14.     repeat(10) { 
  15.         GlobalScope.launch { 
  16.             produce(Data()) 
  17.         } 
  18.     } 
  19.  
  20.     repeat(10) { 
  21.         GlobalScope.launch { 
  22.            consume() 
  23.         } 
  24.     } 

此外,Coroutine 提供了 produce 方法,在聲明 Channel 的同時生產數據,寫法上更簡單,適合單消費者單生產者的場景:

  1. fun CoroutineScope.produce(): ReceiveChannel<Data> = produce { 
  2.     repeat(10) { 
  3.         delay(2000) //mock produce 
  4.         send(Data()) 
  5.     } 
  6.  
  7. @Test 
  8. fun test_produce() { 
  9.     GlobalScope.launch { 
  10.         produce.consumeEach { 
  11.             delay(2000) //mock consume 
  12.         } 
  13.     } 

8. Coroutine Flow

Flow 跟 RxJava 一樣,因為缺少 Buffer 溢出時的阻塞機制,不適合處理生產消費問題,其背壓機制也比較簡單,無法像 RxJava 那樣收到下游通知。但是 Flow 后來發(fā)布了 SharedFlow, 作為帶緩沖的熱流,提供了 Buffer 溢出策略,可以用作生產者/消費者之間的同步。

  1. val flow : MutableSharedFlow<Data> = MutableSharedFlow( 
  2.     extraBufferCapacity = 5  //緩沖大小 
  3.     , onBufferOverflow = BufferOverflow.SUSPEND // 緩沖溢出時的策略:掛起 
  4.  
  5. @Test 
  6. fun test() { 
  7.  
  8.     GlobalScope.launch { 
  9.         repeat(10) { 
  10.             delay(2000) //mock produce 
  11.             sharedFlow.emit(Data()) 
  12.         } 
  13.     } 
  14.  
  15.     GlobalScope.launch { 
  16.         sharedFlow.collect { 
  17.             delay(2000) //mock consume 
  18.         } 
  19.     } 

注意 SharedFlow 也只能用在單生產者/單消費者場景。

總結

生產者/消費者問題,其本質核心還是多線程讀寫共享資源(Buffer)時的同步問題,理論上只要具有同步機制的多線程框架,例如線程鎖、信號量、阻塞隊列、協(xié)程 Channel等,都是可以實現生產消費模型的。 

另外,RxJava 和 Flow 雖然也是多線程框架,但是缺少Buffer溢出時的阻塞機制,不適用于生產/消費場景,更適合在純響應式場景中使用。

 

責任編輯:武曉燕 來源: AndroidPub
相關推薦

2015-08-26 09:39:30

java消費者

2009-08-13 13:14:31

C#生產者和消費者

2021-12-22 11:00:05

模型Golang語言

2012-02-14 12:31:27

Java

2024-03-14 11:58:43

2017-05-16 12:30:21

Python多線程生產者消費者模式

2024-10-11 09:27:52

2021-04-20 08:32:51

消息MQ隊列

2021-12-28 12:01:59

Kafka 消費者機制

2024-08-27 10:19:31

2020-09-14 08:45:58

多線程模型面試

2023-06-01 08:08:38

kafka消費者分區(qū)策略

2015-06-15 11:29:34

數據中心綠色數據中心

2021-10-26 10:50:25

Kafkabroker

2014-12-10 21:50:44

AdMaster

2022-07-07 09:00:49

RocketMQ消費者消息消費

2011-07-22 16:25:38

CA TechnoloIT消費化

2011-08-05 16:21:24

2020-12-31 10:00:40

PoS終端終端安全網絡攻擊

2011-11-15 10:05:29

Kindle Fire平板市場
點贊
收藏

51CTO技術棧公眾號

日韩在线资源网| 亚洲精品无码久久久久久| 中文字幕av资源一区| 久久国产精品久久久久久| 人人九九精品| 国内精品视频在线观看| 一区二区三区久久精品| 成人黄18免费网站| 成人av电影在线网| 日韩a级黄色片| 玛丽玛丽电影原版免费观看1977| 成人在线播放免费观看| 自拍偷拍一区二区三区| 国产在线播放观看| 国产专区在线播放| 美女主播精品视频一二三四| 午夜亚洲成人| 精品久久久久久久| 国产精品女主播av| 日本一区二区在线视频观看| 视频二区在线| 水蜜桃久久夜色精品一区的特点| 日韩午夜激情视频| 91精品国产高久久久久久五月天| 色婷婷亚洲mv天堂mv在影片| 亚洲一级二级在线| 91欧美视频在线| 欧美视频二区| 亚洲欧美日韩中文在线| 国产精品一区视频网站| 三级无遮挡在线观看| 亚洲作爱视频| 亚洲japanese制服美女| 中文字幕不卡在线| 一区二区三区成人| 免费在线黄网站| 日本韩国欧美精品大片卡二| 男同在线观看| 国产亚洲亚洲| 亚洲精品久久久久久下一站 | 欧美麻豆精品久久久久久| 97在线视频观看| 2019中文字幕在线视频| 五月天一区二区| 网站一区二区三区| a在线播放不卡| 欧美一区二区三区婷婷月色| a视频网址在线观看| 午夜精品爽啪视频| 日本在线视频站| 久久不射电影网| 加勒比色老久久爱综合网| 国内精品模特av私拍在线观看| 成人精品毛片| 欧美高跟鞋交xxxxxhd| 国产一区二区欧美| 成人午夜av在线| 玖玖玖精品中文字幕| 日韩精品欧美成人高清一区二区| 国产成人精品电影久久久| 久久中文字幕二区| 国产日本欧美在线| 国产精品久久久久久久久动漫| 日本免费中文字幕在线| 日韩av一区在线| 国产欧美日韩视频在线| 杨幂一区欧美专区| 欧洲一区在线观看| 亚洲成人三级| 欧美老女人性生活| 久久精品久久精品| 国产精品亚洲a| 亚洲国产婷婷香蕉久久久久久| eeuss影院www在线观看| 7m精品福利视频导航| 九九在线高清精品视频| 91猫先生在线| 欧美日韩一级片在线观看| 日本乱理伦在线| 国产精品日韩专区| 日韩专区欧美专区| 免费不卡av在线| 亚洲男人天堂手机在线| 另类av一区二区| 精彩国产在线| 97欧美精品一区二区三区| av男人天堂一区| heyzo在线播放| 亚洲视频欧美在线| 羞羞视频在线观看免费| 国产精品丝袜久久久久久高清| 国产精品嫩草影院av蜜臀| 亚洲1卡2卡3卡4卡乱码精品| 精品呦交小u女在线| 91精品国产乱码久久久久久| 欧美日韩美女在线| 一本色道久久综合亚洲精品不卡| 97在线观看免费观看高清| 久久久www免费人成黑人精品| 日韩欧美精品网站| 久久中文精品| 欧美成人免费全部网站| 中午字幕在线观看| 婷婷久久伊人| 午夜欧美不卡精品aaaaa| 欧美在线一区二区三区| 麻豆极品一区二区三区| 麻豆tv在线播放| 国产美女高潮在线观看| 伊人久久综合97精品| 国产欧美中文字幕| 国产乱码精品一品二品| av大片在线播放| 五月天亚洲综合小说网| 日韩在线观看网址| 国产精品美女一区二区三区 | 欧美毛片免费观看| 亚洲女人视频| 天堂av免费看| 国产精品久久久久99| 日韩欧美成人一区| 2020国产成人综合网| 亚洲有吗中文字幕| 国产成+人+综合+亚洲欧美| 偷偷要色偷偷| 欧美色图12p| 校花撩起jk露出白色内裤国产精品 | 日本va欧美va精品发布| 特级毛片在线| 天天色综合天天色| 青草成人免费视频| 在线观看日韩电影| 国产欧美综合在线观看第十页| 久久99国产精品麻豆| 影音先锋男人在线资源| 久久伊人一区二区| 图片区日韩欧美亚洲| 欧美黄色录像片| 99爱在线观看| 青青在线免费观看| 成人精品在线| 三级黄色的网站| 丁香婷婷综合激情五月色| 久久亚洲国产成人精品无码区| 不卡的av在线| 黑森林福利视频导航| 国产不卡av在线免费观看| 中文字幕一区日韩电影| 91精品国产乱码久久久久久久久 | 国语对白在线视频| 粉嫩av一区二区三区天美传媒| 日本不卡一区| 成人免费淫片视频软件| 中文字幕久精品免费视频| 欧美日韩成人高清| 一区二区在线观看视频在线观看| 日本欧美韩国一区三区| 狠狠做深爱婷婷综合一区| 日韩一区二区三区精品视频第3页 日韩一区二区三区精品 | 黄色片一级视频| av成人动漫| 99草草国产熟女视频在线| 五月天丁香久久| 日韩在线亚洲| 日韩欧美精品三级| 精品亚洲一区二区三区在线观看| 97超碰人人看人人 | 91网站在线观看免费| yy111111少妇影院日韩夜片| 久久综合五月天| 日韩最新中文字幕电影免费看| 日韩一区二区视频| 欧美日韩另类视频| 国产精品久久国产精麻豆99网站| 成人黄色大片在线观看| 麻豆视频观看网址久久| 国产在线精品不卡| 蜜桃精品视频在线观看| 久久男女视频| 免费观看久久久4p| 91精品国产视频| 日日夜夜精品视频天天综合网| 日韩精品电影一区亚洲| 国产一区二区电影| 99国产一区二区三精品乱码| www.亚洲人| 一区二区三区久久久| 在线免费视频一区二区| 日韩欧美你懂的| 九九热精品视频在线播放| 国产一区视频在线| 亚洲不卡1区| 久久99爱视频| 超碰国产在线| 久草在线中文最新视频| 国产麻豆久久| 成人资源在线| 日本网站在线观看一区二区三区 | 免费播放av| 亚洲欧美一区二区三区在线播放|