深度解析:基于 RocketMQ 實現分布式事務的技術實踐與原理探究
在上一篇文章Spring Boot自動裝配原理以及實踐我們完成了服務通用日志監控組件的開發,確保每個服務都可以基于一個注解實現業務功能的監控。 而本文我們嘗試基于RocketMQ實現下單的分布式的事務。可能會有讀者會有疑問,之前我們不是基于Seata完成了分布式事務,為什么我們還要用到RocketMQ呢?
我們的再來回顧一下我們下單功能大抵是做以下三件事情:
- 創建訂單,將訂單記錄存到數據庫中。
- 扣款,記錄用戶扣款后錢包所剩下的額度。
- 扣除商品庫存,并發放商品。

我們將該場景放到高并發場景下,這個功能勢必要考慮性能和可靠性問題,所以我們在業務需求清楚明了的情況下,就希望能有一種方式確保下單功能在高并發場景保證性能、可靠性。 而Seata的AT模式確實可以保證最終一致性,但是seata的AT模式本質上是依賴于global_table、branch_table等數據表維護應用層分布式事務,在操作期間會涉及大量的更新和刪除操作,隨著時間的推移還是會出現大量的索引碎片,導致索引性能下降。
所以我們就考慮采用RocketMQ實現分布式事務,盡管RocketMQ對于分布式事務的實現業務侵入性相對強一些,但它可以保證業務層面的功能解耦從而提升并發性能,且RocketMQ還對消息消費可靠性做了許多不錯的優化,例如:失敗重試、死信隊列等,所以我們還是嘗試使用RocketMQ來改良我們的下單分布式事務問題。
一、詳解RocketMQ落地分布式事務案例
1. 需求說明
用戶下單大抵需要在三個服務中完成:
- 訂單服務完成訂單創建,基于用戶傳入的產品編碼、用戶編碼、產品購買數生成訂單信息,對應的調用參數如下:
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}- 基于入參的用戶代碼定位到用戶錢包金額,完成賬戶扣款。
- 基于產品和購買數完成庫存扣減。
這其中會跨域三個服務,分別是訂單服務創建訂單、賬戶服務扣款、商品服務扣減庫存。

2. 落地思路
以我們業務為最終目標,RocketMQ實現分布式事務的原理是基于2PC的,流程大抵如下:
- 訂單服務發送一個事務消息到消息隊列,消息內容就是我們的訂單信息,這里面包含用戶賬號、購買的產品代碼、購買產品數量等數據。
- MQ收到half消息,并回復ack確認。
- 生產者(訂單服務order-service)得知我們發送的消息已被收到,訂單服務則執行本地事務并提交事務,即將訂單信息寫入數據庫中,同時在該事務內將訂單插入結果寫入transaction_log表中。
- 生產者(訂單服務order-service)完成本地事務的提交,告知MQ將事務消息commit,此時消費者就可以消費這條消息了,注意若生產者消費失敗,則將消息rollback,一切就當沒有發生過。
- 如果上述的消息是commit則將消息持久化到commitLog中,以便后續MQ宕機或者服務宕機后依然可以繼續消費這條沒有被消費的消息。
- (非必要步驟)若MQ長時間沒有收到生產者的commit或者rollback的信號,則攜帶事務id找生產者查詢transaction_log索要當前消息狀態,如果看到對應的消息則判定生產者事務成功將消息commit給消費者消費,若沒看到則說明生產者本地事務執行失敗,回滾該消息。
- 消費者即我們的用戶服務或者庫存服務收到消息則執行本地事務并提交,若失敗則會不斷重試,直到達到上限則將消息存到死信隊列并告警。
- 人工介入查看死信隊列查看失敗消息手工補償數據。

