国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

消費者原理分析-RocketMQ知識體系(四)

開發(fā) 架構
本文將講講消息消費的過程及相關概念。關于消息消費,消費者組這些概念,基本和kafka 是類似的,一個消費組內(nèi)可以包含多個消費者,1個消費組可訂閱多個主題。消費組之間有集群模式與廣播模式兩種。

[[410711]]

前文了解了 RocketMQ消息存儲的相關原理,本文將講講消息消費的過程及相關概念。

消息消費

關于消息消費,消費者組這些概念,基本和kafka 是類似的,比如:

一個消費組內(nèi)可以包含多個消費者,1個消費組可訂閱多個主題。消費組之間有集群模式與廣播模式兩種。

集群模式下,主題下的同一消息只允許被消費組內(nèi)的一個消費者消費,消費進度存儲在 broker 端。廣播模式下,則每個消費者都可以消費該消息,消費進度存儲在消費者端。

集群模式下,一個消費隊列同一時間,只允許被一個消費者消費,1個消費者,可以消費多個消息隊列。具體的可以看我前面的文章。

而且 rocketmq 消息服務器與消費者的消息傳輸有 2 種方式:推模式、拉模式。拉模式,即消費者主動向消息服務器發(fā)送請求;推模式,即消息服務器向消費者推送消息。推模式,是基于拉模式實現(xiàn)的。

消費者啟動

主要就是初始化了三個組件,然后啟動后臺定時任務。

三個組件:

  • 【RebalanceImpl】均衡消息隊列服務,負責分配當前 Consumer 可消費的消息隊列( MessageQueue )。當有新的 Consumer 的加入或移除,都會重新分配消息隊列。
  • 【PullAPIWrapper】拉取消息組件
  • 【offsetStore】消費進度組件

幾個定時任務

  • PullMessageService
  • 從阻塞隊列pullRequestQueue中獲取consumer的pull請求
  • RebalanceService
  • 負載均衡定時任務,給 Consumer 分配可消費的 MessageQueue
  • fetchNameServerAddr
  • 定時獲取 NameSever 地址
  • updateTopicRouteInfoFromNameServer
  • 定時更新Topic路由信息
  • cleanOfflineBroker
  • 定時清理下線Broker
  • sendHeartbeatToAllBrokerWithLock
  • 發(fā)送心跳
  • persistAllConsumerOffset
  • 持久化消費進度 ConsumerOffset

消息拉取

對于任何一款消息中間件而言,消費者客戶端一般有兩種方式從消息中間件獲取消息并消費:

Pull

即消費者每隔一定時間主動去 Broker 拉取消息

優(yōu)點

消費速度、數(shù)量可控

缺點

如果間隔時間短,可能會拉空,并且頻繁 RPC 請求增加網(wǎng)絡開銷 如果間隔時間長,則可能會有消息延遲 消費進度offset需要consumer自己來維護

Push

即 Broker 主動實時推送消息給消費者

優(yōu)點

消息實時,保持長鏈接,不會頻繁建立鏈接

缺點

如果消息數(shù)量過大,消費者吞吐量小,肯能會造成消費者緩沖區(qū)溢出。

在文章的開頭我們也說了RocketMQ推模式,是基于拉模式實現(xiàn)的。

【PullMessageService 消息拉取】

RocketMQ 通過 PullMessageService 拉取消息。

