国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

阿里二面:RocketMQ 消費(fèi)者拉取一批消息,其中部分消費(fèi)失敗了,偏移量怎樣更新?

人工智能 機(jī)器學(xué)習(xí)
如果一批消息按照順序消費(fèi),是不可能出現(xiàn)第 100 條消息消費(fèi)成功了,但第 50 條消費(fèi)失敗的情況,因?yàn)榈?50 條消息失敗的時(shí)候,應(yīng)該退出循環(huán),不再繼續(xù)進(jìn)行消費(fèi)。

大家好,我是君哥。

最近有讀者參加面試時(shí)被問了一個(gè)問題,如果消費(fèi)者拉取了一批消息,比如 100 條,第 100 條消息消費(fèi)成功了,但是第 50 條消費(fèi)失敗,偏移量會(huì)怎樣更新?就著這個(gè)問題,今天來聊一下,如果一批消息有消費(fèi)失敗的情況時(shí),偏移量怎么保存。

1 拉取消息

1.1 封裝拉取請(qǐng)求

以 RocketMQ 推模式為例,RocketMQ 消費(fèi)者啟動(dòng)代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

上面的 DefaultMQPushConsumer 是一個(gè)推模式的消費(fèi)者,啟動(dòng)方法是 start。消費(fèi)者啟動(dòng)后會(huì)觸發(fā)重平衡線程(RebalanceService),這個(gè)線程的任務(wù)是在死循環(huán)中不停地進(jìn)行重平衡,最終封裝拉取消息的請(qǐng)求到 pullRequestQueue。這個(gè)過程涉及到的 UML 類圖如下:

圖片

1.2 處理拉取請(qǐng)求

封裝好拉取消息的請(qǐng)求 PullRequest 后,RocketMQ 就會(huì)不停地從 pullRequestQueue 獲取消息拉取請(qǐng)求進(jìn)行處理。UML 類圖如下:

圖片

拉取消息的入口方法是一個(gè)死循環(huán),代碼如下:

//PullMessageService
public void run(){
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

這里拉取到消息后,提交給 PullCallback 這個(gè)回調(diào)函數(shù)進(jìn)行處理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封裝到 ConsumeRequest 這個(gè)線程類來處理。把代碼精簡(jiǎn)后,ConsumeRequest 處理邏輯如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
try {
//1.執(zhí)行消費(fèi)邏輯,這里的邏輯是在文章開頭的代碼中定義的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
if (!processQueue.isDropped()) {
//2.處理消費(fèi)結(jié)果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

2 處理消費(fèi)結(jié)果

2.1 并發(fā)消息

并發(fā)消息處理消費(fèi)結(jié)果的代碼做精簡(jiǎn)后如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
}
break;
default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

從上面的代碼可以看出,如果處理消息的邏輯是串行的,比如文章開頭的代碼使用 for 循環(huán)來處理消息,那如果在某一條消息處理失敗了,直接退出循環(huán),給 ConsumeConcurrentlyContext 的 ackIndex 變量賦值為消息列表中失敗消息的位置,這樣這條失敗消息后面的消息就不再處理了,發(fā)送給 Broker 等待重新拉取。代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
for (int i = 0; i < msgs.size(); i++) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
context.setAckIndex(i);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

消費(fèi)成功的消息則從 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要發(fā)送消息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果處理消息的邏輯是并行的,處理消息失敗后給 ackIndex 賦值是沒有意義的,因?yàn)榭赡苡卸鄺l消息失敗,給 ackIndex 變量賦值并不準(zhǔn)確。最好的方法就是給 ackIndex 賦值 0,整批消息全部重新消費(fèi),這樣又可能帶來冥等問題。

2.2 順序消息

對(duì)于順序消息,從 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量時(shí),是從 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 總結(jié)

回到開頭的問題,如果一批消息按照順序消費(fèi),是不可能出現(xiàn)第 100 條消息消費(fèi)成功了,但第 50 條消費(fèi)失敗的情況,因?yàn)榈?50 條消息失敗的時(shí)候,應(yīng)該退出循環(huán),不再繼續(xù)進(jìn)行消費(fèi)。

