国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Go語言的并發(fā)與WorkerPool

開發(fā) 后端
Golang 的并發(fā)模型非常強(qiáng)大,稱為 CSP(通信順序進(jìn)程),它將一個(gè)問題分解成更小的順序進(jìn)程,然后調(diào)度這些進(jìn)程的實(shí)例(稱為 Goroutine)。這些進(jìn)程通過 channel 傳遞信息實(shí)現(xiàn)通信。

[[414288]]

本文轉(zhuǎn)載自微信公眾號(hào)「Golang來啦」,作者Seekload。轉(zhuǎn)載本文請(qǐng)聯(lián)系Golang來啦公眾號(hào)。

四哥水平有限,如有翻譯或理解錯(cuò)誤,煩請(qǐng)幫忙指出,感謝!

昨天分享關(guān)于 workerPool 的文章,有同學(xué)在后臺(tái)說,昨天的 Demo 恰好符合項(xiàng)目的業(yè)務(wù)場景,真的非常棒!

所以今天就再來分享一篇 。

原文如下:

現(xiàn)代編程語言中,并發(fā)已經(jīng)成為必不可少的特性。現(xiàn)在絕大多數(shù)編程語言都有一些方法實(shí)現(xiàn)并發(fā)。

其中一些實(shí)現(xiàn)方式非常強(qiáng)大,能將負(fù)載轉(zhuǎn)移到不同的系統(tǒng)線程,比如 Java 等;一些則在同一線程上模擬這種行為,比如 Ruby 等。

Golang 的并發(fā)模型非常強(qiáng)大,稱為 CSP(通信順序進(jìn)程),它將一個(gè)問題分解成更小的順序進(jìn)程,然后調(diào)度這些進(jìn)程的實(shí)例(稱為 Goroutine)。這些進(jìn)程通過 channel 傳遞信息實(shí)現(xiàn)通信。

本文,我們將探討如何利用 golang 的并發(fā)性,以及如何在 workerPool 使用。系列文章的第二篇,我們將探討如何構(gòu)建一個(gè)強(qiáng)大的并發(fā)解決方案。

一個(gè)簡單的例子

