国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

關(guān)于 RocketMQ ClientID 相同引發(fā)的消息堆積的問題

開發(fā) 前端
這篇文章講解了 RocketMQ 的 Consumer 啟動之后都做了哪些操作,對理解本次要講解的 BUG 有一定的幫助。

[[436290]]

首先,造成這個問題的 BUG RocketMQ 官方已經(jīng)在 3月16號 的這個提交中修復(fù)了,這里只是探討一下在修復(fù)之前造成問題的具體細(xì)節(jié),更多的上下文可以參考我之前寫的 《RocketMQ Consumer 啟動時都干了些啥?》 ,這篇文章講解了 RocketMQ 的 Consumer 啟動之后都做了哪些操作,對理解本次要講解的 BUG 有一定的幫助。

其中講到了:

消息堆積

重復(fù)消費(fèi)自不必說,你 ClientID 都相同了。本篇著重聊聊為什么會消息堆積。

文章中講到,初始化 Consumer 時,會初始化 Rebalance 的策略。你可以大致將 Rebalance 策略理解為如何將一個 Topic 下的 m 個 MessageQueue 分配給一個 ConsumerGroup 下的 n 個 Consumer 實例的策略,看著有些繞,其實就長這樣:

rebalance策略

而從 Consumer 初始化的源碼中可以看出,默認(rèn)情況下 Consumer 采取的 Rebalance 策略是 AllocateMessageQueueAverage()。

默認(rèn)的 Rebalance 策略

默認(rèn)的策略很好理解,將 MessageQueue 平均的分配給 Consumer。舉個例子,假設(shè)有 8 個 MessageQueue,2 個 Consumer,那么每個 Consumer 就會被分配到 4 個 MessageQueue。

那如果分配不均勻怎么辦?例如只有 7 個 MessageQueue,但是 Consumer 仍然是 2 個。此時 RocketMQ 會將多出來的部分,對已經(jīng)排好序的 Consumer 再做平均分配,一個一個分發(fā)給 Consumer,直到分發(fā)完。例如剛剛說的 7 個 MessageQueue 和 2 個 ConsumerGroup 這種 case,排在第一個的 Consumer 就會被分配到 4 個 MessageQueue,而第二個會被分配到 3 個 MessageQueue。

大家可以先理解一下 AllocateMessageQueueAveragely 的實現(xiàn),作為默認(rèn)的 Rebalance 的策略,其實現(xiàn)位于這里:

默認(rèn)策略的實現(xiàn)位置

接下來我們看看,AllocateMessageQueueAveragely 內(nèi)部具體都做了哪些事情。

其核心其實就是實現(xiàn)的 AllocateMessageQueueStrategy 接口中的 allocate 方法。實際上,RocketMQ 對該接口總共有 5 種實現(xiàn):

  • AllocateMachineRoomNearby
  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

其默認(rèn)的 AllocateMessageQueueAveragely 只是其中的一種實現(xiàn)而已,那執(zhí)行 allocate 它需要什么參數(shù)呢?

入?yún)?/p>

需要以下四個:

  • ConsumerGroup 消費(fèi)者組的名字
  • currentCID 當(dāng)前消費(fèi)者的 clientID
  • mqAll 當(dāng)前 ConsumerGroup 所消費(fèi)的 Topic 下的所有的 MessageQueue
  • cidAll 當(dāng)前 ConsumerGroup 下所有消費(fèi)者的 ClientID

實際上是將某個 Topic 下的所有 MessageQueue 分配給屬于同一個消費(fèi)者的所有消費(fèi)者實例,粒度是 By Topic 的。

所以到這里剩下的事情就很簡單了,無非就是怎么樣把這一堆 MessageQueue 分配給這一堆 Consumer。這個怎么樣,就對應(yīng)了 AllocateMessageQueueStrategy 的不同實現(xiàn)。

