国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Springboot + Rabbitmq 用了消息確認(rèn)機制,感覺掉坑里了

開發(fā) 前端
最近部門號召大伙多組織一些技術(shù)分享會,說是要活躍公司的技術(shù)氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術(shù)交流還是很有助于個人成長的。

本文轉(zhuǎn)載自微信公眾號「程序員內(nèi)點事」,作者程序員內(nèi)點事。轉(zhuǎn)載本文請聯(lián)系程序員內(nèi)點事公眾號。

最近部門號召大伙多組織一些技術(shù)分享會,說是要活躍公司的技術(shù)氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術(shù)交流還是很有助于個人成長的。

[[331829]]

 

這次我分享的是 springboot + rabbitmq 如何實現(xiàn)消息確認(rèn)機制,以及在實際開發(fā)中的一點踩坑經(jīng)驗,其實整體的內(nèi)容比較簡單,有時候事情就是這么神奇,越是簡單的東西就越容易出錯。

可以看到使用了 RabbitMQ 以后,我們的業(yè)務(wù)鏈路明顯變長了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場景也增加了。例如:

  • 消息生產(chǎn)者 - > rabbitmq服務(wù)器(消息發(fā)送失敗)
  • rabbitmq服務(wù)器自身故障導(dǎo)致消息丟失
  • 消息消費者 - > rabbitmq服務(wù)(消費消息失敗)

所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟消息確認(rèn)機制以后,盡管很大程度上保證了消息的準(zhǔn)確送達(dá),但由于頻繁的確認(rèn)交互,rabbitmq 整體效率變低,吞吐量下降嚴(yán)重,不是非常重要的消息真心不建議你用消息確認(rèn)機制。

 

下邊我們先來實現(xiàn)springboot + rabbitmq消息確認(rèn)機制,再對遇到的問題做具體分析。

一、準(zhǔn)備環(huán)境

1、引入 rabbitmq 依賴包

  1. <dependency> 
  2.     <groupId>org.springframework.boot</groupId> 
  3.     <artifactId>spring-boot-starter-amqp</artifactId> 
  4. </dependency> 

2、修改 application.properties 配置

配置中需要開啟 發(fā)送端和 消費端 的消息確認(rèn)。

  1. spring.rabbitmq.host=127.0.0.1 
  2. spring.rabbitmq.port=5672 
  3. spring.rabbitmq.username=guest 
  4. spring.rabbitmq.password=guest 
  5.  
  6. # 發(fā)送者開啟 confirm 確認(rèn)機制 
  7. spring.rabbitmq.publisher-confirms=true 
  8. # 發(fā)送者開啟 return 確認(rèn)機制 
  9. spring.rabbitmq.publisher-returns=true 
  10. #################################################### 
  11. # 設(shè)置消費端手動 ack 
  12. spring.rabbitmq.listener.simple.acknowledge-mode=manual 
  13. # 是否支持重試 
  14. spring.rabbitmq.listener.simple.retry.enabled=true 

3、定義 Exchange 和 Queue