通過代碼段 PullMessageService#run可以看出:

  1. public void run() { 
  2.   // stopped 是 volidate 修飾的變量,用于線程間通信。 
  3.   while (!this.isStopped()) { 
  4.   // ..  
  5.       // 阻塞隊列, 如果 pullRequestQueue 沒有元素,則阻塞 
  6.       PullRequest pullRequest = this.pullRequestQueue.take(); 
  7.       // 消息拉取  
  8.       this.pullMessage(pullRequest); 
  9.    // ... 
  10.   } 

關于PullRequest

  1. // 消費者組 
  2. private String consumerGroup; 
  3. // 消息隊列 
  4. private MessageQueue messageQueue; 
  5. // 消息處理隊列,從 Broker 拉取到的消息先存入 ProcessQueue,然后再提交到消費者消費池消費 
  6. private ProcessQueue processQueue; 
  7. // 待拉取的 MessageQueue 偏移量 
  8. private long nextOffset; 
  9. // 是否被鎖定 
  10. private boolean lockedFirst = false

PullMessageService 添加 PullRequest 有兩種方式:

延時添加

立即添加

【關于ProcessQueue】

ProcessQueue 是 MessageQueue 在消費端的重現(xiàn)、快照。PullMessageService 從消息服務器默認每次拉取 32 條消息,按消息的隊列偏移量順序存放在 ProcessQueue 中,PullMessageService 再將消息提交到消費者消費線程池。消息消費成功后,從 ProcessQueue 中移除。

  1. // 讀寫鎖 
  2. private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); 
  3. // 消息存儲容器, k:消息偏移量,v:消息實體 
  4. private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); 
  5. // ProcessQueue 中消息總數(shù) 
  6. private final AtomicLong msgCount = new AtomicLong(); 
  7. // ProcessQueue 中消息總大小 
  8. private final AtomicLong msgSize = new AtomicLong(); 
  9. // 當前 ProcessQueue 中包含的最大隊列偏移量 
  10. private volatile long queueOffsetMax = 0L; 
  11. // 當前 ProcessQueue 是否被丟棄 
  12. private volatile boolean dropped = false
  13. // 上一次開始消息拉取時間戳 
  14. private volatile long lastPullTimestamp = System.currentTimeMillis(); 
  15. // 上一次消息消費時間戳 
  16. private volatile long lastConsumeTimestamp = System.currentTimeMillis(); 

【對消息拉取進行流量控制】

processQueue 的消息數(shù)量 大于 1000, processQueue 的消息大小 大于 100 MB,將延遲 50 毫秒后拉取消息

processQueue 中偏移量最大的消息與偏移量最小的消息的跨度超過 2000 則延遲 50 毫秒再拉取消息。

根據(jù)主題拉取訂閱的消息,如果為空,延遲 3 秒,再拉取。

【消息服務端 broker 組裝消息】

代碼位置:PullMessageProcessor#processRequest

  • 根據(jù)訂閱消息,構建消息過濾器
  • 調(diào)用 MessageStore.getMessage 查找消息
  • 根據(jù)主題名與隊列編號獲取消息消費隊列
  • 消息偏移量異常情況校對下一次拉取偏移量
  • 根據(jù) PullRequest 填充 responseHeader 的 nextBeginOffset、minOffset、maxOffset
  • 根據(jù)主從同步延遲,如果從節(jié)點數(shù)據(jù)包含下一次拉取的偏移量,設置下一次拉取任務的 brokerId
  • 如果 commitlog 標記可用并且當前節(jié)點為主節(jié)點,則更新消息消費進度

【消息拉取長輪詢機制】

RocketMQ 推模式是循環(huán)向消息服務端發(fā)送消息拉取請求。

消費者向 broker 拉取消息時,如果消息未到達消費隊列,并且未啟用 長輪詢機制,則會在服務端等待 shortPollingTimeMills(默認1秒) 時間后再去判斷消息是否已經(jīng)到達消息隊列,如果消息未到達,則提示消息拉取客戶端 PULL_NOT_FOUND。

如果開啟長輪詢模式,rocketMQ 會每 5s 輪詢檢查一次消息是否可達,同時一有新消息到達后立馬通知掛起線程再次驗證新消息是否是自己感興趣的消息,如果是則從 commitlog 文件提取消息返回給消息拉取客戶端,否則直到掛起超時,超時時間由消息拉取方在消息拉取時封裝在請求參數(shù)中,PUSH 模式默認 15s。

PULL 模式通過 DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 設置。RocketMQ 通過在 Broker 端配置 longPollingEnable 為 true 來開啟長輪詢模式。

RocketMQ 的長輪詢機制由 2 個線程共同完成。PullRequestHoldService、ReputMessageService。

【Push消費模式流程簡析】

