国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

消息隊列批量收發消息,請避開這五個坑!

開發 架構
使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實也會存在一些問題,使用的時候要結合業務場景避開這些坑。

大家好,我是君哥。

使用消息隊列時,為了提高生產和消費的性能,有時會開啟批量處理。

在生產端,生產者發送的消息先發送到一個消息列表,積累到一定的消息量之后再批量發送給 Broker,如下圖:

在消費端,消費者拉取消息后先不立即處理,而是把消息轉存到一個內存隊列或數據庫,由業務線程去處理,如下圖:

無論是生產者做批量發送,還是消費者做批量處理,都需要考慮使用批量消息的業務場景,避免踩坑。下面看一下批量操作可能會遇到哪些坑。

批量大小

當生產者采用批量發送的方式來提高發送性能時,一定要考慮發送消息的批量大小。下面是 RocketMQ 批量發送的官方示例:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

RocketMQ 默認消息大小是 4M,由 maxMessageSize 參數控制,如果批量消息大小超過 maxMessageSize,則會拋出異常。

如果遇到消息大小超過 maxMessageSize 的情況時,可以用下面方法進行處理:

  • 把這個參數改大,但需要考慮 Broker 的性能和網絡帶寬;
  • 將消息進行拆分后分批發送;
  • 對消息進行壓縮處理。

RabbitMQ 相關的 API 則提供了更加靈活的批量控制,對消息數量和消息大小都做了控制,下面看一下源碼:

冪等

消費端可以批量拉取消息進行消費,這樣可以減少拉取消息時的 RPC 次數,提升消費性能。比如在 RocketMQ 中,可以通過 Consumer 中的 pullBatchSize 來設置一次拉取的消息數量,通過 consumeMessageBatchMaxSize 參數來設置一次消費的消息數量。

但需要注意的是,如果批量消息中一條消息消費失敗了,這一批消息都需要進行重試,已經消費成功的消息會被重復消費,帶來業務問題。

為了不對業務造成影響,必須考慮冪等。一個簡單的方法是在消息中增加全局唯一 id 屬性,對消息消費結果進行記錄,消費成功后保存 id。這樣在消費消息之前先查詢是否存在消費成功的記錄,如果存在則直接返回處理成功。

時延

在使用消息隊列進行批量操作時,必須要考慮到時延問題。比如我們設置一個批次 100 條消息,積累夠 100 條消息后再發送,在消息量小的情況下,可能積累夠 100 條消息會很長時間,導致消費端拉取到一條消息時延很大。

雖然消息隊列的一個重要作用是削峰填谷,但在一些場景下,對消息的實時性也有要求。比如在車聯網的充電場景,車聯網平臺需要實時感知充電樁的狀態,如果充電樁積累夠一批消息再上報平臺,平臺獲取到的狀態會不準確,如果心跳消息延時太久,平臺會認為充電樁離線。

對于有時延要求又需要批量操作的場景,可以設置一個超時時間,超時后即使消息數量不夠,也會發送出去。看下 RabbitMQ 的處理:

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
  throws AmqpException {
 if (correlationData != null) {
  //...
  super.send(exchange, routingKey, message, correlationData);
 }
 else {
  if (this.scheduledTask != null) {
   this.scheduledTask.cancel(false);
  }
  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);
  if (batch != null) {
   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
  }
  //這里獲取到超時時間,到達超時時間后使用定時器將消息發送出去
  Date next = this.batchingStrategy.nextRelease();
  if (next != null) {
   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);
  }
 }
}

可靠性

使用批處理一定要考慮可靠性的問題。

在消費端,消費者批量拉取一批消息后把消息暫存到一個內存臨時隊列,然后多線程去臨時隊列消費消息,如果服務宕機,臨時隊列中的消息會丟失。

為了避免宕機引發的損失,可以拉取一批消息后保存到數據庫,然后給 Broker 返回 ACK,之后業務代碼去數據庫查詢消息并消費,不過要考慮數據庫大事務、鎖競爭等問題。

當然,對于一些消息丟失不敏感的場景,比如日志收集之類的,可靠性這個指標是不用太關注的。