定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,并將隊列綁定在交換機上。

  1. @Configuration 
  2. public class QueueConfig { 
  3.  
  4.     @Bean(name = "confirmTestQueue"
  5.     public Queue confirmTestQueue() { 
  6.         return new Queue("confirm_test_queue"truefalsefalse); 
  7.     } 
  8.  
  9.     @Bean(name = "confirmTestExchange"
  10.     public FanoutExchange confirmTestExchange() { 
  11.         return new FanoutExchange("confirmTestExchange"); 
  12.     } 
  13.  
  14.     @Bean 
  15.     public Binding confirmTestFanoutExchangeAndQueue( 
  16.             @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, 
  17.             @Qualifier("confirmTestQueue") Queue confirmTestQueue) { 
  18.         return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); 
  19.     } 

rabbitmq 的消息確認(rèn)分為兩部分:發(fā)送消息確認(rèn) 和 消息接收確認(rèn)。

在這里插入圖片描述

 

二、消息發(fā)送確認(rèn)

發(fā)送消息確認(rèn):用來確認(rèn)生產(chǎn)者 producer 將消息發(fā)送到 broker ,broker 上的交換機 exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。

消息從 producer 到 rabbitmq broker有一個 confirmCallback 確認(rèn)模式。

消息從 exchange 到 queue 投遞失敗有一個 returnCallback 退回模式。

我們可以利用這兩個Callback來確保消的100%送達(dá)。

1、 ConfirmCallback確認(rèn)模式

消息只要被 rabbitmq broker 接收到就會觸發(fā) confirmCallback 回調(diào) 。

  1. @Slf4j 
  2. @Component 
  3. public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { 
  4.      
  5.     @Override 
  6.     public void confirm(CorrelationData correlationData, boolean ack, String cause) { 
  7.  
  8.         if (!ack) { 
  9.             log.error("消息發(fā)送異常!"); 
  10.         } else { 
  11.             log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); 
  12.         } 
  13.     } 

實現(xiàn)接口 ConfirmCallback ,重寫其confirm()方法,方法內(nèi)有三個參數(shù)correlationData、ack、cause。

  • correlationData:對象內(nèi)部只有一個 id 屬性,用來表示當(dāng)前消息的唯一性。
  • ack:消息投遞到broker 的狀態(tài),true表示成功。
  • cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會被投遞到目標(biāo) queue 里。所以接下來需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投遞到目標(biāo) queue 里將觸發(fā)回調(diào) returnCallback ,一旦向 queue 投遞消息未成功,這里一般會記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補償?shù)炔僮鳌?/p>

  1. @Slf4j 
  2. @Component 
  3. public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { 
  4.  
  5.     @Override 
  6.     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 
  7.         log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); 
  8.     } 

實現(xiàn)接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數(shù)message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機)、routingKey(隊列)。

下邊是具體的消息發(fā)送,在rabbitTemplate中設(shè)置 Confirm 和 Return 回調(diào),我們通過setDeliveryMode()對消息做持久化處理,為了后續(xù)測試創(chuàng)建一個 CorrelationData對象,添加一個id 為10000000000。

  1. @Autowired 
  2.     private RabbitTemplate rabbitTemplate; 
  3.  
  4.     @Autowired 
  5.     private ConfirmCallbackService confirmCallbackService; 
  6.  
  7.     @Autowired 
  8.     private ReturnCallbackService returnCallbackService; 
  9.  
  10.     public void sendMessage(String exchange, String routingKey, Object msg) { 
  11.  
  12.         /** 
  13.          * 確保消息發(fā)送失敗后可以重新返回到隊列中 
  14.          * 注意:yml需要配置 publisher-returnstrue 
  15.          */ 
  16.         rabbitTemplate.setMandatory(true); 
  17.  
  18.         /** 
  19.          * 消費者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理 
  20.          */ 
  21.         rabbitTemplate.setConfirmCallback(confirmCallbackService); 
  22.  
  23.         /** 
  24.          * 消息投遞到隊列失敗回調(diào)處理 
  25.          */ 
  26.         rabbitTemplate.setReturnCallback(returnCallbackService); 
  27.  
  28.         /** 
  29.          * 發(fā)送消息 
  30.          */ 
  31.         rabbitTemplate.convertAndSend(exchange, routingKey, msg, 
  32.                 message -> { 
  33.                     message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
  34.                     return message; 
  35.                 }, 
  36.                 new CorrelationData(UUID.randomUUID().toString())); 
  37.     } 

三、消息接收確認(rèn)