如果是并發(fā)消費(fèi),如果出現(xiàn)了這種情況,建議是整批消息全部重新消費(fèi),也就是給 ackIndex 賦值 0,這樣必須考慮冥等問題。

責(zé)任編輯:武曉燕 來源: 君哥聊技術(shù)
相關(guān)推薦

2022-03-14 11:05:01

RocketMQRedis緩存

2022-06-02 10:54:16

BrokerRocketMQ

2023-03-14 08:45:25

RocketMQ消息消費(fèi)

2022-08-15 10:45:34

RocketMQ消息隊(duì)列

2021-12-17 08:17:00

RocketMQ數(shù)據(jù)結(jié)構(gòu)消息中間件

2024-01-24 09:00:31

SSD訂閱關(guān)系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點(diǎn)

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2022-11-08 07:36:17

RocketMQ消費(fèi)者消息堆積

2009-04-15 11:17:23

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2011-08-05 16:21:24

2011-07-22 16:25:38

CA TechnoloIT消費(fèi)化

2023-06-01 08:08:38

kafka消費(fèi)者分區(qū)策略

2024-03-14 11:58:43

2021-04-20 08:32:51

消息MQ隊(duì)列

2025-02-26 07:53:21

2015-08-26 09:39:30

java消費(fèi)者

2022-05-09 11:15:05

RocketMQPULL 模式PUSH 模式

2021-03-01 07:31:53

消息支付高可用
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