特殊場景

因為批量消息有一些復雜性,消息隊列的部分特性不支持。

事務消息

批量消息會增加消息重試的難度,所以對于事務消息,建議使用單條消息,一條消息對應一個事務。

順序消息

順序消息的實現思路一般是生產者將消息發送到同一個分區,消費者綁定這個分區并使用單線程消費這個分區的消息。如果對同一個 Topic 下的同一個分區來實現批量發送,難度會增大。所以建議順序消息使用單條消息進行發送。

延時消息

如果延時消息使用批量進行發送,這一批消息的延時時間必須相同,同時要考慮批量消息的超時時間,超時時間太大會影響延時時間的準確性,生產端實現復雜度大大增加。

總結

使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實也會存在一些問題,使用的時候要結合業務場景避開這些坑。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2022-07-26 20:00:35

場景RabbitMQMQ

2020-09-14 11:50:21

SpringBootRabbitMQJava

2017-07-28 09:30:55

2017-10-11 15:08:28

消息隊列常見

2025-03-28 10:06:01

架構輪詢延時

2022-08-22 08:45:57

Kafka網絡層源碼實現

2023-09-26 08:20:12

消息隊列RabbitMQ

2020-10-09 15:00:56

實時消息編程語言

2015-08-12 10:10:21

2025-03-28 12:20:00

代碼C#異步編程

2020-10-10 12:46:17

編程指南誤區

2016-08-24 15:43:01

2019-07-19 07:56:13

消息隊列消息代理消息中間件

2019-11-19 08:35:09

數據數據準備自動化

2017-02-27 14:25:50

Java隊列Web

2022-04-12 11:15:31

Redis消息隊列數據庫

2009-12-07 09:23:05

2010-04-21 12:39:48

Unix 消息隊列

2012-09-24 11:48:05

IBMdw

2010-04-13 17:00:43

Unix消息隊列
點贊
收藏

51CTO技術棧公眾號