消息接收確認(rèn)要比消息發(fā)送確認(rèn)簡單一點,因為只有一個消息回執(zhí)(ack)的過程。使用@RabbitHandler注解標(biāo)注的方法要增加 channel(信道)、message 兩個參數(shù)。

  1. @Slf4j 
  2. @Component 
  3. @RabbitListener(queues = "confirm_test_queue"
  4. public class ReceiverMessage1 { 
  5.      
  6.     @RabbitHandler 
  7.     public void processHandler(String msg, Channel channel, Message message) throws IOException { 
  8.  
  9.         try { 
  10.             log.info("小富收到消息:{}", msg); 
  11.  
  12.             //TODO 具體業(yè)務(wù) 
  13.              
  14.             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
  15.  
  16.         }  catch (Exception e) { 
  17.              
  18.             if (message.getMessageProperties().getRedelivered()) { 
  19.                  
  20.                 log.error("消息已重復(fù)處理失敗,拒絕再次接收..."); 
  21.                  
  22.                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息 
  23.             } else { 
  24.                  
  25.                 log.error("消息即將再次返回隊列處理..."); 
  26.                  
  27.                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);  
  28.             } 
  29.         } 
  30.     } 

消費消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。

1、basicAck

basicAck:表示成功確認(rèn),使用此回執(zhí)方法后,消息會被rabbitmq broker 刪除。

  1. void basicAck(long deliveryTag, boolean multiple)  

deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。

multiple:是否批量確認(rèn),值為 true 則會一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。

舉個栗子: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時deliveryTag為8,multiple設(shè)置為 true,會將5、6、7、8的消息全部進(jìn)行確認(rèn)。

2、basicNack

basicNack :表示失敗確認(rèn),一般在消費消息業(yè)務(wù)異常時用到此方法,可以將消息重新投遞入隊列。

  1. void basicNack(long deliveryTag, boolean multiple, boolean requeue) 

deliveryTag:表示消息投遞序號。

multiple:是否批量確認(rèn)。

requeue:值為 true 消息將重新入隊列。

3、basicReject

basicReject:拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。

  1. void basicReject(long deliveryTag, boolean requeue) 

deliveryTag:表示消息投遞序號。

requeue:值為 true 消息將重新入隊列。

四、測試

發(fā)送消息測試一下消息確認(rèn)機制是否生效,從執(zhí)行結(jié)果上看發(fā)送者發(fā)消息后成功回調(diào),消費端成功的消費了消息。

用抓包工具Wireshark 觀察一下rabbitmq amqp協(xié)議交互的變化,也多了 ack 的過程。

 

五、踩坑日志

1、別忘確認(rèn)消息

這是一個非常沒技術(shù)含量的坑,但卻是非常容易犯錯的地方。

開啟消息確認(rèn)機制,消費消息別忘了channel.basicAck,否則消息會一直存在,導(dǎo)致重復(fù)消費。

 

2、消息無限投遞

在我最開始接觸消息確認(rèn)機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業(yè)務(wù)邏輯后確認(rèn)消息, int a = 1 / 0 發(fā)生異常后將消息重新投入隊列。

  1. @RabbitHandler 
  2.     public void processHandler(String msg, Channel channel, Message message) throws IOException { 
  3.  
  4.         try { 
  5.             log.info("消費者 2 號收到:{}", msg); 
  6.  
  7.             int a = 1 / 0; 
  8.  
  9.             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
  10.  
  11.         } catch (Exception e) { 
  12.  
  13.             channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue); 
  14.         } 
  15.     } 

但是有個問題是,業(yè)務(wù)代碼一旦出現(xiàn) bug 99.9%的情況是不會自動修復(fù),一條消息會被無限投遞進(jìn)隊列,消費端無限執(zhí)行,導(dǎo)致了死循環(huán)。

在這里插入圖片描述

 

本地的CPU被瞬間打滿了,大家可以想象一下當(dāng)時在生產(chǎn)環(huán)境導(dǎo)致服務(wù)死機,我是有多慌。

而且rabbitmq management 只有一條未被確認(rèn)的消息。

在這里插入圖片描述

 

經(jīng)過測試分析發(fā)現(xiàn),當(dāng)消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

消費者會立刻消費這條消息,業(yè)務(wù)處理再拋出異常,消息再重新入隊,如此反復(fù)進(jìn)行。導(dǎo)致消息隊列處理出現(xiàn)阻塞,導(dǎo)致正常消息也無法運行。

而我們當(dāng)時的解決方案是,先將消息進(jìn)行應(yīng)答,此時消息隊列會刪除該條消息,同時我們再次發(fā)送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業(yè)務(wù)的進(jìn)行。

  1. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
  2. // 重新發(fā)送消息到隊尾 
  3. channel.basicPublish(message.getMessageProperties().getReceivedExchange(), 
  4.                     message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, 
  5.                     JSON.toJSONBytes(msg)); 