亚洲va电影大全| 亚洲妇女屁股眼交7| 亚洲制服丝袜av| 电影中文字幕一区二区| 2020国产精品久久精品美国| 99在线热播| 欧美2区3区4区| 欧美成人aa大片| 免费观看v片在线观看| 99久久亚洲一区二区三区青草| 国产午夜精品一区| 国产综合久久久| 欧美激情高清视频| 悠悠资源网亚洲青| 欧美夫妻性生活| 91精品久久久久久9s密挑| 97精品久久久久中文字幕| 欧美午夜精品久久久久免费视 | 亚州一区二区| 日韩精品黄色网| 在线视频婷婷| 欧美日韩美女视频| jizz欧美性11| 97久久超碰国产精品电影| 亚洲欧美日韩另类精品一区二区三区| 成人毛片免费看| 欧美黄色成人网| 日韩成人综合网站| 日韩禁在线播放| 黄页网站大全在线免费观看| 色哟哟日韩精品| 500福利第一精品导航| 亚洲国产精品精华液2区45| 91免费黄视频| 成人不卡免费av| 免费不卡av在线| 国产a久久麻豆| 国产www免费| 99精品视频一区二区| 国产白丝袜美女久久久久| 成人激情小说乱人伦| 国产成年人在线观看| 日本欧美一区二区三区乱码 | 久久精品国产福利| 亚洲视频axxx| 91久久久久久白丝白浆欲热蜜臀| 亚洲激情免费观看| 日韩欧美精品电影| 最近更新的2019中文字幕| 国产精品毛片久久久久久久久久99999999| 91精品国产手机| 国产精品亚洲美女av网站| 国产精品你懂的在线欣赏| 91亚洲免费视频| 久久久久久黄色| 亚洲男人天堂九九视频| 五月天av在线| 久久久国产一区二区三区| 一区二区精彩视频| 欧美亚洲免费电影| 色综合天天爱| 久久精品二区| 国内精品伊人久久久久av影院| 免费在线看黄色片| 国产精品国产精品国产专区不蜜| 久久国产情侣| 欧美精品久久久久久久久老牛影院| 香蕉视频在线免费看| 亚洲男人av电影| **爰片久久毛片| 亚洲自拍偷拍网址| 日韩精品视频网站| 成人综合视频在线| 亚洲地区一二三色| 青春草在线免费视频| 色偷偷噜噜噜亚洲男人的天堂| 亚洲香蕉视频| 品久久久久久久久久96高清| 福利一区在线观看| 一二三四中文在线| 日韩女优毛片在线| 国产精品国产三级在线观看| 国内精品免费午夜毛片| 欧美大片专区| 黄色激情在线视频| 狠狠操狠狠色综合网| 一个人www视频在线免费观看| 国模gogo一区二区大胆私拍 | 91精品国自产在线观看| 日本视频免费一区| 97影视在线观看| 欧美一区二区三区在线看| 综合久草视频| 久久偷看各类wc女厕嘘嘘偷窃| 91小视频免费观看| 免费在线观看av| 国内精品免费午夜毛片| 亚洲无线视频| 日韩一级在线免费观看| 欧美日韩一区二区不卡| 91久久爱成人| 精品国产一区二区亚洲人成毛片| 亚洲一区自拍偷拍| www.中文字幕久久久| 中文字幕亚洲综合久久筱田步美 | www欧美xxxx| 日本乱人伦a精品| 日本欧美韩国一区三区| 国产激情三区| 国产视频久久网| 久久久影院免费| 国产黄色一级网站| 欧美精品777| 国产一区二区在线| 久久精品国产精品亚洲色婷婷| 欧美精品一级二级| 一呦二呦三呦国产精品| 午夜久久久久久久久久久| 欧美性猛交xxxx乱大交| 91成人在线精品视频| 中文字幕久久一区| 欧美少妇性性性| 成人午夜av| 成人av小说网| 久久精品亚洲精品| 黄一区二区三区| h网站久久久| 97影院在线午夜| 最新欧美精品一区二区三区| 免费v片在线观看| 国产精品三区四区| 亚洲一二三区在线观看| 日韩高清二区| 1024av视频| 亚洲网站在线播放| 捆绑紧缚一区二区三区视频 | 69中国xxxxxxxxx69| 一区二区三区国产视频| 日韩高清欧美激情| 9i精品一二三区| 亚洲综合日韩中文字幕v在线| 亚洲女人的天堂| 国产精品美女在线观看直播| 69sex久久精品国产麻豆| 亚洲黄色av网站| 日韩精品乱码免费| 草莓福利社区在线| 欧美主播一区二区三区美女 久久精品人 | 黑人巨大精品| 日韩资源av在线| 精品精品国产高清a毛片牛牛| 激情综合自拍| gogogo高清在线观看免费完整版| 成人网在线免费观看| 狠狠操狠狠色综合网| 99久久精品费精品国产风间由美| 嫩草影院网站在线| 亚洲成av人片一区二区梦乃| 日韩三级中文字幕| 一区二区中文字幕| 色综合久久中文综合久久97| 成人国产精品视频| 椎名由奈jux491在线播放 | 精品国产一区二区三区免费| 精品高清一区二区三区| 欧美日韩伦理| 天堂男人av| 96精品久久久久中文字幕| 欧美小视频在线观看| 欧美午夜一区| 性爱视频在线播放| 免费观看国产视频在线| 日韩在线视频观看| 国产精品女主播av| 欧洲激情综合| 国产黄在线播放| 亚洲成人网上| 日韩亚洲第一页| 国产精品成人免费| 91视频综合| 国产传媒在线播放| 99久re热视频精品98| 欧美成人国产va精品日本一级| 国产精品欧美精品| 中文字幕一区二区三区久久网站| 成人资源www网在线最新版| 一本久道久久综合狠狠爱亚洲精品| 亚洲精品一区中文| 中日韩av电影| 久久国产影院| 国产区美女在线| 成人性做爰aaa片免费看不忠| 国产成人av网| 欧美精品色一区二区三区| 国产成人自拍网| 竹菊久久久久久久| 国产在线观看91| 国产一二三四在线视频| 成人黄色免费网站在线观看| 亚洲第一福利网站|