假設(shè)我們需要調(diào)用一個(gè)外部 API 接口,整個(gè)過程需要花費(fèi) 100ms。如果我們需要同步地調(diào)用該接口 1000 次,則需要花費(fèi) 100s。

  1. //// model/data.go 
  2.  
  3. package model 
  4.  
  5. type SimpleData struct { 
  6.  ID int 
  7.  
  8. //// basic/basic.go 
  9.  
  10. package basic 
  11.  
  12. import ( 
  13.  "fmt" 
  14.  "github.com/Joker666/goworkerpool/model" 
  15.  "time" 
  16.  
  17. func Work(allData []model.SimpleData) { 
  18.  start := time.Now() 
  19.  for i, _ := range allData { 
  20.   Process(allData[i]) 
  21.  } 
  22.  elapsed := time.Since(start) 
  23.  fmt.Printf("Took ===============> %s\n", elapsed) 
  24.  
  25. func Process(data model.SimpleData) { 
  26.  fmt.Printf("Start processing %d\n", data.ID) 
  27.  time.Sleep(100 * time.Millisecond) 
  28.  fmt.Printf("Finish processing %d\n", data.ID) 
  29.  
  30. //// main.go 
  31.  
  32. package main 
  33.  
  34. import ( 
  35.  "fmt" 
  36.  "github.com/Joker666/goworkerpool/basic" 
  37.  "github.com/Joker666/goworkerpool/model" 
  38.  "github.com/Joker666/goworkerpool/worker" 
  39.  
  40. func main() { 
  41.  // Prepare the data 
  42.  var allData []model.SimpleData 
  43.  for i := 0; i < 1000; i++ { 
  44.   data := model.SimpleData{ ID: i } 
  45.   allData = append(allData, data) 
  46.  } 
  47.  fmt.Printf("Start processing all work \n"
  48.  
  49.  // Process 
  50.  basic.Work(allData) 
  1. Start processing all work 
  2. Took ===============> 1m40.226679665s 

上面的代碼創(chuàng)建了 model 包,包里包含一個(gè)結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè) int 類型的成員。我們同步地處理 data,這顯然不是最佳方案,因?yàn)榭梢圆l(fā)處理這些任務(wù)。我們換一種方案,使用 goroutine 和 channel 來處理。

異步

  1. //// worker/notPooled.go 
  2.  
  3. func NotPooledWork(allData []model.SimpleData) { 
  4.  start := time.Now() 
  5.  var wg sync.WaitGroup 
  6.  
  7.  dataCh := make(chan model.SimpleData, 100) 
  8.  
  9.  wg.Add(1) 
  10.  go func() { 
  11.   defer wg.Done() 
  12.   for data := range dataCh { 
  13.    wg.Add(1) 
  14.    go func(data model.SimpleData) { 
  15.     defer wg.Done() 
  16.     basic.Process(data) 
  17.    }(data) 
  18.   } 
  19.  }() 
  20.  
  21.  for i, _ := range allData { 
  22.   dataCh <- allData[i] 
  23.  } 
  24.  
  25.  close(dataCh) 
  26.  wg.Wait() 
  27.  elapsed := time.Since(start) 
  28.  fmt.Printf("Took ===============> %s\n", elapsed) 
  29.  
  30. //// main.go 
  31.  
  32. // Process 
  33. worker.NotPooledWork(allData) 
  1. Start processing all work 
  2. Took ===============> 101.191534ms 

上面的代碼,我們創(chuàng)建了容量 100 的緩存 channel,并通過 NoPooledWork() 將數(shù)據(jù) push 到 channel 里。channel 長度滿 100 之后,我們是無法再向其中添加元素直到有元素被讀取走。使用 for range 讀取 channel,并生成 goroutine 處理。這里我們沒有限制生成 goroutine 的數(shù)量,這可以盡可能多地處理任務(wù)。從理論上來講,在給定所需資源的情況下,可以處理盡可能多的數(shù)據(jù)。執(zhí)行代碼,完成 1000 個(gè)任務(wù)只花費(fèi)了 100ms。很瘋狂吧!不全是,接著往下看。

問題

除非我們擁有地球上所有的資源,否則在特定時(shí)間內(nèi)能夠分配的資源是有限的。一個(gè) goroutine 占用的最小內(nèi)存是 2k,但也能達(dá)到 1G。上述并發(fā)執(zhí)行所有任務(wù)的解決方案中,假設(shè)有一百萬個(gè)任務(wù),就會(huì)很快耗盡機(jī)器的內(nèi)存和 CPU。我們要么升級(jí)機(jī)器的配置,要么就尋找其他更好的解決方案。

計(jì)算機(jī)科學(xué)家很久之前就考慮過這個(gè)問題,并提出了出色的解決方案 - 使用 Thread Pool 或者 Worker Pool。這個(gè)方案是使用 worker 數(shù)量受限的工作池來處理任務(wù),workers 會(huì)按順序一個(gè)接一個(gè)處理任務(wù),這樣就避免了 CPU 和內(nèi)存使用急速增長。

解決方案:Worker Pool

我們通過實(shí)現(xiàn) worker pool 來修復(fù)之前遇到的問題。

  1. //// worker/pooled.go 
  2.  
  3. func PooledWork(allData []model.SimpleData) { 
  4.  start := time.Now() 
  5.  var wg sync.WaitGroup 
  6.  workerPoolSize := 100 
  7.  
  8.  dataCh := make(chan model.SimpleData, workerPoolSize) 
  9.  
  10.  for i := 0; i < workerPoolSize; i++ { 
  11.   wg.Add(1) 
  12.   go func() { 
  13.    defer wg.Done() 
  14.  
  15.    for data := range dataCh { 
  16.     basic.Process(data) 
  17.    } 
  18.   }() 
  19.  } 
  20.  
  21.  for i, _ := range allData { 
  22.   dataCh <- allData[i] 
  23.  } 
  24.  
  25.  close(dataCh) 
  26.  wg.Wait() 
  27.  elapsed := time.Since(start) 
  28.  fmt.Printf("Took ===============> %s\n", elapsed) 
  29.  
  30. //// main.go 
  31.  
  32. // Process 
  33. worker.PooledWork(allData) 
  1. Start processing all work 
  2. Took ===============> 1.002972449s 

上面的代碼,worker 數(shù)量限制在 100,我們創(chuàng)建了相應(yīng)數(shù)量的 goroutine 來處理任務(wù)。我們可以把 channel 看作是隊(duì)列,worker goroutine 看作是消費(fèi)者。多個(gè) goroutine 可以監(jiān)聽同一個(gè) channel,但是 channel 里的每一個(gè)元素只會(huì)被處理一次。

Go 語言的 channel 可以當(dāng)作隊(duì)列使用。

這是一個(gè)比較好的解決方案,執(zhí)行代碼,我們看到完成所有任務(wù)花費(fèi) 1s。雖然沒有 100ms 這么快,但已經(jīng)能滿足業(yè)務(wù)需要,而且我們得到了一個(gè)更好的解決方案,能將負(fù)載均攤在不同的時(shí)間片上。

處理錯(cuò)誤

我們能做的還沒完。上面看起來是一個(gè)完整的解決方案,但卻不是的,我們沒有處理錯(cuò)誤情況。所以需要模擬出錯(cuò)的情形,并且看下我們需要怎么處理。

  1. //// worker/pooledError.go 
  2.  
  3. func PooledWorkError(allData []model.SimpleData) { 
  4.  start := time.Now() 
  5.  var wg sync.WaitGroup 
  6.  workerPoolSize := 100 
  7.  
  8.  dataCh := make(chan model.SimpleData, workerPoolSize) 
  9.  errors := make(chan error, 1000) 
  10.  
  11.  for i := 0; i < workerPoolSize; i++ { 
  12.   wg.Add(1) 
  13.   go func() { 
  14.    defer wg.Done() 
  15.  
  16.    for data := range dataCh { 
  17.     process(data, errors) 
  18.    } 
  19.   }() 
  20.  } 
  21.  
  22.  for i, _ := range allData { 
  23.   dataCh <- allData[i] 
  24.  } 
  25.  
  26.  close(dataCh) 
  27.  
  28.  wg.Add(1) 
  29.  go func() { 
  30.   defer wg.Done() 
  31.   for { 
  32.    select { 
  33.    case err := <-errors: 
  34.     fmt.Println("finished with error:", err.Error()) 
  35.    case <-time.After(time.Second * 1): 
  36.     fmt.Println("Timeout: errors finished"
  37.     return 
  38.    } 
  39.   } 
  40.  }() 
  41.  
  42.  defer close(errors) 
  43.  wg.Wait() 
  44.  elapsed := time.Since(start) 
  45.  fmt.Printf("Took ===============> %s\n", elapsed) 
  46.  
  47. func process(data model.SimpleData, errors chan<- error) { 
  48.  fmt.Printf("Start processing %d\n", data.ID) 
  49.  time.Sleep(100 * time.Millisecond) 
  50.  if data.ID % 29 == 0 { 
  51.   errors <- fmt.Errorf("error on job %v", data.ID) 
  52.  } else { 
  53.   fmt.Printf("Finish processing %d\n", data.ID) 
  54.  } 
  55.  
  56. //// main.go 
  57.  
  58. // Process 
  59. worker.PooledWorkError(allData) 

我們修改了 process() 函數(shù),處理一些隨機(jī)的錯(cuò)誤并將錯(cuò)誤 push 到 errors chnanel 里。所以,為了處理并發(fā)出現(xiàn)的錯(cuò)誤,我們可以使用 errors channel 保存錯(cuò)誤數(shù)據(jù)。在所有任務(wù)處理完成之后,可以檢查錯(cuò)誤 channel 是否有數(shù)據(jù)。錯(cuò)誤 channel 里的元素保存了任務(wù) ID,方便需要的時(shí)候再處理這些任務(wù)。

比之前沒處理錯(cuò)誤,很明顯這是一個(gè)更好的解決方案。但我們還可以做得更好,

我們將在下篇文章討論如何編寫一個(gè)強(qiáng)大的 worker pool 包,并且在 worker 數(shù)量受限的情況下處理并發(fā)任務(wù)。

總結(jié)

Go 語言的并發(fā)模型足夠強(qiáng)大給力,只需要構(gòu)建一個(gè) worker pool 就能很好地解決問題而無需做太多工作,這就是它沒有包含在標(biāo)準(zhǔn)庫中的原因。但是,我們自己可以構(gòu)建一個(gè)滿足自身需求的方案。很快,我會(huì)在下一篇文章中講到,敬請(qǐng)期待!

點(diǎn)擊【閱讀原文】直達(dá)代碼倉庫[1]。

參考資料

[1]代碼倉庫: https://github.com/Joker666/goworkerpool?ref=hackernoon.com

via:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-1-e9n31ao

作者:Hasan

 

責(zé)任編輯:武曉燕 來源: Golang來啦
相關(guān)推薦

2013-05-28 09:43:38

GoGo語言并發(fā)模式

2021-07-15 23:18:48

Go語言并發(fā)

2023-12-21 07:09:32

Go語言任務(wù)

2025-11-17 01:41:00

2024-08-12 11:32:12

Go語言程序

2023-02-10 09:40:36

Go語言并發(fā)

2023-05-15 08:01:16

Go語言

2023-01-30 15:41:10

Channel控制并發(fā)

2022-04-06 08:19:13

Go語言切片

2021-06-24 06:35:00

Go語言進(jìn)程

2021-04-07 09:02:49

Go 語言變量與常量

2021-04-13 07:58:42

Go語言函數(shù)

2014-04-09 09:32:24

Go并發(fā)

2021-09-30 09:21:28

Go語言并發(fā)編程

2025-03-24 00:25:00

Go語言并發(fā)編程

2022-03-04 10:07:45

Go語言字節(jié)池

2021-04-20 09:00:48

Go 語言結(jié)構(gòu)體type

2024-07-01 08:44:42

Go語言協(xié)程

2021-07-29 07:55:19

Demo 工作池

2024-04-07 00:04:00

Go語言Map
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

欧美另类极品| 国产一区调教| 最新97超碰在线| 中国黄色片免费看| 亚洲制服欧美久久| 91文字幕巨乱亚洲香蕉| 国色天香2019中文字幕在线观看| 亚洲电影免费观看高清完整版在线观看| 精品国产91乱高清在线观看| 91亚洲精品久久久蜜桃| 美女一区二区久久| 在线观看视频免费一区二区三区| 免费成人结看片| 日韩高清一区| 岛国一区二区| 日韩欧美一中文字暮专区| 男人的天堂在线视频免费观看 | 欧美一区免费视频| 91免费人成网站在线观看18| 欧美性资源免费| 九九精品在线播放| 久久久成人的性感天堂| 亚洲欧洲一区二区三区久久| 欧美精品一区二区久久久| 在线播放国产精品二区一二区四区| 欧美性xxxxx| 精品久久久久久久久中文字幕| 一区二区三区四区在线| 亚洲女人的天堂| 亚洲三级电影网站| 国产精品国产成人国产三级| 国产精品丝袜久久久久久app| 久久久久久久电影| www欧美成人18+| 久久亚洲一区二区三区明星换脸| 不卡电影免费在线播放一区| 大尺度一区二区| 国产精品亚洲专一区二区三区| 国产综合色视频| 国产乱对白刺激视频不卡| 国产一区二区三区久久久| 国产精品一区不卡| 国产白丝精品91爽爽久久| 国产成人精品亚洲777人妖| 国产综合一区二区| 国产丶欧美丶日本不卡视频| 国产69精品久久99不卡| bt欧美亚洲午夜电影天堂| 不卡视频在线看| 99国产麻豆精品| 久久天天做天天爱综合色| 国产拍揄自揄精品视频麻豆| 欧美激情综合五月色丁香| 欧美国产日本视频| 亚洲视频你懂的| 五月天久久比比资源色| 色又黄又爽网站www久久| 欧美日产国产精品| 精品美女在线播放| 日韩二区三区在线| 日韩二区三区在线| 中文字幕日韩在线观看| 日韩资源在线观看| 欧美激情乱人伦| 国产成人在线精品| 91黄在线观看| 日韩精品欧美专区| 中文字幕の友人北条麻妃| 国产v片免费观看| 国产一区二区三区精彩视频| 色总=综合色| 天堂在线视频中文网| 在线影院福利| 永久av在线| 成人免费短视频| 136导航精品福利| 欧美一级精品片在线看| 黄色在线成人| 久久精品国产精品青草| 92精品国产成人观看免费| 中文字幕亚洲不卡| 91国偷自产一区二区开放时间| 日韩三级免费观看| 综合久久五月天| 欧美一区亚洲一区| 国产高清自拍99| 亚洲永久激情精品| av五月天在线| 神马亚洲视频| 超碰高清在线| 亚洲视频三区| 亚洲一区二区三区| 免费在线观看成人| 中文字幕精品三区| 欧美日韩国产中文精品字幕自在自线| 日韩久久精品一区| 久久精品一区中文字幕| 国产一区视频在线播放| 日本不卡久久| 亚洲熟妇av一区二区三区| 中文在线视频| 国产乱码午夜在线视频| 卡通动漫国产精品| 亚洲精品社区| 不卡欧美aaaaa| 精品久久久久久久久久久久久| 亚洲国产91精品在线观看| 久久久久久尹人网香蕉| 国产精品久久7| 亚洲人成无码网站久久99热国产 | 亚洲视频你懂的| 欧美日韩一区中文字幕| 亚洲小视频在线观看| 国产大片精品免费永久看nba| 久久亚洲一区二区| 精品久久久久久久无码| 欧美精品少妇| 男人天堂久久| 伊人久久大香线蕉综合四虎小说 | 欧美三区四区| 精品大片一区二区| 日韩不卡免费视频| 中文字幕一区二区三区四区| 欧美群妇大交群的观看方式| 久久激情视频久久| 国产成人亚洲欧美| 日本福利视频在线| 三级理论午夜在线观看| 深夜视频一区二区| 亚洲破处大片| 2欧美一区二区三区在线观看视频| 欧美性xxxx18| 国产黄色在线观看| 午夜精品国产精品大乳美女| 国产一区二区三区久久久| www.-级毛片线天内射视视| 亚洲欧美经典视频| 97最新国自产拍视频在线完整在线看| 精品乱人伦一区二区三区| 亚洲欧洲美洲国产香蕉| 久久久久一区二区| 久久久精品中文字幕麻豆发布| 在线免费观看污| 成人网在线视频| 国产精品福利一区二区| 宅男在线观看免费高清网站| 91极品女神在线| 国内精品视频一区二区三区八戒| 在线影视一区| 日韩欧美国产午夜精品| 一区二区电影在线观看| 成人毛片免费在线观看| 国产一区二区三区日韩欧美| 亚洲裸体俱乐部裸体舞表演av| 天天久久人人| 亚洲男人都懂的| а√天堂资源地址在线下载| 亚洲激情在线视频| 狠狠操一区二区三区| 亚洲精品网站在线播放gif| 2001个疯子在线观看| 最新的欧美黄色| 国产一区二区三区免费在线 | 久久综合九色综合久久久精品综合| 亚洲女优视频| 欧美猛男性生活免费| 99精品免费网| 日韩欧美亚洲系列| 国产激情久久久| 2020国产精品| 国产网红在线观看| 国产日韩欧美综合| 亚洲精品中文字幕| 一级做a爱片久久| 超碰在线播放91| 精品捆绑美女sm三区 | 日本丶国产丶欧美色综合| 青青久久av| 亚洲 欧美 日韩系列| 午夜精品久久久久久99热| 久久精品一区四区| 大陆精大陆国产国语精品| 99re6在线视频| 日本精品一区二区三区在线播放视频 | 亚洲四虎av| 91av在线影院| 亚洲欧美日韩综合aⅴ视频| 欧美黑人巨大videos精品| 啊啊啊一区二区| 日韩精品在线免费观看视频| 日韩亚洲国产欧美| 色播在线观看| 久久九九国产精品怡红院 | 亚洲熟妇无码另类久久久| 777午夜精品视频在线播放| 国产欧美欧美| a√中文在线观看| 欧美牲交a欧美牲交aⅴ免费真| 国产综合福利在线| 欧美中文字幕久久|