但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優(yōu)化設(shè)置了消息重試次數(shù),達(dá)到了重試上限以后,手動確認(rèn),隊列刪除此消息,并將消息持久化入MySQL并推送報警,進(jìn)行人工處理和定時任務(wù)做補償。

3、重復(fù)消費

 

如何保證 MQ 的消費是冪等性,這個需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL、或者redis將消息持久化,通過再消息中的唯一性屬性校驗。

 

責(zé)任編輯:武曉燕 來源: 程序員內(nèi)點事
相關(guān)推薦

2023-03-10 08:27:07

for循環(huán)項目線性結(jié)構(gòu)

2025-09-02 07:39:16

2023-06-01 08:54:08

RabbitMQ確認(rèn)機制生產(chǎn)端

2021-09-07 10:38:37

RabbitMQ 高可用消費

2022-07-26 00:00:00

MQ消息中間件

2021-07-19 09:42:45

Spring Boot@ValueJava

2020-09-14 11:50:21

SpringBootRabbitMQJava

2023-03-06 08:16:04

SpringRabbitMQ

2011-05-31 11:55:00

Android 消息機制

2025-06-12 09:46:15

2024-11-20 08:09:19

RabbitMQ項目客戶端

2018-06-01 09:11:23

2010-09-29 09:10:32

云計算

2023-09-07 10:31:27

2020-12-17 08:02:42

MyBatis插件框架

2023-12-04 09:23:49

分布式消息

2022-01-27 08:12:50

Potplayer播放器

2025-09-08 02:11:00

Token語言類型項目

2016-03-02 09:34:03

runtime消息ios開發(fā)

2013-04-11 12:40:16

Android消息機制
點贊
收藏

51CTO技術(shù)棧公眾號