接下來我們就來看看 AllocateMessageQueueAveragely 是如何對 MessageQueue 進(jìn)行分配的,之前講源碼我一般都會一步一步的來,結(jié)合源碼跟圖,但是這個源碼太短了,我就直接先給出來吧。

  1. public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { 
  2.   if (currentCID == null || currentCID.length() < 1) { 
  3.     throw new IllegalArgumentException("currentCID is empty"); 
  4.   } 
  5.   if (mqAll == null || mqAll.isEmpty()) { 
  6.     throw new IllegalArgumentException("mqAll is null or mqAll empty"); 
  7.   } 
  8.   if (cidAll == null || cidAll.isEmpty()) { 
  9.     throw new IllegalArgumentException("cidAll is null or cidAll empty"); 
  10.   } 
  11.  
  12.   List<MessageQueue> result = new ArrayList<MessageQueue>(); 
  13.  
  14.   // 判斷一下當(dāng)前的客戶端是否在 cidAll 的集合當(dāng)中 
  15.   if (!cidAll.contains(currentCID)) { 
  16.     log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}"
  17.              consumerGroup, 
  18.              currentCID, 
  19.              cidAll); 
  20.     return result; 
  21.   } 
  22.  
  23.   // 拿到當(dāng)前消費(fèi)者在所有的消費(fèi)者實例數(shù)組中的位置 
  24.   int index = cidAll.indexOf(currentCID); 
  25.   // 用 messageQueue 的數(shù)量 對 消費(fèi)者實例的數(shù)量取余數(shù), 這個實際上就把不夠均勻分的 MessageQueue 的數(shù)量算出來了 
  26.   // 舉個例子, 12 個 MessageQueue, 有 5 個 Consumer, 12 % 5 = 2  
  27.   int mod = mqAll.size() % cidAll.size(); 
  28.   int averageSize = 
  29.     mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); 
  30.   int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; 
  31.   int range = Math.min(averageSize, mqAll.size() - startIndex); 
  32.   for (int i = 0; i < range; i++) { 
  33.     result.add(mqAll.get((startIndex + i) % mqAll.size())); 
  34.   } 
  35.   return result; 

其實前半部分都是些常規(guī)的 check,可以忽略不看,從這里:

  1. int index = cidAll.indexOf(currentCID); 

開始,才是核心邏輯。為了避免邏輯混亂,還是假設(shè)有 12 個 MessageQueue,5 個 Consumer,同時假設(shè) index=0 。

那么 mod 的值就為 12 % 5 = 2 了。

而 averageSize 的值,稍微有點(diǎn)繞。如果 MessageQueue 的數(shù)量比消費(fèi)者的數(shù)量還少,那么就為 1 ;否則,就走這一堆邏輯(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())。我們 index 是 0,而 mod 是 2,index < mod 則是成立的,那么最終 averageSize 的值就為 12 / 5 + 1 = 3。

接下來是 startIndex,由于這個三元運(yùn)算符的條件是成立的,所以其值為 0 * 3 ,就為 0。

看了一大堆邏輯,是不是已經(jīng)暈了?直接舉實例:

12 個 Message Queue

5 個 Consumer 實例

按照上面的分法:

排在第 1 的消費(fèi)者 分到 3 個

排在第 2 的消費(fèi)者 分到 3 個

排在第 3 的消費(fèi)者 分到 2 個

排在第 4 的消費(fèi)者 分到 2 個

排在第 5 的消費(fèi)者 分到 2 個

具體分配流程

所以,你可以大致認(rèn)為:

先“均分”,12 / 5 取整為 2。然后“均分”完之后還剩下 2 個,那么就從上往下,挨個再分配,這樣第 1、第 2 個消費(fèi)者就會被多分到 1 個。

所以如果有 13 個 MessageQueue,5 個 Consumer,那么第 1、第 2、第 3 就會被分配 3 個。

但并不準(zhǔn)確,因為分配的 MessageQueue 是一次性的,例如那 3 個 MessageQueue 是一次性獲取的,不會先給 2 個,再給 1 個。

而我們開篇提到的 Consumer 的 ClientID 相同,會造成什么?

