雙十一期間Kafka以這種方式丟消息讓我猝不及防
講真,我今年的雙十一有點(diǎn)“背”,負(fù)責(zé)的Kafka集群出了一些幺蛾子,但正是這些幺蛾子,讓我這個(gè)雙十一過(guò)的非常充實(shí),也讓我意識(shí)到如果不體系化學(xué)習(xí)Kafka,是無(wú)法做到生產(chǎn)集群及時(shí)預(yù)警,將故障扼殺在搖籃中,因此也下定決心研讀Kafka的內(nèi)核。
本文就先來(lái)分享一個(gè)讓我始料未及的故障:Kafka生產(chǎn)環(huán)境大面積丟失消息。
首先要闡述的是消息丟失并不是因?yàn)閿嚯?,而且集群的副本?shù)量為3,消息發(fā)送端設(shè)置的acks=-1(all)。
這樣嚴(yán)苛的設(shè)置,那為什么還會(huì)出現(xiàn)消息丟失呢?請(qǐng)聽筆者慢慢道來(lái)。
1、故障現(xiàn)象
故障發(fā)生時(shí),接到多個(gè)項(xiàng)目組反饋說(shuō)消費(fèi)組的位點(diǎn)被重置到幾天前了,截圖如下:
從上面的消費(fèi)組延遲監(jiān)控曲線上來(lái)看,一瞬間積壓數(shù)從零直接飆升,初步懷疑是位點(diǎn)被重置了。
那位點(diǎn)為什么會(huì)被重置呢?
什么?你這篇文章不是說(shuō)要講Kafka為什么會(huì)丟消息嗎?怎么你又扯說(shuō)消費(fèi)組位點(diǎn)被重置呢?標(biāo)題黨!!!
NO、NO、NO,各位看官,絕對(duì)不是文不對(duì)題,請(qǐng)帶著這個(gè)疑問(wèn),與我共同探究吧。
2、問(wèn)題分析
遇到問(wèn)題,莫慌,講道理,基于MQ的應(yīng)用,消費(fèi)端一般都會(huì)實(shí)現(xiàn)冪等,也就是消息可以重復(fù)被處理,并且不會(huì)影響業(yè)務(wù),故解決的方式就是請(qǐng)項(xiàng)目組先評(píng)估一下,先人工將位點(diǎn)設(shè)置到出現(xiàn)問(wèn)題的前30分鐘左右,快速止血。
一波操作猛如虎,接下來(lái)就得好好分析問(wèn)題產(chǎn)生的原因。
通過(guò)查看當(dāng)時(shí)Kafka服務(wù)端的日志(server.log),可以看到如下日志:
上面的日志被修改的“面目全非”,其關(guān)鍵日志如下:
- Member consumer-1-XX in group consumerGroupName has failed, removing it from the group
- Preparing to rebalance group XXXX on heartbeat expiration
上面的日志指向性非常明顯:由于心跳檢測(cè)過(guò)期,消費(fèi)組協(xié)調(diào)器將消費(fèi)者從消費(fèi)組中移除,重而觸發(fā)重平衡。
消費(fèi)組重平衡:當(dāng)主題分區(qū)數(shù)量或消費(fèi)者數(shù)量發(fā)生變化后,消費(fèi)者之間需要對(duì)分區(qū)進(jìn)行重新分配,實(shí)現(xiàn)消費(fèi)端端負(fù)載均衡。
消息消費(fèi)者在重平衡期間消費(fèi)會(huì)全部暫停,當(dāng)消費(fèi)者重新完成分區(qū)的負(fù)載均衡后,繼續(xù)從服務(wù)端拉起消息,此時(shí)消費(fèi)端并不知道從哪個(gè)位置開始,故需要從服務(wù)端查詢位點(diǎn),使得消費(fèi)者能從上次消費(fèi)的位點(diǎn)繼續(xù)消費(fèi)。
現(xiàn)在出現(xiàn)消費(fèi)位點(diǎn)被重置到最早位點(diǎn),可以理解為位點(diǎn)丟失?那為什么會(huì)丟失位點(diǎn)呢?
無(wú)外乎如下兩個(gè)原因:
- 服務(wù)端丟失位點(diǎn),導(dǎo)致客戶端無(wú)法查詢到位點(diǎn)
- 客戶端主動(dòng)向服務(wù)端提交了-1,導(dǎo)致位點(diǎn)丟失
目前我們公司使用的Kafka版本為2.2.x,消費(fèi)組的位點(diǎn)是存儲(chǔ)在一個(gè)系統(tǒng)主題(__consumer_offsets)中,無(wú)論是服務(wù)器級(jí)別還是Topic級(jí)別,參數(shù)unclean.leader.election.enable都是設(shè)置為false,表示只有ISR集合中的副本才能參與Leader選舉,這樣就能嚴(yán)格保證位點(diǎn)消息并不會(huì)丟失或回到歷史某一個(gè)位點(diǎn)。
查看客戶端提交位點(diǎn)的API,發(fā)現(xiàn)用于封裝客戶端位點(diǎn)的實(shí)體類會(huì)對(duì)位點(diǎn)進(jìn)行校驗(yàn),代碼截圖如下:
如果傳入的位點(diǎn)為-1,直接會(huì)拋出異常,故客戶端并沒有機(jī)會(huì)向服務(wù)端提交-1的位點(diǎn),那位點(diǎn)為什么會(huì)丟失呢?
為了進(jìn)一步探究,我們不得不將目光投向消費(fèi)組在初次時(shí)是如何獲取位點(diǎn),從源碼的角度去分析,從而尋找關(guān)鍵日志,并對(duì)日志文件進(jìn)行對(duì)照,嘗試得到問(wèn)題的解。
2.1 客戶端位點(diǎn)查找機(jī)制
為了探究客戶端的位點(diǎn)獲取機(jī)制,筆者詳細(xì)閱讀了消費(fèi)者在啟動(dòng)時(shí)的流程,具體入口為KafkaConsumer的poll方法,其詳細(xì)流程圖如下所示:
上述的核心要點(diǎn)說(shuō)明如下:
- 在消費(fèi)者(KafkaConsumer)的poll方法消息時(shí)會(huì)調(diào)用updateAssignmentMetadataIfNeeded方法,該方法主要執(zhí)行消費(fèi)組初始化、消費(fèi)組重平衡、獲取消費(fèi)位點(diǎn)等與元數(shù)據(jù)相關(guān)工作。
- 如果當(dāng)前消費(fèi)組訂閱的分區(qū)(重平衡后分配的分區(qū))都存在位點(diǎn),則返回true,說(shuō)明無(wú)需更新位點(diǎn)。
- 如果當(dāng)前存在分配的分區(qū)沒有正確的位點(diǎn)(例如一次重平衡后新增加的分區(qū)),此時(shí)需要向服務(wù)端發(fā)送查找位點(diǎn)請(qǐng)求,服務(wù)端查詢__consumer_offsets主題,返回位點(diǎn)信息。
- 如果查詢到位點(diǎn),輸出DEBUG級(jí)別日志(Setting offset for partition),輸出從服務(wù)端查詢到的位點(diǎn);如果未查詢到位點(diǎn),同樣會(huì)輸出DEBUG級(jí)別日志(Found no committed offset for partition)。
- 如果沒有查詢到位點(diǎn),則需要根據(jù)消費(fèi)組配置的位點(diǎn)重置策略,其具體配置參數(shù):auto.offset.reset,其可選值:
- latest 最新位點(diǎn)
- earliest 最早位點(diǎn)
- none 不重置位點(diǎn)
- 如果重置位點(diǎn)選擇的是none,則會(huì)拋出NoOffsetForPartitionException異常。
- 如果重置位點(diǎn)選擇的是latest、earliest,則消費(fèi)者將從查詢到的位點(diǎn)開始消費(fèi),并輸出DEBUG級(jí)別日志(Resetting offset for partition XX to offset XXXX.)
- 非常遺憾,消費(fèi)者的位點(diǎn)查找機(jī)制,Kafka客戶端打印的過(guò)程日志是DEBUG級(jí)別,這在生產(chǎn)環(huán)境基本是不會(huì)輸出的,給我排查問(wèn)題(找到足夠的證據(jù))帶來(lái)了不便。
這里不得不吐槽一下Kafka輸出日志的策略:位點(diǎn)的變更是一個(gè)非常關(guān)鍵的狀態(tài)變更,而且輸出這些日志的頻率不會(huì)很大,日志級(jí)別應(yīng)該使用INFO,而不是DEBUG。
Kafka的日志是Debug,故當(dāng)時(shí)是無(wú)法找到證據(jù)進(jìn)行輔助說(shuō)明,只能排查出為什么會(huì)因?yàn)樾奶瑫r(shí)而觸發(fā)重平衡。
溫馨提示:關(guān)于心跳為什么會(huì)超時(shí),從而觸發(fā)重平衡原因,將會(huì)在后續(xù)的故障分析相關(guān)的文章中詳細(xì)闡述。
找到重平衡觸發(fā)原因后,在測(cè)試環(huán)境進(jìn)行壓測(cè)并加以重現(xiàn),同時(shí)將客戶端日志級(jí)別設(shè)置為debug,從而查找證據(jù),功夫不負(fù)有心人,完美的找到了上文中提到的三條日志:
- Setting offset for partition 第一次查詢時(shí)找到了位點(diǎn),并且不為-1,也不是最早位點(diǎn)。
- Found no committed offset for partition 后面反復(fù)進(jìn)行重平衡,反復(fù)查詢?nèi)罩荆谷缓竺鏌o(wú)法正確查詢到位點(diǎn),而是返回沒有找到位點(diǎn)(返回-1)。
- Resetting offset for partition XX to offset XXXX. 根據(jù)重置策略進(jìn)行了位點(diǎn)重置。
從上面的日志分析,也可以明確地出結(jié)論,服務(wù)端是有存儲(chǔ)消費(fèi)組的位點(diǎn)的,不然不會(huì)出現(xiàn)第一條日志,成功找到了一個(gè)有效的位點(diǎn),只是在后續(xù)重平衡過(guò)程中,多次需要查詢位點(diǎn)時(shí),反而返回了-1,那服務(wù)端在什么情況下返回-1呢?
Broker服務(wù)端處理心跳包的入口是kafkaApis的handleOffsetFetchRequest方法,找到獲取位點(diǎn)的關(guān)鍵代碼,如下所示:
從上面來(lái)看,服務(wù)端返回INVALID_OFFSET = -1L的情況如下:
- 消費(fèi)組元信息管理器中的緩存(內(nèi)存)中并不存在該消費(fèi)組,將返回-1,那又在什么情況下服務(wù)端會(huì)沒有正在使用的消費(fèi)組元信息呢?
- __consumer_offsets主題的分區(qū)發(fā)生Leader選舉,當(dāng)前Broker中擁有的分區(qū)變更為follower后,與該分區(qū)對(duì)應(yīng)的消費(fèi)組的元信息將被移除。為什么會(huì)這樣呢?這里背后的原因是Kafka中的消費(fèi)組在Broker端需要選舉出一個(gè)組協(xié)調(diào)器,用于協(xié)調(diào)消費(fèi)組的重平衡,選舉算法就是將消費(fèi)組的名稱取hashcode,得到的值與 consumer_offsets主題的分區(qū)數(shù)取模得到一個(gè)分區(qū)數(shù),然后該分區(qū)的Leader節(jié)點(diǎn)所在的Broker為該消費(fèi)組的組協(xié)調(diào)器,故分區(qū)Leader發(fā)生變化,與之關(guān)聯(lián)的消費(fèi)組的組協(xié)調(diào)器需要重新選舉。
- 刪除消費(fèi)組時(shí)將器移出。
- 消費(fèi)組的狀態(tài)為GroupState.Dead 消費(fèi)組狀態(tài)變更為Dead,通常有如下幾種情況:
- 消費(fèi)組被刪除
- __consumer_offsets分區(qū)leader發(fā)生變化,觸發(fā)位點(diǎn)重新加載,要先將消費(fèi)組狀態(tài)變更為Dead,然后新的分區(qū)Leader所在機(jī)器上會(huì)加載新的位點(diǎn),然后引導(dǎo)消費(fèi)組重平衡。
服務(wù)端中并沒有存儲(chǔ)該消費(fèi)組的位點(diǎn)信息,說(shuō)明該消費(fèi)組還未提交過(guò)位點(diǎn)
那上面的情況,對(duì)于一個(gè)正在運(yùn)行許久的消費(fèi)組來(lái)說(shuō),上述這些情況會(huì)發(fā)生嗎?查找服務(wù)端相關(guān)日志,可以明確看到大量__consumer_offsets相關(guān)分區(qū)發(fā)生leader選舉,容易觸發(fā)上述第一種情況,這樣消費(fèi)組發(fā)起的Offset Fetch請(qǐng)求是有可能返回-1,從而會(huì)引導(dǎo)消費(fèi)組根據(jù)重置策略進(jìn)行位點(diǎn)重置。
查看文章開頭部分,消費(fèi)組設(shè)置的重置策略選的是earliest,消費(fèi)組在一瞬間消費(fèi)積壓從0飆升到幾個(gè)億,就能解釋的通了。
看到這里,大家是不是會(huì)突然“后背發(fā)涼”,如果消費(fèi)組配置的位點(diǎn)重置策略(auto.offset.reset)為latest,是不是很容易引起消息丟失,即一部分消費(fèi)被跳過(guò)而不被消費(fèi),示意圖說(shuō)明如下:
本文就說(shuō)到這里了,關(guān)于Kafka集群為什么會(huì)出現(xiàn)大量__consumer_offsets進(jìn)行Leader選舉,后續(xù)文章會(huì)一一展開,敬請(qǐng)持續(xù)關(guān)注我。
3、感想
講真,由于Kafka服務(wù)端使用的編程語(yǔ)言為scala,筆者并沒有嘗試去看Kafka的源碼,只是詳細(xì)剖析了Kafka的消息發(fā)送、消息消費(fèi)機(jī)制,本以為可以輕松駕馭公司各個(gè)項(xiàng)目關(guān)于Kafka使用層面的問(wèn)題,但事實(shí)上也是如此,對(duì)項(xiàng)目組的咨詢我應(yīng)對(duì)起來(lái)得心應(yīng)手,但一旦服務(wù)端出現(xiàn)問(wèn)題,還是會(huì)有點(diǎn)茫然,當(dāng)然我們有一套完備的集群?jiǎn)栴}出現(xiàn)應(yīng)急方案,但一旦出現(xiàn)問(wèn)題,盡管你能快速恢復(fù),但故障一旦發(fā)生,損失就無(wú)法避免,故我們還是要對(duì)自己負(fù)責(zé)的內(nèi)容研究透,提前做好巡檢、根據(jù)體系化的知識(shí)提前規(guī)避故障的發(fā)生。
正例如大部分朋友應(yīng)該知道kafka在后續(xù)版本中的消費(fèi)位點(diǎn)是存儲(chǔ)在系統(tǒng)主題__consumer_offsets中,但又有多少人知道,這個(gè)主題的分區(qū)一旦出現(xiàn)Leader選舉,伴隨而來(lái)的是一大堆消費(fèi)組全部發(fā)生重平衡,導(dǎo)致消費(fèi)組停止消費(fèi)呢?
故筆者將下定決心,好好閱讀一下kafka服務(wù)端相關(guān)源碼,成體系化理解Kafka,在工作中更好的駕馭Kafka,《Kafka原理與實(shí)戰(zhàn)》專欄在路上,有興趣的朋友可以點(diǎn)擊文章前的標(biāo)簽加以關(guān)注。
最后,期待您的點(diǎn)贊,您的點(diǎn)贊也是我最大的動(dòng)力,我們下回見。





























