国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

一文帶你理解 RocketMQ 廣播模式實現機制

開發 架構
本文主要講解了 RocketMQ 廣播消息的實現機制,理解廣播消息。

大家好,我是君哥。今天聊聊 RocketMQ 的廣播消息實現機制。

RocketMQ 有兩種消費模式,集群模式和廣播模式。

集群模式是指 RocketMQ 中的一條消息只能被同一個消費者組中的一個消費者消費。如下圖,Producer 向 TopicTest 這個 Topic 并發寫入 3 條新消息,分別被分配到了 MessageQueue1~MessageQueue3 這 3 個隊列,然后 Group 中的三個 Consumer 分別消費了一條消息:

圖片

廣播模式是  RocketMQ 中的消息會被消費組中的每個消費者都消費一次,如下圖:

圖片

使用 RocketMQ 的廣播模式時,需要在消費端進行定義,下面是一段官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}

從代碼中可以看到,在定義 Consumer 時,通過 messageModel 這個屬性指定消費模式,這里指定為 BROADCASTING,也就啟動了廣播模式的消費者。

1、消費者啟動

以 RocketMQ 推模式為例,看一下消費者調用關系類圖:

圖片

DefaultMQPushConsumer 作為啟動入口類,它的 start 方法調用了 DefaultMQPushConsumerImpl 類的 start 方法,下面重點看一下這個方法。

(1)拷貝訂閱關系

start 方法中調用了 copySubscription 方法,代碼如下:

private void copySubscription() throws MQClientException {
try {
//拷貝訂閱關系
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

這里的代碼有一點需要注意:集群模式會創建一個重試 Topic 的訂閱關系,而廣播模式是不會創建這個訂閱關系的。也就是說廣播模式不考慮重試。

(2)初始化偏移量

下面是初始化 offset 的代碼:

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}

從上面的代碼可以看到,廣播模式使用了 LocalFileOffsetStore,也就是說偏移量保存在客戶端本地,除了在內存中會保存,在本地文件中也會保存。

2、消息拉取

ConsumeMessageService 是真正拉取消息的地方,消費者初始化時會初始化 ConsumeMessageService,并且這里會區分并發消息還是順序消息。

(1)順序消息

在集群模式下,需要獲取到 processQueue 的鎖才會拉取消息,而在廣播模式下,不用獲取鎖,直接就可以拉取消息。判斷邏輯如下:

//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
}
}

這里有個疑問,對于順序消息,獲取鎖是必須的,這樣才能保證一個 processQueue 只能由一個線程進行處理,從而保證消費的順序性。那對于廣播模式,為什么不用獲取 processQueue 的鎖呢?難道廣播模式不支持順序消息?

(2)并發消息

對于并發消息,廣播模式不同的是,對消費結果的處理。集群模式消費失敗后需要把消息發送回 Broker 等待再次被拉取,而廣播模式則不需要重試。代碼如下:

//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

這再次說明,廣播模式是不支持消息重試的。

3、重平衡

在消費者啟動過程中,會調用 RebalanceService 的 start 方法,進行重平衡。從重平衡的代碼中可以看到,廣播模式消費者會消費所有 MessageQueue,而集群模式下會根據負載均衡策略選擇其中幾個 MessageQueue。代碼如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
//省略部分邏輯
} else {
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//省略部分邏輯
if (mqSet != null && cidAll != null) {
//省略部分邏輯
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//省略部分邏輯
}
break;
}
default:
break;
}
}

上面 updateProcessQueueTableInRebalance 這個方法調用前,要獲取到需要消費的 MessageQueue 集合。廣播模式下,直接取了訂閱的 Topic 下的所有集合元素,而集群模式下,則需要通過負責均衡獲取當前消費者自己要消費的 MessageQueue 集合。

4、總結

本文主要講解了 RocketMQ 廣播消息的實現機制,理解廣播消息,要把握下面幾點:

1.偏移量保存在消費者本地內存和文件中。

