慢聊Golang協(xié)程池Ants實(shí)現(xiàn)原理
大家都知道goroutine 是 Go語(yǔ)言中的輕量級(jí)線程實(shí)現(xiàn),由 Go 運(yùn)行時(shí)(runtime)管理,Go 程序會(huì)智能地將 goroutine 中的任務(wù)合理地分配給每個(gè) CPU。創(chuàng)建一個(gè)goroutine大小大概在2k左右,可以說(shuō)非常的節(jié)省機(jī)器資源。
但是為什么要用池化的方式呢?機(jī)器資源總是有限的,如果創(chuàng)建了幾十萬(wàn)個(gè)goroutine,那么就消耗比較大了,在一些需要對(duì)并發(fā)資源進(jìn)行控制、提升性能、控制生命周期的場(chǎng)景中,還是需要用到協(xié)程池去處理。
今天就介紹在github用Go語(yǔ)言實(shí)現(xiàn)的有 11.5k?的 Ants 協(xié)程池庫(kù)的實(shí)現(xiàn)!
圖片
初識(shí)Ants
Ants介紹
Go的協(xié)程非常輕量,但是在超高并發(fā)場(chǎng)景,每個(gè)請(qǐng)求創(chuàng)建一個(gè)協(xié)程也是低效的,一個(gè)簡(jiǎn)單的思想就是協(xié)程池。
Ants實(shí)現(xiàn)了一個(gè)具有固定容量的goroutine池,管理和回收大量goroutine,允許開(kāi)發(fā)人員限制并發(fā)程序中的goroutines數(shù)量。
圖片
Github地址:https://github.com/panjf2000/ants
這是在github上的截圖,注意不同版本之間代碼實(shí)現(xiàn)會(huì)略有差異。
圖片
特性
??Ants具有如下特性:
- ? 自動(dòng)管理和回收大量goroutine
- ? 定期清除過(guò)期的goroutines
- ? 豐富的API:提交任務(wù),獲取運(yùn)行g(shù)oroutine的數(shù)量,動(dòng)態(tài)調(diào)整池的容量,釋放池,重新啟動(dòng)池
- ? 優(yōu)雅地處理死機(jī)以防止程序崩潰
- ? 高效的內(nèi)存使用,甚至比Golang中的無(wú)限goroutine實(shí)現(xiàn)了更高的性能
- ? 非阻塞機(jī)制
核心概念
- ? Pool :Ants協(xié)程池核心結(jié)構(gòu)
- ? WorkerArray:Pool池中的worker隊(duì)列,存放所有的Worker
- ? goWorker:運(yùn)行任務(wù)的實(shí)際執(zhí)行者,它啟動(dòng)一個(gè) goroutine 來(lái)接受任務(wù)并執(zhí)行函數(shù)調(diào)用
- ? sync.Pool:golang 標(biāo)準(zhǔn)庫(kù)下并發(fā)安全的對(duì)象池,緩存申請(qǐng)用于之后的重用,以減輕GC的壓力
- ? spinLock:基于CAS機(jī)制和指數(shù)退避算法實(shí)現(xiàn)的一種自旋鎖
運(yùn)行流程圖
Ants運(yùn)行流程圖如下:
圖片
前置知識(shí)
自旋鎖 spinLock
我們先了解下什么是自旋鎖!
加鎖的目的就是保證共享資源在任意時(shí)間里,只有一個(gè)線程訪問(wèn),而自旋鎖加鎖失敗后,線程會(huì)忙等待,直到它拿到鎖。
圖片
如果要實(shí)現(xiàn)鎖的話需要實(shí)現(xiàn)Go 標(biāo)準(zhǔn)庫(kù)sync的Locker接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}Ants的自旋鎖是基于CAS機(jī)制和指數(shù)退避算法實(shí)現(xiàn)的一種自旋鎖,主要利用了下面幾個(gè)關(guān)鍵的點(diǎn):
- ? sync.Locker接口
- ? 指數(shù)退避算法
- ? sync. atomic 原子包中的方法了解
- ? runtime.Gosched() 讓當(dāng)前goroutine讓出CPU時(shí)間片
?? Go語(yǔ)言中 sync/atomic包提供了底層的原子級(jí)內(nèi)存操作,可實(shí)用CAS 函數(shù)(Compare And Swap)
?? 指數(shù)退避算法以指數(shù)方式重試請(qǐng)求,請(qǐng)求失敗后重試間隔分別是 1、2、4 ...,2的n次方秒增加
我們看下具體實(shí)現(xiàn)代碼和添加的注釋?zhuān)?/p>
//實(shí)現(xiàn)Locker接口
type spinLock uint32
//最大回退次數(shù)
const maxBackoff = 16
// 加鎖
func (sl *spinLock) Lock() {
backoff := 1
//基于CAS機(jī)制,嘗試獲取鎖
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
//執(zhí)行backoff次 cpu讓出時(shí)間片次數(shù)
for i := 0; i < backoff; i++ {
//使當(dāng)前goroutine讓出CPU時(shí)間片
runtime.Gosched()
}
if backoff < maxBackoff {
//左移后賦值 等于 backoff = backoff << 1
//左移一位就是乘以 2的1次方
backoff <<= 1
}
}
}
//釋放鎖
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}Gosched()使當(dāng)前goroutine程放棄處理器,以讓其它goroutine運(yùn)行,它不會(huì)掛起當(dāng)前goroutine,因此當(dāng)前goroutine未來(lái)會(huì)恢復(fù)執(zhí)行。
?? backoff <<= 1 這段代碼會(huì)有你知道什么意思嗎?
這是Go語(yǔ)言的位運(yùn)算符 << 表示左移n位就是乘以2的n次方, 而 <<= 表示左移后賦值。
??代碼中 backoff <<= 1 其實(shí)就是 backoff = backoff << 1,這是左移一位的結(jié)果就是 backoff = backoff * 2^1。
自旋鎖執(zhí)行 backoff 次讓 cpu 時(shí)間片動(dòng)作,次數(shù)分別是 1、2、4 ...,封頂16
Ants自旋鎖邏輯用圖表示如下:
圖片
核心數(shù)據(jù)結(jié)構(gòu)
這里簡(jiǎn)單介紹下三個(gè)核心的結(jié)構(gòu)體和屬性:
圖片
Pool結(jié)構(gòu)體
Pool就是協(xié)程池的實(shí)際結(jié)構(gòu),在下面代碼中已經(jīng)標(biāo)記了注釋。
type Pool struct {
// 協(xié)程池容量
capacity int32
// 當(dāng)前協(xié)程池中正在運(yùn)行的協(xié)程數(shù)
running int32
// ants 實(shí)現(xiàn)的自旋鎖,用于同步并發(fā)操作
lock sync.Locker
// 存放一組Worker
workers workerArray
// 協(xié)程池狀態(tài) (1-關(guān)閉、0-開(kāi)啟)
state int32
// 并發(fā)協(xié)調(diào)器,用于阻塞模式下,掛起和喚醒等待資源的協(xié)程
cond *sync.Cond
// worker 對(duì)象池
workerCache sync.Pool
// 等待的協(xié)程數(shù)量
waiting int32
// 回收協(xié)程是否關(guān)閉
heartbeatDone int32
// 閉回收協(xié)程的控制器函數(shù)
stopHeartbeat context.CancelFunc
// 協(xié)程池的配置
options *Options
}這里對(duì)幾個(gè)配置著重講一下:
workerCache :這是sync.Pool類(lèi)型,主要作用保存和復(fù)用臨時(shí)對(duì)象,減少內(nèi)存分配,降低 GC 壓力,在Ants中是為了緩存釋放的 Worker 資源
options:可配置化過(guò)期時(shí)間、是否支持預(yù)分配、最大阻塞數(shù)量、panic 處理、日志,這里是通過(guò)函數(shù)式選項(xiàng)模式進(jìn)行實(shí)現(xiàn)的
goWorker
goWorker 是運(yùn)行任務(wù)的實(shí)際執(zhí)行者,它啟動(dòng)一個(gè) goroutine 來(lái)接受任務(wù)并執(zhí)行函數(shù)調(diào)用,這個(gè)協(xié)程是一個(gè)長(zhǎng)期運(yùn)行不會(huì)被主動(dòng)回收的。
type goWorker struct {
//goWorker 所屬的協(xié)程池
pool *Pool
//接收實(shí)際執(zhí)行任務(wù)的管道
task chan func()
//goWorker 回收到協(xié)程池的時(shí)間
recycleTime time.Time
}WorkerArray
workerArray 是一個(gè)接口( interface),其實(shí)現(xiàn)包含 stack 棧版本和 queue 隊(duì)列兩種實(shí)現(xiàn)。
圖片
它定義了幾個(gè)通用和用于回收過(guò)期 goWorker 的 api
type workerArray interface {
// worker 列表長(zhǎng)度
len() int
// 是否為空
isEmpty() bool
// 插入一個(gè)goworker
insert(worker *goWorker) error
// 從WorkerArray獲取可用的goworker
detach() *goWorker
// 清理pool.workers中的過(guò)期goworker
retrieveExpiry(duration time.Duration) []*goWorker
// 重置,清空WorkerArray中所有的goWorker
reset()
}核心方法
這是核心實(shí)現(xiàn)代碼的走讀部分,基本上都有進(jìn)行了注釋?zhuān)雌饋?lái)可能會(huì)有點(diǎn)不怎么理解,多看兩遍就好,相信我 ????!
創(chuàng)建Pool
創(chuàng)建Pool其實(shí)就是New一個(gè)Pool實(shí)例,對(duì)Pool中結(jié)構(gòu)體的屬性進(jìn)行初始化、加載一些配置,這種方式很常見(jiàn),大家可以注意觀察積累。
圖片
代碼實(shí)現(xiàn)和注釋如下:
func NewPool(size int, options ...Option) (*Pool, error) {
//讀取一些自定義的配置
opts := loadOptions(options...)
...
// 創(chuàng)建 Pool 對(duì)象
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
// 指定 sync.Pool 創(chuàng)建 worker 的方法
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
// 初始化Pool時(shí)是否進(jìn)行內(nèi)存預(yù)分配
// 區(qū)分workerArray 的實(shí)現(xiàn)方式
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
// 預(yù)先分配固定 Size 的池子
p.workers = newWorkerArray(loopQueueType, size)
} else {
// 初始化不創(chuàng)建,運(yùn)行時(shí)再創(chuàng)建
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
// 開(kāi)啟一個(gè)goroutine清理過(guò)期的 worker
go p.purgePeriodically()
return p, nil
}workerChanCap:確定工作程序的通道是否應(yīng)為緩沖通道,當(dāng)獲取給GOMAXPROCS設(shè)置的值等于1時(shí)表示單核執(zhí)行,此時(shí)的通道是無(wú)緩沖通道,否則是有緩沖通道,且容量是1。
這里講的是默認(rèn)未進(jìn)行預(yù)分配,采用 workerStack 棧實(shí)現(xiàn)workerArray的初始化。
清理過(guò)期goWorker
在初始化好Pool結(jié)構(gòu)屬性后,會(huì)開(kāi)啟一個(gè)goroutine清理過(guò)期的 worker。
??怎么判定goroutine是過(guò)期的?
Ants過(guò)期的定義是:每個(gè) goWorker的 recycleTime 加上用戶(hù)配置的過(guò)期時(shí)間 Pool.options.ExpiryDuration 小于 time.Now() 時(shí)即認(rèn)為該協(xié)程已過(guò)期。
我們看下具體流程
func (p *Pool) purgePeriodically(ctx context.Context) {
// ExpiryDuration 默認(rèn)是1s
heartbeat := time.NewTicker(p.options.ExpiryDuration)
...
for {
select {
case <-heartbeat.C:
case <-ctx.Done():
return
}
// pool關(guān)閉
if p.IsClosed() {
break
}
// 從 workers 中獲取過(guò)期的 worker
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
// 清理過(guò)期的worker
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}
// 喚醒所有等待的線程
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast()
}
}
}??清理流程如下:
- 1. 取出過(guò)期的goWorker
- 2. 通知 goWorker 退出,方式是向過(guò)期 goWorker 的 task channel 發(fā)送一個(gè) nil
- 3. 接收值為 nil 的任務(wù)后goWorker會(huì)退出
- 4. 所有工作程序都已清理完畢,可能這時(shí)還有 goroutine 阻塞在cond.Wait上,會(huì)調(diào)用 p.cond.Broadcast() 喚醒這些 goroutine
Submit任務(wù)提交
在初始化完成Pool之后,就需要往池中提交帶執(zhí)行任務(wù)了,Pool提供了 Submit 方法,提供外部發(fā)起提交任務(wù)的接口。
func (p *Pool) Submit(task func()) error {
// pool是否關(guān)閉
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
// 嘗試獲取空閑的goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
// 發(fā)送到 goWorker的channel中
w.task <- task
return nil
}獲取可用goWork
Submit方法內(nèi)部調(diào)用 pool.retrieveWorker 方法并嘗試獲取一個(gè)空閑的 goWorker,如果獲取成功會(huì)將任務(wù)發(fā)送到goWorker的channel類(lèi)型task中。
func (p *Pool) retrieveWorker() (w *goWorker) {
//創(chuàng)建一個(gè)新的goWorker,并執(zhí)行
spawnWorker := func() {
//實(shí)例化 worker
w = p.workerCache.Get().(*goWorker)
// 運(yùn)行
w.run()
}
// 加鎖
p.lock.Lock()
// 從workers 中取出一個(gè) goWorker
// workerStack 實(shí)現(xiàn)了p.workers的方法
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
// Pool容量大于正在工作的 goWorker 數(shù)量)
//則調(diào)用 spawnWorker() 新建一個(gè) goWorker
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
spawnWorker()
} else {
// options設(shè)置了非阻塞選項(xiàng),直接返回 nil
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
//option設(shè)置了最大阻塞隊(duì)列,當(dāng)前阻塞等待的任務(wù)數(shù)量已經(jīng)達(dá)設(shè)置上限,直接返回 nil
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
...
var nw int
//如果正在執(zhí)行的worker數(shù)量為0時(shí),則重新創(chuàng)建woker
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
spawnWorker()
return
}
//p.workers中獲取可用的worker
//執(zhí)行開(kāi)頭創(chuàng)建的spawnWorker
if w = p.workers.detach(); w == nil {
if nw < p.Cap() {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}??看完注釋后理一理retrieveWorker的執(zhí)行邏輯:
- 1. 聲明一個(gè)spawnWorker,從對(duì)象池 workerCache 中獲取 goWorker
- 2. 嘗試從 workers 中取出可用的 goWorker
- 3. 如未達(dá)到協(xié)程池的容量限制,獲取并啟動(dòng) spawnWorker(goWorker)
- 4. 如何用戶(hù)設(shè)置了非阻塞選項(xiàng),直接返回空的goWorker
- 5. 如果正在執(zhí)行的goWorker 的數(shù)量等于0,調(diào)用 spawnWorker()
- 6. 未獲取到goWorker,并且Pool容量未滿(mǎn),同樣調(diào)用 spawnWorker()
?? spawnWorker() 是一個(gè)創(chuàng)建和運(yùn)行g(shù)oWorker的函數(shù),為后面獲取不到goWorker時(shí)先進(jìn)行預(yù)創(chuàng)建goWorker
任務(wù)執(zhí)行
任務(wù)執(zhí)行就是開(kāi)啟了一個(gè)協(xié)程,然后執(zhí)行g(shù)oWorker中channel的任務(wù)task。
func (w *goWorker) run() {
// pool的running 加 一
w.pool.addRunning(1)
go func() {
defer func() {
...
if p := recover(); p != nil {
//處理捕獲的panic
}
w.pool.cond.Signal()
}()
//任務(wù)執(zhí)行
for f := range w.task {
if f == nil {
return
}
f()
//執(zhí)行完后回收worker
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}goWorker放回pool
我們知道實(shí)際用戶(hù)的任務(wù)是綁定在goWorker上的, 在執(zhí)行完任務(wù)之后Ants,會(huì)將該goWorker放回到workers結(jié)構(gòu)的items數(shù)組中(協(xié)程池)。
func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false
}
// 重置空閑計(jì)時(shí)器,用于判定過(guò)期
worker.recycleTime = p.nowTime()
p.lock.Lock()
...
// 調(diào)用works的insert方法放回Pool
err := p.workers.insert(worker)
if err != nil {
p.lock.Unlock()
return false
}
// p.cond.Signal() 喚醒一個(gè)可能等待的線程
p.cond.Signal()
p.lock.Unlock()
return true
}