二、實踐-基于RocketMQ實現分布式事務
1. 部署RocketMQ(Linux環境)
在編寫業務代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個版本:https://rocketmq.apache.org/download/。
完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環境變量,完成后鍵入source /etc/profile使之生效,對應的配置內容如下所示:
export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin需要注意的是筆者本次采用WSL的Ubuntu子系統時啟動時腳本會拋出runserver.sh: 70: [[: Exec format error錯誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報錯找到runserver.sh這行對應的腳本內容,該括弧本質上就是基于JDK內容配置對應的GC算法:

以筆者為里系統是jdk8,所以直接去掉判斷用走JDK8的配置即可:
choose_gc_options()
{
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
}完成后鍵入./mqnamesrv &將MQ啟動,如果彈窗輸出下面這條結果,則說明mq的NameServer啟動成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動broker,需要注意的是默認情況下broker占用堆內存差不多是4g,所以讀者本地部署時建議修改一下runbroker.sh的堆內存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動成功,自此mq就在windows環境部署成功了。我們就可以開始編碼工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:98762. 服務引入MQ完成下單功能開發
(1) 服務引入RocketMQ依賴
完成RocketMQ部署之后,我們就可以著手編碼工作了,首先我們要在在三個服務中引入RocketMQ的依賴,由于筆者的spring-boot版本比較老,所以這里筆者為了統一管理在父pom中指定了mq較新的版本號:
<!--rocketmq-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>然后我們分別對order、account、product三個服務中引入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>(2) 注冊中心配置RocketMQ信息
由于我們的分布式事務涉及3個服務,而且mq的消費模式采用的是發布訂閱模式,所以我們的生產者(order-service)和消費者(account-serivce)都配置為cloud-group
rocketmq.name-server=172.29.193.12:9876
# 指定消費者組
rocketmq.producer.group=cloud-group之所以沒有沒將消費者2(product-service)也配置到cloud-group中的原因也很簡單,同一個消息只能被同一個消費者組中的一個成員消費,假如我們的將product-service配置到同一個消費者組中就會出現因一條消息只能被一個服務消費而導致product-service收不到消息。

對此我們實現思路有兩種:
- 將服務都放到同一個消費者組,消費模式改為廣播模式。
- 將product-service設置到別的消費者組中。
考慮后續擴展筆者選擇方案2,將產品服務的訂閱者放到消費者組2中:
rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2(3) 創建消息日志表
我們在上文進行需求梳理時有提到一個MQServer沒收到生產者本地事務執行狀態進行回查的操作,所以我們在生產者在執行本地事務時,需要創建一張表記錄生產者本地事務執行狀態,建表SQL如下:
DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`transaction_id` varchar(50) DEFAULT NULL,
`log` varchar(500) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;(4) 完成order服務half消息發送、監聽、回查回調邏輯
我們的訂單服務需要做以下三件事:
- 發送half消息給MQ。
- half消息發送成功執行本地事務并記錄日志。
- 告知MQ可以提交事務消息。
所以我們需要定義一下消息格式,對象類中必須包含訂單號、產品編碼、用戶編碼、購買產品數量等信息。
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {
private static final long serialVersionUID = 1L;
//設置主鍵自增,避免插入時沒必要的報錯
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
/**
* 訂單號
*/
private String orderNo;
/**
* 用戶編碼
*/
private String accountCode;
/**
* 產品編碼
*/
private String productCode;
/**
* 產品扣減數量
*/
private Integer count;
/**
* 余額
*/
private BigDecimal amount;
/**
* 本次扣減金額
*/
private BigDecimal price;
}然后我們就可以編寫控制層的代碼了,通過獲取前端傳輸的參數調用orderService完成half消息發送。
@PostMapping("/order/createOrderByMQ")
public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
log.info("基于mq完成用戶下單流程,請求參數: " + JSON.toJSONString(orderDTO));
orderService.createOrderByMQ(orderDTO);
return ResultData.success("基于mq完成用戶下單完成");
}orderService的實現邏輯很簡單,定義好消息設置消息頭內容和消息載體的對象,通過sendMessageInTransaction方法完成半消息發送,需要了解一下消息的主題(topic)為ORDER_MSG_TOPIC,只有訂閱這個主題的消費者才能消費這條消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void createOrderByMQ(OrderDto orderDto) {
//創建half消息對應的事務日志的id
String transactionId = UUID.randomUUID().toString();
//調用產品服務獲取商品詳情
ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
//計算總售價
BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
orderDto.setAmount(amount);
//將訂單信息作為載體
Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
//下單用戶編碼
.setHeader("accountCode", orderDto.getAccountCode())
//產品編碼
.setHeader("productCode", orderDto.getProductCode())
//產品購買數
.setHeader("count", orderDto.getCount())
//下單金額
.setHeader("amount", amount)
.build();
//發送half消息
rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);
}完成half消息發送之后,我們就必須知曉消息發送結果才能確定是否執行本地事務并提交,所以我們的訂單服務必須創建一個監聽器了解half消息的發送情況,executeLocalTransaction方法就是mq成功收到半消息后的回調函數,一旦我們得知消息成功發送之后,MQ就會執行這個方法,筆者通過這個方法獲取消息頭的參數創建訂單對象,調用createOrderWithRocketMqLog完成訂單的創建的本地事務成功的日志記錄。
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
private final IOrderService orderService;
private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;
/**
* 監聽到發送half消息,執行本地事務
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("order執行本地事務");
try {
//解析消息頭
MessageHeaders headers = message.getHeaders();
//獲取購買金額
BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
//獲取訂單信息
Order order = Order.builder()
.accountCode((String) headers.get("accountCode"))
.amount(amount)
.productCode((String) headers.get("productCode"))
.count(Integer.valueOf(String.valueOf(headers.get("count"))))
.build();
//獲取事務id
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
//執行本地事務和記錄事務日志
orderService.createOrderWithRocketMqLog(order, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("創建訂單失敗,失敗原因: {}", e.getMessage(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事務的檢查,檢查本地事務是否成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
//獲取事務ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("檢查本地事務,事務ID:{}", transactionId);
//根據事務id從日志表檢索
QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("transaction_id", transactionId);
RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
//如果消息表存在,則說明生產者事務執行完成,回復commit
if (null != rocketmqTransactionLog) {
return RocketMQLocalTransactionState.COMMIT;
}
//回復rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}createOrderWithRocketMqLog做了兩件事,分別是插入訂單信息和創建消息日志,這里筆者用到了事務注解確保了兩個操作的原子性。 這樣一來,MQserver后續的回查邏輯完全可以基于RocketmqTransactionLog 進行判斷,如果消息的事務id在表中存在,則說明生產者本地事務成功,反之就是失敗。
@Transactional(rollbackFor = Exception.class)
@Override
public void createOrderWithRocketMqLog(Order order, String transactionId) {
//創建訂單編號
order.setOrderNo(UUID.randomUUID().toString());
//插入訂單信息
orderMapper.insert(order);
//事務日志
RocketmqTransactionLog log = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("執行創建訂單操作")
.build();
rocketmqTransactionLogMapper.insert(log);
}補充一下基于MP生成的RocketmqTransactionLog 類代碼:
@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog對象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
private String transactionId;
private String log;
}(5) 完成account、product監聽事件
然后我們就可以實現用戶服務和商品服務的監聽事件了,一旦生產者提交事務消息之后,這幾個消費者都會收到這個topic(主題)的消息,進而完成當前服務的業務邏輯。
先來看看實現扣款的用戶服務,我們的監聽器繼承了RocketMQListener,基于@RocketMQMessageListener注解設置它訂閱的主題為createByRocketMQ,一旦收到這個主題的消息時這個監聽器就會執行onMessage方法,我們的邏輯很簡單,就是獲取消息的內容完成扣款,唯一需要注意的就是線程安全問題。我們的壓測的情況下,單用戶可能會頻繁創建訂單,在并發期間同一個用戶的扣款消息可能同時到達扣款服務中,這就導致單位時間內扣款服務從數據庫中查詢到相同的余額,執行相同的扣款邏輯,導致金額少扣了。

所以我們必須保證扣款操作互斥和原子化,考慮到筆者當前項目環境是單體,所以就用簡單的synchronized 關鍵字解決問題。
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {
@Resource
private AccountMapper accountMapper;
//強制轉為runTimeException
@SneakyThrows
@Override
public void onMessage(OrderDto orderDto) {
log.info("賬戶服務收到消息,開始消費");
QueryWrapper<Account> query = new QueryWrapper<>();
query.eq("account_code", orderDto.getAccountCode());
//解決單體服務下線程安全問題
synchronized (this){
Account account = accountMapper.selectOne(query);
BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
if (subtract.compareTo(BigDecimal.ZERO)<0){
throw new Exception("用戶余額不足");
}
account.setAmount(subtract);
log.info("更新賬戶服務,請求參數:{}", JSON.toJSONString(account));
accountMapper.updateById(account);
}
}
}然后就說商品服務,邏輯也很簡單,也同樣要注意一下線程安全問題:
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
@Resource
private ProductMapper productMapper;
@Override
public void onMessage(OrderDto orderDto) {
log.info(" 產品服務收到消息,開始消費");
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("product_code",orderDto.getProductCode());
synchronized (this){
Product product = productMapper.selectOne(queryWrapper);
if (product.getCount()<orderDto.getCount()){
throw new RuntimeException("庫存不足");
}
product.setCount(product.getCount()-orderDto.getCount());
log.info("更新產品庫存信息,請求參數:{}", JSON.toJSONString(product));
productMapper.updateById(product);
}
}
}三、基于幾個測試用例驗證MQ半消息事務
1. 前置準備與說明
完整編碼工作后,自測是非常有必要的,我們日常完成開發任務后,都會結合需求場景以及功能編排一些自測用例查看最終結果是否與預期一致。 需要注意的是由于訂單業務邏輯較為復雜,很多業務場景一篇博客是不可能全部覆蓋,所以這里我們就測試一下基于RocketMQ實現分布式事務常見的幾個問題場景是否和預期一致。
在測試前我們必須做好前置準備工作,準備功能測試時涉及到的SQL語句,以本次用戶購買產品的業務為例,涉及到訂單表、用戶賬戶信息表、產品表、以及生產者本地事務日志表。
SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;在每次測試完成之后,我們希望數據能夠還原,所以這里也需要準備一下每次測試結束后的更新語句,由于訂單表和消息日志表都是主鍵自增,考慮到這兩張表只涉及插入,所以筆者為了重置主鍵的值采取的是truncate語句。
truncate table t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;2. 測試正常消費
第一個用例是查看所有服務都正常的情況下,訂單表是否有數據,用戶表的用戶是否會正常扣款,以及商品表庫存是否會扣減。
測試前,我們先查看訂單表,確認沒有數據

查看我們的測試用戶,錢包額度為10000

再查看庫存表,可以看到數量為1000

確認完數據之后,我們就可以測試服務是否按照預期的方式執行,將所有服務啟動。

我們通過網關發起調用,請求地址如下:
http://localhost:8090/order/order/createOrderByMQ請求參數如下,從參數可以看出這個請求意為用戶代碼(accountCode)為demoData這個用戶希望購買1個(count)產品代碼(productCode)為P001的產品,該產品當前售價(price)為1元。
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}調用完成后,查看訂單表,訂單數據生成無誤:
圖片
查看用戶服務是否完成用戶扣款,扣款無誤:

查看產品表,可以看到產品數量也準確扣減:

3. 測試生產者commit提交失敗
我們希望測試一下發送完half消息之后,執行本地事務完成,但是未提交commit請求時,MQServer是否會調用回查邏輯。
為了完成這一點我們必須按照以下兩個步驟執行:
- 在訂單服務提交事務消息處打個斷點。

- 發起請求,當代碼執行到這里的時候通過jps定位到進程號,將其強制殺死。如下所示,我們的代碼執行到了提交事務消息這一步:

我們通過jps定位并將其殺死::

完成這些步驟后,我們再次將服務啟動,等待片刻之后可以發現,MQServer會調用checkLocalTransaction回查生產者本地事務的情況。我們放行這塊代碼讓程序執行下去,最后再查看數據庫中的數據結果是否符合預期。

4. 測試消費者消費失敗
測試消費者執行報錯后是否會進行重試,這一點就比較好測試了,我們在消費者監聽器中插入隨便插入一個報錯查看其是否會不斷重試。這里筆者就不多做演示,實驗結果是會進行不斷重試,當重試次數達到閾值時會將結果存到死信隊列中。

四、壓測MQ和Seata的性能
由于MQ是采用異步消費的形式解耦了服務間的業務,而我們的Seata采用默認的AT模式每次執行分布式事務時都會需要借助undo-log、全局鎖等的方式保證最終一致性。所以理論上RocketMQ的性能肯定是高于Seata的,對此我們不妨使用Jmeter進行壓測來驗證一下。
本次壓測只用了1000個并發,MQ和seata的壓測結果如下,可以看到MQ無論從執行時間還是成功率都遠遠優秀于Seata的。
MQ的壓測結果:

Seata的壓測結果,可以看到大量的數據因為lock_table鎖超時而導致失敗,所以整體性能表現非常差勁:

五、詳解RocketMQ落地分布式事務常見問題
1. RocketMQ 如何保證事務的最終一致性
最終一致性是一種允許軟狀態存在的分布式事務解決方案,RocketMQ 保證事務最終一致性的方式主要是依賴生產者本地事務和消息可靠發送的原子性來最大努力保證最終一致性,注意這里筆者所強調的盡最大努力交付。
之所以說是最大努力交付是說RocketMQ是通過保證生產者事務和消息發送可靠性的原子性和一致性,由此保證消費者一定能夠消費到消息,理想情況下,只要消費者能夠正確消費消息,事務結果最終是可以保證一致性的,但是復雜的系統因素消費者可能會存在消費失敗的情況,此時事務最終一致性就無法保證,業界的做法是通過手動操作或者腳本等方式完成數據補償。

2. 什么是half消息
half消息即半消息,和普通消息的區別是該消息不會立馬被消費者消費,原因是half消息的存在是為了保證生產者本地事務和消費者的原子性和一致性,其過程如上文所介紹,初始發送的half消息是存儲在MQ一個內存隊列中(并未投遞到topic中),只有生產者本地事務成功并發送commit通知后,這個消息才會被持久化到commitLog同時提交到topic隊列中,此時消費者才能夠消費該消息并執行本地事務。
3. 為什么要先發送half消息再執行本地事務?先執行本地事務,成功后在發送不行嗎?
先發送half消息的原因是為了盡可能確保生產者和消息隊列通信正常,只有通信正常了才能確保生產者本地事務和消息發送的原子性和一致性,由此保證分布式事務的可靠性。
先執行本地事務,執行成功后再發送存在一個問題,試想一下,假設我們本地事務執行成功,但是發送的消息因為網絡波動等諸多原因導致MQ沒有收到消息,此時生產者和消費者的分布式事務就會出現數據不一致問題。

而half消息則不同,它會優先發送一個消費者感知不到的half消息確認通信可達,然后執行本地事務后降消息設置未commit讓消費者消費,即使說commit消息未收到,因為half消息的存在,MQ在指定超時先限制后也可以通過回查的方式到生產者事務表查詢執行情況。
4. 如果mq收到half消息,準備發送success信號的消息給生產者,但因為網絡波動導致生產者沒有收到這個消息要怎么辦?
此時生產者就會認為half消息發送失敗,本地事務不執行,隨著時間推移MQ長時間沒收到commit或者rollback消息就會回查生產者消息日志表,明確沒看到數據則知曉生產者本地事務執行失敗,直接rollback掉half消息,而消費者全程無感知,業務上的一致性也是可以保證。

5. MQ沒有收到生產者(訂單服務)的commit或者rollback信號怎么保證事務最終一致性?
常規的做法就是建立一張表記錄消息狀態,只要我們訂單信息插入成功就需要日志一下這條數據,所以我們必須保證訂單數據插入和日志插入表中的原子性,確保生產者的事務和消息日志的ACID:

6. 如果生產者執行本地事務失敗了怎么辦?
這一點前面的部分也已經說明,首先將本地會事務回滾,并向消息隊列提交一個rollback的請求不提交half消息,消息就不會被消費者消費,保證最終一致性。
7. 前面說的都是事務流程?這和事務消息如何保證數據最終一致性有什么關系?
生產者和消息隊列事務流程可以確保生產者和消息隊列發送的一致性,確保寫操作都是同時成功或者失敗。只有保證兩者正常通信,才能確保消費者可以消費MQ中的消息從而完成數據最終一致性。
8. 消費者提交本地事務失敗了怎么辦?
我們都知道消息隊列只能保證消息可靠性,而無法保證分布式事務的強一致性,出現這種情況,消費者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會在一定時間后,繼續將這條消息推送給消費者,消費者就可以繼續執行本地事務并提交了,直到成功消息隊列會進行N次重試,如果還是失敗,則可以到死信隊列中查看失敗消息,然后通過補償機制實現分布式事務最終一致性。

































