国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

完美解決,RocketMQ如何支持多事務消息?

開發 前端
本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener? 的問題。通過引入事務消息處理接口 TransactionMessageHandler,我們將原有的事務處理器改造成了一個分發器,使得在 DailyMart 項目中可以輕松處理多事務消息的場景。

今天我們將解決使用RocketMQ事務消息時可能遇到的一個常見問題:如何讓其支持多事務消息?

1. 問題背景

在實際開發中,我們常常會面臨多事務消息的場景,例如在DailyMart的訂單模塊中,用戶支付后需要調用庫存服務進行庫存扣減,而在訂單確認收貨后需要調用用戶服務實現積分贈送。這兩個業務邏輯都需要通過事務消息來保證分布式事務。

為了處理這種情況,我們可能會考慮在訂單模塊中創建兩個事務消息監聽器,分別用于處理庫存扣減和積分贈送的事務處理和事務回查。

@Component
@Slf4j
//處理訂單支付的事務監聽器
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
    //處理訂單支付邏輯
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
      //檢查訂單處理邏輯
   }
}

@Component
@Slf4j
//處理訂單收貨的事務監聽器
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
   }
}

然而,當我們信心滿滿地完成業務邏輯編寫并啟動服務時,可能會遇到如下錯誤:rocketMQTemplate already exists RocketMQLocalTransactionListener

圖片圖片

在rocketmq-spring-boot-starter版本低于2.1.0的項目中,可以使用多個 @RocketMQTransactionListener 監聽不同的 txProducerGroup 來發送不同類型的事務消息到topic。然而,從 RocketMQ-Spring 2.1.0 版本開始,注解 @RocketMQTransactionListener 不能設置 txProducerGroup、ak、sk,這些值均需與對應的 RocketMQTemplate 保持一致。通過閱讀源碼 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已經存在了 RocketMQTransactionListener 則會出現上述錯誤。

圖片圖片

2. 如何解決

為了在保證系統只有一個 RocketMQTransactionListener 的前提下實現多事務消息,我們可以將 RocketMQLocalTransactionListener 不處理具體業務邏輯,而是將其作為一個分發器使用。

在生產者發送事務消息時指定對應的事務處理器 ,并將事務處理器放置在消息頭上發送出去,在 RocketMQTransactionListener 中根據消息頭選擇具體的事務處理器來實現業務邏輯。

具體實現如下:

2.1 定義事務消息處理接口

首先,定義公共的事務消息處理接口,所有事務消息都實現此接口而非 RocketMQ 默認的 RocketMQLocalTransactionListener。

public interface TransactionMessageHandler {
    
    /**
    * 執行本地事務
    * @param payload 消息體
    * @param arg 參數
    */
    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);
    
    /**
     * 檢查本地執行狀態
     * @param payload 消息體
     * @return 執行結果
     */
    RocketMQLocalTransactionState checkLocalTransaction(Object payload);
    
}

2.2 修改事務消息發送工具類,指定消息處理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) {  
  if(transactionMessageListener == null){
    throw new IllegalArgumentException("transactionMessageListener must not null");
  }
  
  String destination = buildDestination(topic, tag);

  Message<T> sendMessage = MessageBuilder.withPayload(message)
    .setHeader(RocketMQHeaders.KEYS, message.getKey())
    .setHeader(SOURCE_HEADER, message.getSource())
    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())
    .build();
  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);

  log.info("[{}]事務消息[{}]發送結果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));

  return sendResult;
}

2.3 修改RocketMQ事務消息監聽器

@Slf4j
@RocketMQTransactionListener
public class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {
    
    private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;
    