欧美大片大片在线播放| 日韩精品一区二区三区中文字幕| 欧美色另类天堂2015| 中文在线www| 91麻豆精品国产无毒不卡在线观看 | 免费观看国产精品视频| 国产一区二区三区在线观看免费视频 | 在线激情影院一区| 伊人久久大香线蕉综合网蜜芽| 91精品黄色| 精品亚洲免费视频| 99热在线免费观看| 91精品免费在线观看| 欧美综合影院| 51国产成人精品午夜福中文下载| 久久99国产精品久久| 污视频网站免费| 欧美一区二区三区四区久久| av日韩在线免费观看| av一本久道久久波多野结衣| 国产高清亚洲一区| 日本中文字幕视频| 亚洲国产欧美一区| 国产伦精品一区二区三区千人斩 | 欧美精品福利在线| 伊人久久成人| 成年网站在线免费观看| 欧美三级一区二区| 亚洲一区 二区| 日韩免费一区二区三区| 亚洲乱码中文字幕| 范冰冰一级做a爰片久久毛片| 国产精品欧美在线| av在线播放不卡| 免费黄色在线| 日本不卡高字幕在线2019| 精品一区二区免费视频| 色视频www在线播放| 亚洲男人天堂九九视频| 国产精品久久观看| 国产天堂在线播放| 日韩av一区二区在线| 欧美成人午夜| 午夜激情av在线| 精品一区二区电影| 亚洲一级一区| 人人在线97| 欧美不卡视频一区发布| 美女视频一区在线观看| 国产成人天天5g影院在线观看| 欧美国产第二页| 国产精品亚洲成人| 超碰porn在线| 成人做爽爽免费视频| 国产亚洲va综合人人澡精品| 亚洲风情在线资源站| 欧美大片高清| 六月婷婷久久| 黄色精品在线看| 黄色成人美女网站| 妞干网在线观看视频| 亚洲成色www8888| 亚洲高清在线| 中午字幕在线观看| 日韩美女在线看| 国产精品你懂的在线| 91tv亚洲精品香蕉国产一区| 日本不卡一区| 色婷婷综合久久久中文一区二区 | 久久久精品动漫| 偷偷要91色婷婷| 欧美三级自拍| 美女网站色免费| 欧美成年人视频| 99久久综合国产精品| 成人直播视频| 亚洲第一精品区| 精品国产区一区| 日韩电影在线一区二区三区| 黄色av免费在线| 久久亚洲综合网| 911国产精品| 美女久久一区| 欧美日韩xx| 蜜桃导航-精品导航| 3751色影院一区二区三区| 国产一区白浆| 天堂成人av| 中文字幕一区二区三区在线乱码| 91精品国产品国语在线不卡| 日韩网站在线| 青青草原国产在线| 日韩国产一区久久| 亚洲另类xxxx| 成人福利在线看| 日韩在线你懂得| 日韩大片一区二区| 国产v综合ⅴ日韩v欧美大片| 亚洲亚洲精品在线观看| 亚洲精品成人无限看| 91在线看片| 亚洲一区三区| 精品国内亚洲在观看18黄| 久久久精品黄色| 婷婷精品在线| 欧美新色视频| 日韩中文字幕一区| 亚洲最新在线视频| 国产精品蜜臀在线观看| 成人三级视频| 国内精品久久久久国产| 在线播放 亚洲| 欧美成人在线免费| 亚洲一区二区三区爽爽爽爽爽| 亚洲精品一区二区在线看| 黄色网页在线免费看| 一区中文字幕在线观看| 久久视频在线看| 亚洲高清在线视频| 9国产精品视频| 精品日韩视频| 成年人福利视频| 精品日韩美女| 色偷偷av一区二区三区| 亚洲人吸女人奶水| 亚洲一卡久久| 色成人综合网| 麻豆传媒在线视频| 蜜桃传媒视频第一区入口在线看| 亚洲精品久久久久久久久久久久| 久久美女高清视频| 欧美国产三区| 自拍偷自拍亚洲精品被多人伦好爽| 色婷婷综合久久久久中文字幕 | 精品福利视频一区二区三区| 国产iv一区二区三区| 在线亚洲a色| v天堂福利视频在线观看| 福利视频一二区| 国产女人18毛片水18精品| 日韩色在线观看| 国产欧美一区二区在线| 欧美三级网页| 四虎国产精品免费久久| 日韩电影网址| 久久艹国产精品| 国产精品毛片a∨一区二区三区|国| 欧美一级二级三级乱码| 国产女人水真多18毛片18精品视频| 欧美日本国产| 视频在线观看免费影院欧美meiju| 成人亚洲综合天堂| 中文字幕在线观看第三页| 精品无人区一区二区三区竹菊| 欧美精品少妇videofree| 欧美色网一区二区| 国产人成亚洲第一网站在线播放| 日韩午夜电影| 亚洲视频分类| 伊人久久国产| 欧洲成人av| 黄色片在线免费| 欧美一区二区视频在线| 欧美一级大胆视频| 亚洲欧美中文日韩在线| 精品女厕一区二区三区| 26uuu亚洲婷婷狠狠天堂| 亚洲免费成人| 蜜桃tv一区二区三区| 蜜臀国产一区| 日中文字幕在线| 蜜桃免费在线视频| 中文字幕一区综合| 成人欧美在线视频| 欧美精品午夜视频| 亚洲国产成人av在线| 高跟丝袜一区二区三区| 国产欧美日韩精品a在线观看| 久久亚洲欧洲| 亚洲精品小说| 欧美三级午夜理伦三级在线观看 | 黄频免费在线观看| 欧美一区二区视频| 天堂一区在线观看| 欧美 日韩 国产精品| 国产成人精品日本亚洲11 | 国产成人在线一区| 久久精品一区中文字幕| 精品国产亚洲一区二区三区在线观看| 天天综合色天天综合| 日本一区二区三区四区| 国产乱子伦视频一区二区三区 | japanese色系久久精品| 女海盗2成人h版中文字幕| 久草电影在线| 濑亚美莉vs黑人在线观看| 午夜肉伦伦影院| 免费观看中文字幕| 日韩成人av影视| 国产99久久精品一区二区永久免费|