后臺獨立線程RebalanceServic根據(jù)Topic中消息隊列個數(shù)和當前消費組內(nèi)消費者個數(shù)進行負載均衡,給當前消費者分配對應的MessageQueue,將其封裝為PullRequest實例放入隊列pullRequestQueue中。

Consumer端開啟后臺獨立的線程PullMessageService不斷地從隊列pullRequestQueue中獲取PullRequest并通過網(wǎng)絡通信模塊異步發(fā)送Pull消息的RPC請求給Broker端。這里算是比較典型的生產(chǎn)者-消費者模型,實現(xiàn)了準實時的自動消息拉取。

PullMessageService異步拉取到消息后,通過PullCallback進行回調(diào)處理,如果拉取成功,則更新消費進度,putPullRequest到阻塞隊列pullRequestQueue中,接著立即進行拉取

監(jiān)聽器 ConsumeMessageConcurrentlyService 會一直監(jiān)聽回調(diào)方法 PullCallback,把拉取到的消息交給Consumerrequest進行處理,Consumerrequest會調(diào)用消費者業(yè)務方實現(xiàn)的consumeMessage()接口處理具體業(yè)務,消費者業(yè)務方處理完成后返回ACK給Consumerrequest,如果消費者ACK返回的失敗,則在集群模式下把消息發(fā)回 Broker 進行重試(廣播模型重試的成本太高),最后更新消費進度offsetTable

在Broker端,PullMessageProcessor業(yè)務處理器收到Pull消息的RPC請求后,通過MessageStore實例從commitLog獲取消息。如果第一次嘗試Pull消息失敗(比如Broker端沒有可以消費的消息),則通過長輪詢機制先hold住并且掛起該請求,然后通過Broker端的后臺線程PullRequestHoldService重新嘗試和后臺線程ReputMessageService進行二次處理。

【Push消息流程圖】

圖片

RocketMQ消息消費的長輪詢機制

普通輪詢和長輪詢的區(qū)別:

普通輪詢比較簡單,就是定時發(fā)起請求,服務端收到請求后不論數(shù)據(jù)有沒有更新都立即返回

優(yōu)點就是實現(xiàn)簡單,容易理解。

缺點就是服務端是被動的,服務端要不斷的處理客戶端連接,并且服務端無法控制客戶端pull的頻率以及客戶端數(shù)量.

長輪詢是對普通輪詢的優(yōu)化,依然由客戶端發(fā)起請求,服務端收到后并不立即響應而是hold住客戶端連接,等待數(shù)據(jù)產(chǎn)生變更后(或者超過指定時間還未產(chǎn)生變更)才回復客戶端

說白了,就是對普通輪詢加了個控制,你客戶端可以隨時請求我,但是回不回復我說了算,這就保證了服務端不會被客戶端帶節(jié)奏,導致自己的壓力不可控.

在 RocketMq 中消費者主動發(fā)起pull請求,broker在處理消息拉取請求時,如果沒有查詢到消息,將不返回消費者任何信息,而是先hold住并且掛起請求,使其不會立即發(fā)起下一次拉取請求,會將請求信息pullRequest添加到pullRequestTable中,等待觸發(fā)通知消費者的事件。

當生產(chǎn)者發(fā)送最新消息過來后,首先持久化到commitLog文件,通過異步方式同時持久化consumerQueue和index。然后激活consumer發(fā)送來hold的請求,立即將消息通過channel寫入consumer客戶。

如果沒有消息到達且客戶端拉取的偏移量是最新的,會hold住請求。其中hold請求超時時間 < 請求設定的超時時間。同時Broker端也定時檢測是否請求超時,超時則立即將請求返回,狀態(tài)code為NO_NEW_MESSAGE。

然后在Broker端,通過后臺獨立線程PullRequestHoldService遍歷所有掛起的請求pullRequestTable,如果有消息,則返回響應給消費者。

同時,另外一個ReputMessageService線程不斷地構建ConsumeQueue/IndexFile數(shù)據(jù),不斷的檢測是否有新消息產(chǎn)生,如果有新消息,則從pullRequestTable通過Topic+queueId的key獲取對應hold住的請求pullRequest,再根據(jù)其中的長鏈接channel進行通信響應。