    public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {
        this.transactionMessageHandlerMap = transactionMessageHandlerMap;
    }
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("消費者收到事務消息[{}]", JSONObject.toJSON(message));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        
        RocketMQLocalTransactionState state;
        Object payload = message.getPayload();
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.executeLocalTransaction(payload, arg);
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("消費者收到事務回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        RocketMQLocalTransactionState state;
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.checkLocalTransaction(message.getPayload());
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
}

在上述代碼中,根據消息頭中的TRANSACTION_MESSAGE_HEADER參數選擇對應的事務處理器來處理事務消息。

在 DailyMart 中有一個公共組件 dailymart-rocketmq-spring-boot-starter 專門用于 RocketMQ 消息發送監聽的封裝,因此我們也將事務消息的處理邏輯封裝到了此組件中。

圖片圖片

2.4 修改事務消息處理邏輯

所有的事務消息處理邏輯都實現 TransactionMessageHandler 接口,以訂單支付的處理邏輯為例:

@Component
@Slf4j
public class OrderPaidTransactionConsumer implements TransactionMessageHandler {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
}

2.5 修改事務消息發送邏輯,指定事務處理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);

小結

本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener 的問題。通過引入事務消息處理接口 TransactionMessageHandler,我們將原有的事務處理器改造成了一個分發器,使得在 DailyMart 項目中可以輕松處理多事務消息的場景。

責任編輯:武曉燕 來源: JAVA日知錄
相關推薦

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2023-07-17 08:34:03

RocketMQ消息初體驗

2021-10-03 21:41:13

RocketMQKafkaPulsar

2021-04-15 09:17:01

SpringBootRocketMQ

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2021-03-04 06:49:53

RocketMQ事務

2023-09-04 08:00:53

提交事務消息

2025-04-29 04:00:00

分布式事務事務消息

2022-07-04 11:06:02

RocketMQ事務消息實現

2024-06-13 09:25:14

2024-08-06 09:55:25

2024-10-22 08:01:15

2014-03-25 10:57:42

Android消息推送方案

2024-12-04 15:38:43

2021-02-02 11:01:31

RocketMQ消息分布式

2023-12-15 13:08:00

RocketMQ中間件消費順序

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2022-12-22 10:03:18

消息集成

2021-04-27 07:52:18

RocketMQ消息投遞
點贊
收藏

51CTO技術棧公眾號

