国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

消息隊列,聊聊發送消息的四種姿勢!

開發 架構
本文詳細介紹了微服務開發中常用的4種消息發送方式。對于那些對數據一致性要求不高的場景,可以選擇使用進階版的消息發送方式。

微服務開發中經常會使用消息隊列進行跨服務通信。在一個典型場景中,服務A執行一個業務邏輯,需要保存數據庫,然后通知服務B執行相應的業務邏輯。在這種場景下,我們需要考慮如何發送消息。

圖片

1. 基礎版

首先,我們可能會考慮將數據庫操作和消息發送放在同一個事務中,以下是偽代碼示例:

@Transactional  
public void saveWithMessage(BusinessDO businessDO){ 
 String id = IdUtils.nextId();
 businessDO.setId(id);
    xxxRepository.save(businessDO);  
    
    BusinessMessage businessMessage = new BusinessMessage();  
    businessMessage.setKey(id);  
    SendResult send = rocketMQTemplate.syncSend("test-topic", sendMessage);
}

在這段代碼里通過@Transactional注解將數據庫的操作以及發送消息放到一個事務中,如果數據庫的保存或者消息發送失敗,則回滾事務。

乍一看似乎沒什么問題,但稍微推敲一下就會發現此方式有如下兩個缺陷:

1.1 數據不一致

首先最容易想到的是,這種消息發送方式無法保證數據的最終一致性。

這里先讓我來解釋一下基于消息隊列,生產者發送消息到消費者消費消息的過程:

  • 生產者發送消息
  • MQ收到消息并將數據持久化,在存儲中新增一條記錄
  • 返回ACK給生產者
  • MQ 推送消息給對應的消費者,等待消費者返回ACK
  • 如果消費者在指定時間內成功返回ACK,那么MQ則認為消費成功,執行第6步刪除消息,如果MQ在指定時間內沒有收到ACK,則認為消息消費失敗,會重新推送消息,重復執行第4、5、6步操作。
  • 刪除消息。

圖片圖片

好,現在回到上面發送消息的場景,假設數據庫處理成功,消息消費成功,但是MQ由于某些原因處理超時,導致ACK確認失敗,此時整個事務回滾,結果出現數據不一致問題。

這種數據不一致的問題在RPC調用的場景下也經常出現,其根本的原因在于:遠程調用,結果最終可能為成功、失敗、超時;而對于超時的情況,處理方最終的結果可能是成功,也可能是失敗,調用方是無法知曉的。

1.2 事務未提交

其次,使用以上方式還存在另一個問題,即消費者在處理消息時可能讀不到剛剛保存的數據,即消費者消費速度快于事務提交的速度。

舉例:

假設服務B需要通過消息中的數據ID獲取服務A數據庫保存的數據。這種情況下,數據庫操作與消息發送在同一事務中。可能出現服務B在處理消息時,服務A事務還未提交,導致服務B獲取的數據是空數據,無法執行相應業務邏輯。

1.3 適用場景

盡管這種發送方式存在上述兩個問題,但在某些場景下仍然適用。例如,消費者在處理時不依賴生產者的數據,且對數據一致性要求不高,這種情況下消息發送和數據庫保存可以不在同一個事務中。

2. 進階版

為解決事務未提交問題,我們可以確保事務提交后再發送消息。在SpringBoot項目中,有兩種常見解決方案。

2.1 事務同步器

基于事務同步器的方法:

@Transactional  
public void saveWithMessage(BusinessDO businessDO){  
    xxxRepository.save(businessDO);  
    
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {  
        @Override  
        public void afterCommit() {  
            BusinessMessage businessMessage = new BusinessMessage();  
            businessMessage.setXXX();  
            rocketMQTemplate.syncSend("test-topic", sendMessage);  
        }  
    });  
}

TransactionSynchronizationManager.registerSynchronization 是 Spring 框架中用于注冊事務同步的方法。通過這個方法,你可以在事務提交、回滾或完成時執行一些額外的邏輯。