通過這種長輪詢機制,即可解決Consumer端需要通過不斷地發(fā)送無效的輪詢Pull請求,而導致整個RocketMQ集群中Broker端負載很高的問題。

流程如下:

圖片

消息隊列負載與重新分布機制

當一個業(yè)務系統(tǒng)部署多臺機器時,每臺機器都啟動了一個Consumer,并且這些Consumer都在同一個ConsumerGroup也就是消費組中,此時一個消費組中多個Consumer消費一個Topic,而一個Topic中會有多個MessageQueue。

比如有2個Consumer,3個MessageQueue,那么這3個MessageQueue怎么分配呢?這就涉及到Consumer的負載均衡了。

首先 Consumer 在啟動時,會把自己注冊給所有 Broker ,并保持心跳,讓每一個 Broker 都知道消費組中有哪些 Consumer 。

然后 Consumer 在消費時,會隨機鏈接一臺 Broker ,獲取消費組中的所有 Consumer 。

主要流程如下:

圖片

RocketMQ 消息隊列重新分布由 RebalanceService 線程來實現(xiàn)的。RebalanceService 隨著 MQClientInstance 的啟動而啟動。RebalanceService 默認每 20 秒,執(zhí)行一次 MQClientInstance#doRebalance

【主題的消息隊列負載流程】

  1. 獲取主題的隊列,向 broker 發(fā)送請求,獲取主題下,消費組所有消費者客戶端ID。
  2. 只有當 2 者均不為空時,才有必要進行 rebalance。
  3. 在 rebalance 時,需要對 隊列,還有消費者客戶端 ID 進行排序,以確保同一個消費組下的視圖是一致的。
  4. 根據(jù) 分配策略 AllocateMessageQueueStrategy 為 消費者分配隊列。

客戶端執(zhí)行期間 伴隨著PullMessageService 與 RebalanceService 線程交互

圖片

消息消費過程

【消費過程】

  1. 默認拉取32條消息,如果消息數(shù)量大于 32 則分頁處理。
  2. 每次進行消費時,都會判斷 processQueue 是否被刪除,阻止消費者 消費 不屬于自己的 隊列
  3. 恢復重試消息主題名, rocketMQ 消息重試機制,決定了,如果發(fā)現(xiàn)消息的延時級別 delayTimeLevel 大于 0,會首先將重試主題存入消息的屬性中,然后設置主題名稱為 SCHEDULE_TOPIC ,以便時間到后重新參與消息消費。
  4. 在消費之前,執(zhí)行 hock
  5. 執(zhí)行,我們編寫的消費代碼
  6. 在消費之后,執(zhí)行 hock
  7. 消費完畢后,再次驗證 processQueue 是否被刪除,如果被刪除,不處理結(jié)果。
  8. 對消費者返回的結(jié)果,進行處理
  9. 如果消費成功,那么 ack = consumeRequest.getMsgs().size() - 1。會直接更新消費進度。如果消費失敗,那么 ack = -1,重新發(fā)送消息。如果在重新發(fā)送消息時,又失敗了,那么會延遲 5 秒在繼續(xù)消費。
  10. 不管是消費成功,還是失敗,都會更新消費進度

【消息確認】

客戶端在發(fā)送重試消息時,封裝了 ConsumerSendMsgBackRequestHeader。

  1. // 消息物理偏移量 
  2. private Long offset; 
  3. // 消費組 
  4. private String group
  5. // 延遲等級 
  6. private Integer delayLevel; 
  7. // 消息ID 
  8. private String originMsgId; 
  9. // 消息主題 
  10. private String originTopic; 
  11. // 最大重新消費次數(shù),默認 16 次   SubscriptionGroupConfig.retryMaxTimes 中定義 
  12. private Integer maxReconsumeTimes; 