亚洲黄色www网站| av网站一区| 国产福利视频一区二区三区| 亚洲国产欧洲综合997久久 | 欧美午夜视频网站| 麻豆免费在线视频| 亚洲人成网在线播放| 日韩精品一区二区三区中文在线 | 美女黄色片视频| 亚洲国产欧美日韩另类综合| 日本在线免费网| 久久99国产精品久久99大师 | 亚洲欧洲中文天堂| 99国产精品免费网站| 92福利视频午夜1000合集在线观看 | 麻豆精品久久久| 成人国产精品一区二区| 久久三级福利| 成人av一区二区三区| 濑亚美莉一二区在线视频| 亚洲国产美女搞黄色| 中文字幕亚洲乱码| 欧美一区二区三区不卡| 午夜爽爽视频| 久久精品一级爱片| av777777| 色94色欧美sute亚洲线路一ni | 久久精品在线免费观看| 91手机在线观看| 欧美一级三级| 亚洲天堂男人天堂| 欧美久久香蕉| 中文字幕一区二区三区四区五区六区 | 午夜精品免费视频| 欧美极品欧美精品欧美图片| 久久蜜桃av一区精品变态类天堂 | 亚洲一区在线不卡| 国产一区二区三区精品视频| 国产精品av在线| 99综合99| 欧美日韩色一区| 最猛黑人系列在线播放| 欧美一区二区精品在线| 免费大片在线观看www| 欧美黑人巨大精品一区二区| 91黄色小视频| 国产精品普通话对白| 在线视频三级| 日韩有码在线观看| 91免费版在线| 另类一区二区三区| 亚洲欧美在线网| 亚洲人成欧美中文字幕| 久久99国产精品久久| 亚洲一区av| 成人网免费视频| 性欧美暴力猛交69hd| 成人精品免费网站| 成人看片网页| 国内一区二区三区在线视频| 国产偷国产偷精品高清尤物| 色戒汤唯在线| 韩国黄色一级大片| 欧美性猛交xxxxx水多| 黄色片网站在线| 欧美激情第六页| 欧美网站一区二区| 欧美成人xxxxx| 欧美日韩一区国产| 东京久久高清| 亚洲黄色网址在线观看| 欧美性猛交xxxx乱大交3| 日韩在线你懂得| 日韩欧美一区二区三区四区| 婷婷久久综合九色综合伊人色| 91麻豆精品| 亚洲电影免费| 欧美亚洲愉拍一区二区| 日韩精品一级| 情侣黄网站免费看| 亚洲精品一二区| 国内精品国产成人| 最近中文字幕免费mv2018在线| 成人欧美一区二区三区在线观看 | 福利在线一区二区| 欧美一三区三区四区免费在线看 | 激情欧美一区二区三区黑长吊| 一区二区在线播放视频| 亚洲第一精品区| 欧美三级在线视频| 精品日本12videosex| 一区二区三区视频网| 日韩一区二区在线视频| 紧缚捆绑精品一区二区| 日韩经典av| 日本视频一区二区在线观看| 91精品国产综合久久久久久久久久| 欧美午夜一区| 内衣办公室在线| 成人av播放| 91福利国产成人精品照片| 欧美jjzz| 理论片午午伦夜理片在线播放| 国产一级精品aaaaa看| 欧美性感一区二区三区| 欧美精品麻豆| 久操视频在线观看| 日本一区精品| 亚洲国产精品一区二区久| 麻豆精品蜜桃视频网站| 91在线三级| 17c丨国产丨精品视频| 亚洲精品成人网| 男人的天堂亚洲一区| 免费在线国产视频| 国产日韩第一页| www日韩欧美| 中文字幕在线观看不卡视频| 国产成人精品999在线观看| 一级特黄特色的免费大片| 亚洲a成v人在线观看| 欧美日韩精品欧美日韩精品 | 婷婷久久免费视频| www.国产区| 国产69精品久久久久9999| 中文字幕av不卡| 精品国产1区| 国产一级二级三级在线观看| 久久精品aaaaaa毛片| 日韩欧美一区二区视频| 国产成人午夜精品5599| 国产麻豆一区二区三区| 蜜桃免费在线视频| 国产精品一二区| 欧美剧情电影在线观看完整版免费励志电影 | 99久久99| 精品国产乱码91久久久久久网站| 国产一区二区免费视频| 精品国产三区在线| 免费av高清| 久久99蜜桃综合影院免费观看| 亚洲成人1234| 国产一区二区在线观看免费| 黄色日韩网站| 一级视频在线免费观看| 精品日本一区二区| 亚洲人在线视频| 亚洲日本va在线观看| 在线精品观看| 久久久久北条麻妃免费看| 久久久久久99精品| 中日韩免视频上线全都免费| 激情小视频在线观看| 九九热久久66| 欧美成人亚洲成人日韩成人| 亚洲一区二区三区三| 亚洲三级视频| 国外成人福利视频| 蜜臀在线观看| 中文字幕不卡每日更新1区2区| 欧美日韩ab片| 欧美日韩久久久一区| 波多野结衣视频一区| 欧美高清视频手机在在线| a级片国产精品自在拍在线播放| 国产精品999视频| 国产一区二区香蕉| 亚洲国产精品视频在线观看 | 韩国在线视频一区| 99热99re6国产在线播放| 国产小黄视频| 亚洲日本精品国产第一区| 97在线视频免费看| 精品国产91乱码一区二区三区 | www.欧美色图| 国产精品久久久久蜜臀| 国产一区二区网| 成人福利网站在线观看| 亚洲人成电影在线播放| 精品国产1区2区| 99久久精品免费看| 雨宫琴音一区二区在线| 成人午夜在线| 国产成人l区| 日本xxxxxx| 糖心vlog在线免费观看| 91在线直播亚洲| 欧美成人高清视频| 精品久久久久香蕉网| 亚洲制服丝袜一区| av一区二区三区在线| 亚洲免费在线| 欧美热在线视频精品999| 日本电影欧美片| 日p在线观看| 91免费版在线观看| 男人的天堂狠狠干| 伊人久久大香线蕉成人综合网| 91在线视频导航| 51色欧美片视频在线观看|