2.廣播消息不支持重試。

3.從源碼上看,廣播模式并不能支持順序消息。

4.廣播模式消費者訂閱了 Topic 下的所有 MessageQueue,不會重平衡。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2019-10-11 08:41:35

JVM虛擬機語言

2022-06-27 11:04:24

RocketMQ順序消息

2021-09-02 12:07:48

Swift 監聽系統Promise

2024-10-16 10:11:52

2023-07-17 10:45:03

向量數據庫NumPy

2021-09-08 17:42:45

JVM內存模型

2020-03-18 13:40:03

Spring事數據庫代碼

2022-03-18 13:58:00

RocketMQ消息隊列

2020-11-17 09:32:57

設計模式責任鏈

2023-07-31 08:18:50

Docker參數容器

2023-11-06 08:16:19

APM系統運維

2021-05-29 10:11:00

Kafa數據業務

2022-11-11 19:09:13

架構

2023-11-20 08:18:49

Netty服務器

2022-12-20 07:39:46

2023-12-21 17:11:21

Containerd管理工具命令行

2022-05-11 07:38:45

SpringWebFlux

2023-12-26 08:08:02

Spring事務MySQL

2022-06-13 11:05:35

RocketMQ消費者線程

2020-05-14 13:39:19

Java 垃圾回收機制
點贊
收藏

51CTO技術棧公眾號