服務端的接收邏輯

  • 先獲取消費組訂閱配置信息,不存在則直接返回
  • 創(chuàng)建主題:%RETRY% + group,并隨機選擇一個隊列
  • 用原來的消息,創(chuàng)建一個新的消息
  • 如果重試消息的最大重試次數(shù)超過 16 次(默認),則將消息放入 %DLQ% 隊列(死信隊列)。等待人工處理
  • 由 Commitlog.putMessage 存入消息。

小結(jié)

從消息消費者和消費者組的基本概念,到消息消費的流程。我們了解了RocetMQ消息消費的相關原理。消費者客戶端的啟動后,會后臺運行幾個定時任務來處理相關的邏輯。也知道了RocetMQ消息獲取有推拉兩種模式,而且推模式也是建立在拉模式的基礎之上。知道了普通輪詢和長輪詢的區(qū)別,并且了解了長輪詢的實現(xiàn)邏輯。對消息消費和確認流程有了了解。

 

責任編輯:姜華 來源: 小汪哥寫代碼
相關推薦

2021-07-08 05:52:34

Kafka架構主從架構

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構Message

2022-07-07 09:00:49

RocketMQ消費者消息消費

2025-07-08 08:51:45

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構kafka

2022-11-08 07:36:17

RocketMQ消費者消息堆積

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-16 18:44:42

RocketMQ知識

2021-07-07 15:29:52

存儲RocketMQ體系

2024-01-24 09:00:31

SSD訂閱關系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點

2021-07-13 11:52:47

順序消息RocketMQkafka

2023-03-28 07:08:09

RocketMQ消費者堆棧

2015-07-28 17:52:36

IOS知識體系

2017-06-22 13:07:21

2012-03-08 11:13:23

企業(yè)架構

2017-04-03 15:35:13

知識體系架構

2017-02-27 16:42:23

Spark識體系

2021-07-05 06:26:08

生產(chǎn)者kafka架構

2022-05-09 11:15:05

RocketMQPULL 模式PUSH 模式
點贊
收藏

51CTO技術棧公眾號