在上述代碼中,使用了afterCommit方法,在事務成功提交后執行發送消息操作,確保在數據庫操作成功且事務穩定的情況下發送消息。

2.2 消息監聽器

另一種方法是基于ApplicationEventPublisher,在保存數據庫操作后發布一個事件,并通過@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)注解監聽事件。這樣可以確保消息在數據庫事務提交之后再發送。

@Transactional  
public void saveWithMessage(BusinessDO businessDO){  
    xxxRepository.save(businessDO);  

 eventPublisher.publishEvent(new UserCreatedEvent(registerUser));
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 
public void handleUserRegisteredEvent(UserCreatedEvent userCreatedEvent) {  
    rocketMQTemplate.syncSend("test-topic", sendMessage);  
}

這里需要說明一下:在默認情況下Spring的事件監聽機制并不是異步的(上次群友弄錯了),而是同步的將代碼進行解耦,@TransactionalEventListener也是通過同步的方式,但是加入了回調的方式來解決,這樣就能夠控制事務進行Commited、Rollback時才進行事件的處理,來達到事務同步的目的。

2.3 適用范圍

通過以上方式,相較于基礎版,可以確保消息在事務提交后發送,解決了消費者讀取空數據的問題。但仍然無法保證數據的一致性,適用于對數據一致性要求不高的場景。

3. 本地消息表+補償重試

如果需要保證最終一致性而非強一致性,可以采用本地消息表+補償重試的方式來發送消息。

這種方式的執行原理如下:

  • 在執行業務操作的同時,在本地消息表中插入一條狀態為待發送的記錄,業務數據的記錄與消息記錄必須在同一個事務中完成,這是此方案的核心原則。由于消息表與業務表在同一個庫中,事務可以通過數據庫來保證。
  • 事務提交后發送消息,如果消息發送成功,則將消息狀態標記為發送成功或刪除消息
  • 在生產者服務中會創建一個定時任務,定時從消息表中檢索待發送的消息重新發送。
  • 對于消費者消費失敗,則依賴MQ本身的重試機制來完成,保證數據的最終一致性。

圖片圖片

核心代碼如下:

@Transactional  
public void saveWithMessage(BusinessDO businessDO){  
    TransactionMessage transactionMessage = new TransactionMessage();  
    transactionMessage.setStaus(MessageStatus.WAITING_SEND);  
    transactionMessage.setMessageKey(businessDO.getId());  
    ...  
  
    xxxRepository.save(businessDO);  
    messageRepository.save(TransactionMessage);  
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {  
        @Override  
        public void afterCommit() {  
            messageService.sendMessage(transactionMessage,businessDO);  
        }  
    });  
}  

public void sendMessage(TransactionMessage transactionMessage,BusinessDO businessDO){  
    BusinessMessage businessMessage = new BusinessMessage();  
    businessMessage.setXXX();  
    try{  
        rocketMQTemplate.syncSend("test-topic", sendMessage);  
        transactionMessage.setStatus(MessageStatus.SUCCESS);  
        messageRepository.update(transactionMessage);  
    }catch (Exception e){  
        // 執行失敗的業務邏輯  
    }  
  
}

3.1 問題

雖然這種方式能夠保證消息在事務提交后發送,且能夠保證最終一致性,但仍然存在一些缺陷:

首先,需要額外的消息表,增加了系統復雜度。(針對此問題,我們又可以將該功能單獨提取出來,做成一個消息服務來統一處理,考慮篇幅問題,這里暫不展開。)

其次,通過定時任務輪詢消息表,對于處理成功但ACK超時的數據會重新發送消息,這對下游系統產生了強烈的冪等性保障要求,消費者的處理邏輯必須做好冪等控制。關于冪等的處理方案,我在[[Dailymart17:并發與冪等的實現方案]]一文中有詳細的說明,歡迎翻閱。

4. 基于事務消息發送

目前,RocketMQ是主流MQ中唯一一個支持事務消息的,如果你們項目恰好使用的是RocketMQ,可以采用事務消息來發送。