當(dāng)然是 index 的值相同,進(jìn)而造成 mod、averageSize、startIndex、range 全部相同。那么最后 result.add(mqAll.get((startIndex + i) % mqAll.size())); 時,本來不同的 Consumer,會取到相同的 MessageQueue(舉個例子,Consumer 1 和 Consumer 2 都取到了前 3 個 MessageQueue),從而造成有些 MessageQueue(如果有的話) 沒有 Consumer 對其消費(fèi),而沒有被消費(fèi),消息也在不停的投遞進(jìn)來,就會造成消息的大量堆積。

當(dāng)然,現(xiàn)在的新版本從代碼上看已經(jīng)修復(fù)這個問題了,這個只是對之前的版本的原因做一個探索。

 

責(zé)任編輯:姜華 來源: SH的全棧筆記
相關(guān)推薦

2023-12-21 08:01:41

RocketMQ消息堆積

2022-11-08 07:36:17

RocketMQ消費(fèi)者消息堆積

2021-11-08 15:38:15

消息延遲堆積

2021-10-26 08:22:38

消息堆積擴(kuò)容RocketMQ

2024-07-29 00:01:00

RabbitMQ消息堆積

2024-06-24 08:42:11

2021-10-03 21:41:13

RocketMQKafkaPulsar

2013-06-20 09:59:12

Javascriptvar

2013-08-08 10:20:04

云計算災(zāi)難恢復(fù)反思

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2021-02-02 11:01:31

RocketMQ消息分布式

2010-09-29 15:55:22

DHCP IP相同

2024-12-09 08:44:58

2022-12-22 10:03:18

消息集成

2024-06-06 11:57:44

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-07 15:29:52

存儲RocketMQ體系

2023-01-10 08:20:55

RocketMQ消息源碼
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