亚洲天堂中文字幕在线观看| 男人资源在线播放| 五月精品视频| 99久久婷婷国产综合精品电影√| 欧美一区永久视频免费观看| 69国产精品| 99国内精品久久| 丰满女人性猛交| 午夜激情久久| 亚洲免费三区一区二区| 欧美日本一区二区视频在线观看| 久久天天躁狠狠躁夜夜躁2014| 国产羞羞视频在线播放| 欧美日韩国产精品成人| 日本成人一区| 色综合久久99| 日本一区高清| 日韩欧美中文字幕在线观看| 粉嫩粉嫩芽的虎白女18在线视频| 国产欧美一区二区精品仙草咪| 欧美少妇一级片| 久久精品国产77777蜜臀| 蜜桃传媒一区二区| 久久精品一区二区国产| 免费观看国产成人| 日韩精品一二三四| 亚州欧美一区三区三区在线| 裸体一区二区| 一区二区三区视频在线播放| 裸体一区二区三区| 樱空桃在线播放| 国产**成人网毛片九色| 欧美牲交a欧美牲交| 久久精品男人天堂av| 成人亚洲成人影院| 一区二区三区在线观看动漫 | 在线精品自拍| 久久久噜噜噜久久久| y111111国产精品久久久| 欧美激情伊人电影| 精品国产一区二区三区小蝌蚪| 国产成人午夜视频网址| 欧美久久99| 亚洲一区二区三区在线观看视频| 国产一区二区三区综合| 青青在线视频观看| 洋洋av久久久久久久一区| 女人天堂在线| 亚洲成人久久电影| 亚洲视频国产精品| 国产精品视频久久久久| 一区在线播放| www.国产在线视频| 中文字幕不卡的av| 久久国产精品高清一区二区三区| 91精品麻豆日日躁夜夜躁| 欧美xoxoxo| 人妖精品videosex性欧美| 国产精品大片| www.成年人视频| 亚洲韩国精品一区| 久久免费电影| 国产91精品高潮白浆喷水| 精品99视频| 日韩av黄色网址| 色综合一个色综合亚洲| 美女日韩欧美| 热99精品里视频精品| 噜噜噜在线观看免费视频日韩| 国产中文字幕二区| 欧美日韩激情网| 日韩av中字| 国产精品嫩草影院久久久| 视频一区二区三区在线| 波多野结衣在线中文| 欧美mv和日韩mv的网站| 国产极品模特精品一二| 色女人综合av| 亚洲成人av电影在线| 日韩毛片在线| 国产精品久久久久久久天堂第1集| 国产一区二区调教| 亚洲图区欧美| 欧美激情二区三区| 精品一区二区三区日韩| 青青青草原在线| 国内外成人免费激情在线视频网站| 在线电影av不卡网址| 黄色免费看片| 日韩av在线免播放器| 日本久久成人网| 日韩在线三区| 亚洲超碰精品一区二区| 欧美黄色a视频| 欧美成人免费在线| 18涩涩午夜精品.www| 国产欧美视频在线| 在线不卡国产精品| 午夜日本精品| sihu成人| 精品国模在线视频| 女人让男人操自己视频在线观看 | 亚洲成人av中文| 快播电影网址老女人久久| 免费观看成人在线| 色婷婷亚洲婷婷| 永久免费观看精品视频| 亚洲韩国在线| 亚洲一线二线三线视频| 一本久久青青| 日韩一区av| 精品久久一二三| 成人美女av在线直播| 欧美顶级少妇做爰| 国产麻豆精品视频| 欧美成人自拍| 亚洲第一影院| 成人欧美一区二区三区在线湿哒哒| 精品一区三区| 欧洲美女和动交zoz0z| 亚洲成人精品av| 日韩综合小视频| 精品视频一二区| 另类专区欧美制服同性| 欧美黄污视频| 久久女同性恋中文字幕| 蜜桃视频免费网站| 亚洲蜜臀av乱码久久精品 | 91麻豆精品国产自产在线观看一区| 国产综合av| 成人免费视频网站在线看| 成人在线播放网站| 亚洲妇熟xx妇色黄| 老司机精品视频导航| 亚洲片区在线| 96av在线| 激情小说亚洲| 欧美一级欧美一级| 亚洲美女免费精品视频在线观看| 日日摸夜夜添夜夜添精品视频| 亚洲1卡2卡3卡4卡乱码精品| 国产激情一区二区三区在线观看| 精品久久久久久亚洲精品| 日韩精品久久久久久久电影99爱| 免费白浆视频| 国产精品视频最多的网站| 一区二区三区成人| 波多野结衣在线观看一区二区| 超碰在线人人| 国产欧美 在线欧美| 亚洲成人av电影| 亚洲视频一区| 欧美人体视频xxxxx| 国产精品久久成人免费观看| 亚洲免费视频在线观看| av在线不卡观看免费观看| 日韩av综合| 黄色春季福利在线看| 成人做爽爽免费视频| 欧美视频日韩视频| 日韩国产在线一| 欧美性aaa| 国产成免费视频| 国产精品久久波多野结衣| 欧美老年两性高潮| 国产馆精品极品| 日韩aaa久久蜜桃av| 色网站在线免费观看| 日韩亚洲欧美精品| 自拍亚洲一区欧美另类| 国产精品电影院| 亚洲欧美亚洲| gogo久久| 成人拍拍拍在线观看| 91免费电影网站| 亚洲国产婷婷香蕉久久久久久| 久久久久久久性| 欧美va天堂| 日韩成人影音| 久热久精久品这里在线观看| 欧美精品国产精品久久久 | 91久久精品网| 国产很黄免费观看久久| 亚洲三级网页| 免费不卡av| 成人www视频网站免费观看| 国产精品久久久久久久久久久久午夜片 | 国产精品久久久久久久久免费丝袜| 婷婷丁香综合| 日本成人三级电影| 爽爽免费视频| 欧美日韩中文字幕在线播放| 国产精品高清在线观看| 日韩精品免费在线视频观看| 亚洲一区二区影院| 国产白丝网站精品污在线入口| 一区二区三区午夜探花| 亚洲老司机网| 成人福利网站| 360天大佬第二季在线观看|