色综合天天性综合| 日韩综合网站| av最新在线| 亚洲伊人伊色伊影伊综合网| 最新欧美日韩亚洲| 欧美aaa在线| 欧美牲交a欧美牲交aⅴ免费下载| 中文av一区特黄| 日本视频在线播放| 7777精品伊人久久久大香线蕉经典版下载| 欧美视频免费看| 精品综合久久久| 午夜精品成人在线| 素人一区二区三区| 亚洲第一福利视频| 亚洲va在线| 精品人伦一区二区三区| 久久久久一区| 91免费看网站| 亚洲欧美网站| 国产99久久九九精品无码| 国产欧美日韩久久| 男操女在线观看| 日韩电影免费在线观看中文字幕| 国产一区二区三区成人欧美日韩在线观看| 国产又爽又黄ai换脸| 亚洲人体大胆视频| 大地资源中文在线观看免费版| 欧美不卡在线视频| 国产在线美女| 人体精品一二三区| 北条麻妃国产九九精品视频| 成人三级网址| 美女av一区二区| 欧美午夜电影在线观看 | 99re久久最新地址获取| 欧美激情一区二区三区不卡| 福利资源在线久| 日本视频在线免费观看| 在线亚洲欧美专区二区| 欧美高清xxx| 日日碰狠狠躁久久躁婷婷| 欧美中文字幕一区二区三区亚洲| 免费在线稳定资源站| 9191久久久久久久久久久| 国产理论在线| 日韩av电影在线免费播放| 石原莉奈一区二区三区在线观看 | 精品国产拍在线观看| 欧美精品一区三区在线观看| 日韩中文字幕高清在线观看| xvideos成人免费中文版| 日韩在线观看中文字幕| www.69av| 日本午夜精品一区二区| 亚洲乱码一区二区三区在线观看| 久久久久久久久成人| 国产女人在线视频| 国产在线拍偷自揄拍精品| 国语对白精品一区二区| 成人高清dvd| 久久精品亚洲94久久精品| 91久久综合| 嫩草研究院在线观看| 国产成人精品综合在线观看 | 色综合久久综合网97色综合| 成人自拍视频网| 精品91免费| 亚洲黄色免费网站| 国产精品毛片无码| 国产av不卡一区二区| 欧美日韩在线直播| 天天做天天爱综合| 国产成免费视频| 久久精品国产久精国产思思| 亚洲图片在线观看| 欧美成人性生活视频| 久色视频在线播放| 欧美日韩免费不卡视频一区二区三区| 亚洲午夜天堂| 伊人情人网综合| 精品久久久久一区二区国产| 国产成人高清视频| 日本成人黄色免费看| 色妞在线综合亚洲欧美| 五月天国产精品| 蜜臀av性久久久久蜜臀aⅴ流畅| 国产精品亚洲欧美日韩一区在线| ·天天天天操| 在线欧美日韩精品| 国产伦精品一区二区三区视频| 中文在线有码| 国产一区二区三区小说| 欧美一级在线亚洲天堂| 五月天亚洲婷婷| 久久99精品国产麻豆婷婷| 粉嫩一区二区| a4yy在线播放免费观看视频| 久久精品国产一区二区三区日韩 | 黄色一级片网址| 国产女主播一区二区| 国产精品综合久久久| 国产www精品| 先锋资源久久| 日韩一区二区三区高清在线观看| 在线看无码的免费网站| 最好看的中文字幕久久| 亚洲欧洲色图综合| 亚洲综合成人网| 国产精品一区二区免费不卡| 国产·精品毛片| 亚洲v天堂v手机在线| 成人亚洲一区二区| 中日韩免视频上线全都免费| 国产秀色在线www免费观看| 国产高清视频网站| 国产在线视频91| 亚洲在线色站| 国产伦精品一区二区三区照片| 日韩一区二区电影在线观看| 在线观看欧美激情| 九九热在线免费| 69av二区| 成人黄色免费在线观看| 99国产在线视频| 另类视频在线观看+1080p| 超碰97在线人人| 国产成人精品在线观看| 欧美激情国产精品| 精品久久久999| 亚洲激情国产| 91精品一区国产高清在线gif| 超碰cao国产精品一区二区| 欧美调教sm| 黑森林国产精品av| 1区2区3区在线视频| 亚洲福利二区| 亚洲天堂色网站| 中文字幕日韩一区二区| 久久一区二区三区四区| 羞羞视频在线观看| 国产精品一区二区小说| 欧美福利精品| 霍思燕三级露全乳照| 久热免费在线观看| 免费av在线网站| 99国产盗摄| 一区二区三区四区不卡| 国产性xxxx18免费观看视频| 7777在线视频| 人人澡人一摸人人添| 黄页网站免费在线观看| 日漫免费在线观看网站| 无码播放一区二区三区| 亚洲国产资源| 欧美午夜网站| 不卡的电视剧免费网站有什么| 91精品在线免费| 日韩精品一区二区三区在线播放 | 中文字幕日韩欧美在线| 日本午夜在线亚洲.国产| 国产精品一区在线观看| 国产福利片一区二区| 91av在线免费播放| 理论视频在线| 樱桃视频成人在线观看| 先锋影音国产精品| 天天影视久久综合| 粉嫩的18在线观看极品精品| 人人超在线公开视频| 精品国产亚洲一区二区三区| 日本在线电影一区二区三区| 久久夜色电影| 久久久五月婷婷| 亚洲综合色在线| 国产一区二区三区直播精品电影| 成人自拍爱视频| 一二三区在线观看| 日本道不卡免费一区| 亚洲摸摸操操av| 欧美先锋影音| 欧洲一区在线电影| 欧美xxxx黑人又粗又长密月| 国产精品99999| 亚洲国内欧美| 欧美一区二区福利视频| 日本一区二区三不卡| 好吊日av在线| jiyouzz国产精品久久| 久久久久久69| 草草久视频在线观看电影资源| 激情小说亚洲色图| 日韩成人午夜电影| 色偷偷久久一区二区三区| 69精品小视频| 国产精品久久国产| 丝袜中文在线| 欧美欧美黄在线二区| 91免费版在线| 在线视频免费一区二区|