鲁大师精品99久久久| 亚洲一区免费视频| 自拍偷拍亚洲一区| 亚洲欧美另类图片| 欧美在线视频你懂得| 91麻豆蜜桃| 亚洲影视一区| 欧美在线一区二区三区四| 日本成人在线视频网址| 免费观看性欧美大片无片| 午夜av成人| 一本色道久久88综合亚洲精品ⅰ| 生活片a∨在线观看| 中文成人综合网| 欧美123区| 国产美女直播视频一区| 国偷自产av一区二区三区| 人体精品一二三区| 国产成人澳门| 欧美一二三四五区| 日韩av有码| 日韩电影第一页| 精品国产乱码久久久久久图片 | 日韩理论在线观看| 欧美刺激性大交免费视频| 亚洲大片在线| 国产精品旅馆在线| 国产成人aaa| 国产毛片视频| 日韩三区在线观看| 欧美丝袜激情| 欧美日韩精品免费观看| 成人黄色视屏网站| 亚洲女成人图区| 亚洲亚洲免费| 日本不卡高字幕在线2019| 欧美呦呦网站| 亚洲一区三区电影在线观看| 欧洲精品一区二区三区在线观看| 韩国三级一区| 国产一区二区三区高清| 处破女av一区二区| 丝袜美女写真福利视频| 日本精品久久中文字幕佐佐木| 欧美日韩成人一区二区| 亚洲成人福利在线| 久久精品一区四区| 大j8黑人w巨大888a片| 国产精品一区二区男女羞羞无遮挡| 日本不卡二区| 日韩精品欧美精品| 蜜桃欧美视频| 国产一区欧美一区| 国产成人艳妇aa视频在线| 国产精品主播直播| 免费无码毛片一区二三区| 成人18视频在线播放| 国产黄色一级网站| 亚洲欧洲精品天堂一级| h短视频大全在线观看| 欧美日韩国内自拍| 国产原创精品视频| 亚洲天堂免费视频| av在线精品| 欧美重口另类videos人妖| 农村少妇一区二区三区四区五区 | 高清在线不卡av| 精品国产一区三区| 国产精品每日更新| 久青草国产在线| 欧美日韩国产综合久久| 国产丝袜精品丝袜| 久久久精品影院| 国产日产一区 | 久久久久综合网| 中文字幕在线综合| 亚洲午夜久久久久久久久电影院 | 亚洲伊人av| www.欧美精品一二三区| 蜜臀91精品国产高清在线观看| 97影院在线午夜| 国产一区在线精品| 男人插女人欧美| 欧美主播一区二区三区美女| 伊人网在线播放| 97超碰蝌蚪网人人做人人爽| 影院欧美亚洲| 欧美一区二区三区爽大粗免费| 亚洲精品一二三四区| caopon在线免费视频| 爱福利视频一区| 外国成人免费视频| 黄色高清视频网站| 亚洲国产日韩精品| 345成人影院| 国产第一区电影| 老司机午夜精品99久久| 91大神影片| 日韩激情片免费| 欧美日韩性在线观看| 亚洲狠狠婷婷综合久久久| 亚洲青青青在线视频| 欧美日韩色网| 国产美女高潮久久白浆| 国产91高潮流白浆在线麻豆| 在线中文字幕视频| 蜜月aⅴ免费一区二区三区 | 精品无人区麻豆乱码久久久| 日本一区网站| 一区二区三区高清在线| 日韩精品视频免费| 中日韩免视频上线全都免费| 久久综合色视频| 最新日韩中文字幕| 亚洲国产专区校园欧美| 色在线中文字幕| 真实国产乱子伦对白视频| 日本免费在线视频| 久久久久久久久国产精品| 日韩av高清在线观看| 色视频网站在线观看| 日韩精品黄色网| 极品少妇一区二区三区| free性欧美1819hd| 亚洲一区第一页| 午夜在线播放视频欧美| 动漫成人在线| 欧美激情图片区| 狠狠色综合日日| 一区二区在线观看不卡| 狠狠色狠狠色综合日日小说| 国产精品久久久久久久久久辛辛| 精品国产免费人成电影在线观看四季| 婷婷成人影院| 精品国产一区三区| 精品久久人人做人人爽| 色乱码一区二区三在线看| 极品尤物一区| 日韩一级片免费视频| 精品久久五月天| 伊人精品成人久久综合软件| 伊人资源视频在线| 欧美亚洲日本黄色| 久久久精品免费网站| 亚洲承认视频| 亚洲欧美日本国产有色| 欧美视频在线播放| 欧美第一精品| 久草在线资源视频| 亚洲91精品在线| 中文字幕精品三区| jizz18欧美18| 91小视频网站| 欧美成人激情视频免费观看| 国产麻豆日韩欧美久久| 欧美激情网站| 色乱码一区二区三区熟女| 精品国产乱码久久| 欧美aaaaaa午夜精品| 天堂va在线| 在线观看成人av电影| 亚洲精品美女视频| 高清国产午夜精品久久久久久| 中国字幕a在线看韩国电影| 一本—道久久a久久精品蜜桃| 精品亚洲国产视频| 懂色av一区二区三区免费看| 成人深夜福利| 尤蜜粉嫩av国产一区二区三区| 久久久久久亚洲| 亚洲人被黑人高潮完整版| av资源久久| av中文字幕在线| 欧美日本亚洲| 国产亚洲精品美女| 91亚洲精品久久久蜜桃网站| av成人资源网| 在线免费视频福利| 91久久精品国产91久久性色tv| 欧美日韩一区三区| 美腿丝袜亚洲三区| 欧美伊人亚洲伊人色综合动图| xxxx一级片| 亚洲综合成人婷婷小说| 日韩欧美一区在线| 成人av先锋影音| 欧美亚洲tv| 精品成人一区二区三区免费视频| 久久99欧美| 色噜噜狠狠狠综合曰曰曰| 中文字幕一区不卡| 精久久久久久| 精品欧美一区二区三区在线观看 | 日韩欧美999| 蜜臀av一区二区在线免费观看| 欧美视频第一| 黄页网站在线播放| 亚洲综合视频一区| 欧美一级bbbbb性bbbb喷潮片| 91精品国产综合久久久久 |