有關RocketMQ事務消息的詳細信息,可以參考我之前的文章[SpringCloud基于RocketMQ實現分布式事務,這里不再贅述。同時,由于在Dailymart項目中使用的是RocketMQ,也可以參考Dailymart的代碼實現。

小結

本文詳細介紹了微服務開發中常用的4種消息發送方式。對于那些對數據一致性要求不高的場景,可以選擇使用進階版的消息發送方式。而對于需要保證最終一致性的情況,推薦采用事務消息和本地消息表的方式進行消息發送。

責任編輯:武曉燕 來源: JAVA日知錄
相關推薦

2023-12-18 08:36:39

消息隊列微服務開發

2023-11-27 13:42:00

消息隊列RocketMQ

2021-08-10 09:59:15

RabbitMQ消息微服務

2019-11-18 09:58:11

中間件投遞模式

2021-08-24 08:01:15

死鎖工具多線編程

2021-04-07 19:34:16

社區買菜團購

2025-07-28 02:11:00

爬取數據JSOUP

2023-08-26 20:08:15

分庫分表Spring

2024-05-28 08:24:18

2017-10-11 15:08:28

消息隊列常見

2022-05-24 10:43:02

延時消息分布式MQ

2019-05-28 05:07:11

物聯網聯網汽車MQTT

2021-03-19 10:55:06

Eslint代碼前端

2024-11-04 09:39:08

Java?接口Thread?類

2024-03-29 08:33:10

應用場景存儲搜索

2019-07-19 07:56:13

消息隊列消息代理消息中間件

2023-09-26 08:20:12

消息隊列RabbitMQ

2022-12-13 09:19:26

分布式消息隊列

2020-08-05 08:30:25

Spring BootJavaSE代碼

2018-04-26 15:18:49

RTOS應用MPU
點贊
收藏

51CTO技術棧公眾號

大尺度一区二区| 欧美在线免费视屏| 成人久久一区二区| 久久bbxx| 97久久精品人人做人人爽50路| 91精品在线播放| 美女视频一区| 欧美一区二区三区啪啪| 另类春色校园亚洲| 欧美日韩国产综合一区二区 | 免费网站在线观看视频| 欧美成人有码| 97在线视频一区| 成人综合网站| 亚洲成人中文字幕| 免费黄网在线观看| 精品九九在线| 日韩成人在线网站| aaa在线观看| 亚洲成人1区2区| 邪恶网站在线观看| av电影在线观看一区| 五月婷婷一区| 亚洲欧美日韩专区| 91精品久久久久久蜜桃| 精品欧美久久| 2019亚洲日韩新视频| 午夜免费啪视频观看视频| 国产成人精品影院| 日韩中文字幕av在线| 伊人成人在线| 日韩三级中文字幕| 精品影院一区| 日韩欧美aⅴ综合网站发布| 成人在线观看91| 九九精品在线| 性色av一区二区三区| 在线免费观看亚洲| 亚洲精品午夜久久久| 国产精品日韩电影| 日韩av免费观影| 亚洲综合激情网| jizz18欧美| 亚洲一区二区在线视频| 天天夜夜亚洲| 色综合咪咪久久| 免费黄网站在线观看| 欧美午夜精品理论片a级按摩| 97在线观看免费观看高清| 欧美专区日韩专区| 巨大荫蒂视频欧美另类大| 精品国产亚洲在线| 刘亦菲一区二区三区免费看| 亚洲香蕉在线观看| 国产精品久久久久久久久久久久久久久 | 亚洲毛片aa| 韩日精品视频一区| 成年丰满熟妇午夜免费视频| a级精品国产片在线观看| 两根大肉大捧一进一出好爽视频| 久久久久久久久蜜桃| 狠狠操第一页| 精品成人av一区| 黄网站免费在线播放| 日韩av在线网站| 亚洲精品伦理| 国产成人精品综合久久久| 国产在线不卡| 中文字幕一区二区三区最新 | 精品国产乱码久久久久久闺蜜| 韩国成人免费视频| 日韩在线视频一区| 美女网站色精品尤物极品姐弟| 国产精品v日韩精品| 激情婷婷久久| 手机福利在线视频| 欧美国产精品中文字幕| 中文在线а√在线| 欧美成人激情免费网| 国产精品毛片无码| 91视频免费网站| 国产麻豆日韩欧美久久| 成人满18在线观看网站免费| 色噜噜夜夜夜综合网| 欧美自拍电影| 国产精品一区二区久久久久| 久久一区二区三区超碰国产精品| 欧美成人免费在线观看视频| 一区二区三区在线视频观看58| 国产淫片在线观看| 欧美成人在线影院| 亚洲精品麻豆| 久久久久久久999| 99在线视频影院| 欧美日本精品在线| 亚洲网站在线| 怡红院av亚洲一区二区三区h| 亚洲www啪成人一区二区麻豆| av在线播放资源| 国产成人在线精品| 国内精品免费**视频| 88av在线| 一个人www欧美| 国产秀色在线www免费观看| 在线播放亚洲激情| 天天做综合网| a√天堂在线观看| 欧美日韩精品免费| 久久综合社区| av动漫在线免费观看| 91精品1区2区| 欧美性生活一级片| 超碰97在线看| 在线观看国产精品网站| 91久久偷偷做嫩草影院电| 免费一区二区三区在在线视频| 国产精品久久久久久久裸模| 国内在线免费视频| 成人在线一区二区| 国产精品久久久久影院色老大| 国模精品视频| 成人午夜小视频| 久久久久成人黄色影片| 欧美1—12sexvideos| 91福利入口| 亚洲一区在线观看视频| 免费观看亚洲天堂| 久久精品视频免费播放| 影音先锋中文字幕一区二区| 国产主播中文字幕| 亚洲欧洲在线观看| 午夜在线播放视频欧美| 亚洲夫妻av| 国产91精品青草社区| 国产91对白在线观看九色| 国产精品69久久久| 成人丝袜视频网| av片在线观看永久免费| 亚洲伊人一本大道中文字幕| 亚洲色图清纯唯美| a一区二区三区亚洲| 中文字幕欧美日韩一区二区三区| 欧美无人高清视频在线观看| 欧美成免费一区二区视频| 国产无遮挡又黄又爽免费网站| 久久影院中文字幕| 国产高清在线精品| 欧美人与禽猛交乱配| av一区和二区| 色屁屁一区二区| 午夜久久免费观看| 在线观看国产视频| 成人在线观看视频网站| 亚洲日本在线a| 麻豆成人入口| 色琪琪原网站亚洲香蕉| 57pao成人永久免费视频| 中文字幕亚洲电影| 私拍精品福利视频在线一区| 色播五月综合网| 26uuu国产精品视频| 亚洲精品一二三四区| 三级精品视频| 美女免费免费看网站| 国产精品欧美在线| 一本一道久久a久久精品 | 欧美大片在线播放| 中文字幕日韩视频| 成人av一区二区三区| 一本色道久久亚洲综合精品蜜桃| 日韩在线观看你懂的| 久久综合久久综合久久| 国产视频网站一区二区三区| 欧美激情成人网| 欧美区在线播放| 亚洲视频在线观看三级| 日韩一区电影| 午夜免费福利在线观看| 日韩精品电影网站| 伊人激情综合网| 欧美激情资源网| 国产尤物久久久| 国产在线视频网站| 日韩电影天堂视频一区二区| 亚洲免费视频观看| 波多野结衣在线播放一区| 波多野结衣xxxx| 国产欧美日韩亚洲精品| 欧美在线观看视频一区二区| 日韩在线a电影| 国产成人a视频高清在线观看| 日韩欧美黄色大片| 日本一区二区在线免费播放| 色婷婷精品久久二区二区蜜臀av| 日韩高清中文字幕一区| 欧美系列精品| 久草电影在线| 视频二区一区| 97国产真实伦对白精彩视频8| 国产精品传媒视频|