九色精品蝌蚪| 国产清纯美女被跳蛋高潮一区二区久久w| 国产尤物99| 成人av电影在线网| 午夜视频在线观看韩国| 久久最新资源网| 国产成人一区二区精品非洲| 男人添女人下部高潮视频在线观看 | 自拍偷拍欧美日韩| 黄网站色视频免费观看| 亚洲国产成人久久| 久久99精品一区二区三区三区| 视频三区在线观看| 91在线视频免费| 亚洲一级二级三级| 日韩高清在线免费观看| free性欧美1819hd| 国产精品久久久久秋霞鲁丝| 懂色av色香蕉一区二区蜜桃| 国产欧美一区二区精品仙草咪| 久久久久久久久久久免费视频| 欧美午夜精品久久久| 中文一区二区三区四区| 亚洲最大福利网| 狠狠干狠狠久久| 成人羞羞网站| 国产视频在线播放| 韩国成人av| 在线免费观看羞羞视频一区二区| 99re国产精品| 九色porny丨国产首页在线| 国产精品国模大尺度私拍| 成人激情午夜影院| 久久丁香四色| 青青草娱乐在线| 久久精品一区二区三区不卡免费视频| 欧美日韩国产精品| 美女脱光内衣内裤视频久久网站| 漫画在线观看av| 免费看av大片| 亚洲一区二区三区欧美| 久久久久久久久久av| 在线免费不卡视频| 亚洲激情网站免费观看| 黄色网址视频在线观看| 久久福利网址导航| 蜜乳av另类精品一区二区| 国产三级一区| 综合激情一区| 精品无人区一区二区三区竹菊| 国产精品毛片无遮挡高清| 欧美人妖在线| 91网在线播放| 亚洲va韩国va欧美va精四季| 日韩手机在线导航| 成人中文字幕电影| 欧美中文字幕一区二区| 日韩在线资源| 男女啪啪网站视频| 国产免费一区二区三区在线能观看| 日韩视频一区二区| 久久久久国产一区二区三区四区 | 日韩二区三区在线观看| 精品176二区| 一区视频二区视频| 18久久久久久| 91精品办公室少妇高潮对白| 成人黄色在线看| 亚洲欧美亚洲| 国产中文在线播放| 秋霞福利视频| 九九久久九九久久| 国产精品区免费视频| 久久精品免费播放| 在线播放视频一区| 欧美日韩在线视频一区二区| www国产成人免费观看视频 深夜成人网| 仙踪林久久久久久久999| 97视频精彩视频在线观看| 牛夜精品久久久久久久| 欧美视频免费看欧美视频| 国产免费一区二区三区| 国产精品黄页免费高清在线观看| 欧美三级乱人伦电影| 亚洲天堂免费在线观看视频| 99久久99久久精品免费看蜜桃| 国产精品久久久亚洲一区| 唐人社导航福利精品| 国产三级在线播放| 97dyy97影院理论片在线| 欧美高清一区二区| 国产精品久久久久久中文字| 色综久久综合桃花网| 中文字幕精品—区二区| 国产亚洲精品久久久久久| 久久国产精品第一页| 欧美一二区在线观看| 精品国产一区二区三区久久久蜜臀| 91麻豆精品国产91久久久更新资源速度超快| 成年在线电影| 欧美精品少妇| www免费视频观看在线| 午夜dj在线观看高清视频完整版| 91一区二区三区在线| 男人插曲女人视频免费| 奇米影音第四色| 国产精品97在线| 日韩欧美国产片| 九七影院理伦片| 在线免费激情视频| 有码在线播放| √最新版天堂资源网在线| 欧美亚洲黄色| xxxxx91麻豆| 日本在线免费观看一区| 精品国产一区二区三区麻豆小说| 成人两性免费视频| 亚洲欧美久久久久一区二区三区| 黄瓜视频免费观看在线观看www| 韩日视频在线观看| 粗大的内捧猛烈进出在线视频| 久久国产精品高清一区二区三区| 青青草免费观看免费视频在线| 牛牛精品在线| 国产一区二区三区探花 | 成人99免费视频| 亚洲午夜久久久久中文字幕久| 精品粉嫩超白一线天av| xxxxx91麻豆| 午夜精品久久久久久久99热 | 欧美日韩亚洲丝袜制服| 久久久欧美一区二区| 亚洲成人精品电影在线观看| 加勒比日本影视| 国产在线视频你懂得| 成人免费福利| 外国成人免费视频| 久久久夜色精品亚洲| 亚洲另类图片色| 国产精品免费视频一区二区| 成人嫩草影院免费观看| 欧美黄页在线免费观看| 国产呦精品一区二区三区网站| 欧美羞羞免费网站| 国产日韩中文字幕| 日本免费高清视频| 欧美日韩一区二区三区四区不卡| 不卡一区二区在线| 亚洲国产欧美自拍| 国产精品免费久久久久久| 国产羞羞视频| 这里视频有精品| 亚洲国产成人在线| 亚洲精品国偷自产在线99热| 日产国产精品精品a∨| 91精品无人成人www| 秋霞午夜在线观看| 亚洲最新av| 欧美喷潮久久久xxxxx| 99www免费人成精品| 国产亚洲精aa在线看| 亚洲一区影音先锋| 免费看av大片| 久久精品电影一区二区| 久久精品日产第一区二区三区| 成人在线观看网站| 中文天堂在线一区| 国产69精品久久久久孕妇| 亚洲人成在线播放| 成人va天堂| 性视频一区二区三区| av色综合久久天堂av色综合在| 天堂久久一区| 国产一区二区三区观看| 一本色道久久综合亚洲aⅴ蜜桃 | 亚洲天天在线日亚洲洲精| 亚洲精品一区在线观看香蕉| 青青久久aⅴ北条麻妃| av女优在线播放| 亚洲精品久久久| 无颜之月在线看| 夜夜精品浪潮av一区二区三区 | 亚洲一二三区不卡| 日韩av最新在线观看| 成人av片网址| 蜜臀精品一区二区三区在线观看| 99国产精品一区| 日韩精品在线影院| 成人免费淫片aa视频免费| 亚洲最新中文字幕| 久久中文字幕国产| 亚洲天堂网一区| 国产成人免费av一区二区午夜| 国产一区中文字幕| 麻豆免费网站| 96视频在线观看欧美| 国产精品亚洲午夜一区二区三区| 这里只有精品视频在线观看| 精品捆绑美女sm三区| 不卡av在线网|