国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

RabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

開發 架構
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

訂閱模式類型

訂閱模式示例圖:

前面2個案例中,只有3個角色:

  • P:生產者,也就是要發送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
  • Fanout:廣播,將消息交給所有綁定到交換機的隊列
  • Direct:定向,把消息交給符合指定routing key 的隊列
  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

Publish/Subscribe發布與訂閱模式

1、模式說明

發布訂閱模式:

每個消費者監聽自己的隊列。

生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收 到消息

2、案例

(1)生產者

package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/3 8:16
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建交換機
        /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           參數:
            1. exchange:交換機名稱
            2. type:交換機類型
                DIRECT("direct"):定向
                FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。
                TOPIC("topic") 通配符的方式
                HEADERS("headers") 參數匹配
            3. durable:是否持久化
            4. autoDelete:自動刪除
            5. internal:內部使用。 一般false
            6. arguments:參數
        */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        //6. 創建隊列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7. 綁定隊列和交換機
        /*
            queueBind(String queue, String exchange, String routingKey)
            參數:
                1. queue:隊列名稱
                2. exchange:交換機名稱
                3. routingKey:路由鍵,綁定規則
                    如果交換機的類型為fanout ,routingKey設置為""
         */
        channel.queueBind(queue1Name, exchangeName, "");
        channel.queueBind(queue2Name, exchangeName, "");
        //8. 發送消息至交換機,由交換機分發消息
        String body = "日志信息: 肥仔白調用了findAll方法...日志級別: INFO....";
        channel.basicPublish(exchangeName, "", null, body.getBytes());
        //9. 釋放資源
        channel.close();
        connection.close();
        
    }
}

執行生產者,我們可以查看一下創建的 交換機 以及 隊列信息:

下面再來看看隊列,如下:

下面我們繼續來寫兩個消費者接收消息。

(2)消費者1:讀取隊列1的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_PubSub1 {

    //定義接收隊列的名稱
    final static String queueName = "test_fanout_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建隊列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數:
            1. queue:隊列名稱
            2. durable:是否持久化,當mq重啟之后,還在
            3. exclusive:
                * 是否獨占。只能有一個消費者監聽這隊列
                * 當Connection關閉時,是否刪除隊列
            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
            5. arguments:參數。

         */
        channel.queueDeclare(queueName, true, false, false, null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數:
            1. queue:隊列名稱
            2. autoAck:是否自動確認
            3. callback:回調對象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調方法,當收到消息后,會自動執行該方法
                1. consumerTag:標識
                2. envelope:獲取一些信息,交換機,路由key...
                3. properties:配置信息
                4. body:數據
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊列的數據 body: " + new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);

        //不需要關閉資源,因為消費者需要持續監聽隊列信息
    }
}

(3)消費者2:讀取隊列2的消息

package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_PubSub2 {
    //定義接收隊列的名稱
    final static String queueName = "test_fanout_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建隊列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數:
            1. queue:隊列名稱
            2. durable:是否持久化,當mq重啟之后,還在
            3. exclusive:
                * 是否獨占。只能有一個消費者監聽這隊列
                * 當Connection關閉時,是否刪除隊列
            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
            5. arguments:參數。
         */
        channel.queueDeclare(queueName, true, false, false, null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數:
            1. queue:隊列名稱
            2. autoAck:是否自動確認
            3. callback:回調對象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調方法,當收到消息后,會自動執行該方法
                1. consumerTag:標識
                2. envelope:獲取一些信息,交換機,路由key...
                3. properties:配置信息
                4. body:數據
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊列的數據 body: " + new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);
        //不需要關閉資源,因為消費者需要持續監聽隊列信息
    }
}

3、測試

啟動所有消費者,然后使用生產者發送消息;在每個消費者對應的控制臺可以查看到生產者發送的所有消息;到達廣播的效果。

  • 消費者1接收到的消息:

  • 消費者2接收到的消息:

從結果來看,生產者只需要發送一條消息,其余的消費者全部收到了消息,達到了廣播的效果。

4、小結

交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。

發布訂閱模式與工作隊列模式的區別:

  • 工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
  • 發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。
  • 發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。
責任編輯:姜華 來源: 今日頭條
相關推薦

2022-08-15 09:02:22

Redis模式訂閱消息

2023-11-20 08:54:38

2025-01-09 11:15:47

2022-06-27 13:56:10

設計模式緩存分布式系統

2022-12-02 07:28:58

Event訂閱模式Spring

2009-11-05 10:07:37

WCF設計模式

2024-03-28 08:07:42

RabbitMQ訂閱模式

2021-08-02 17:21:08

設計模式訂閱

2024-07-29 08:34:18

C++訂閱者模式線程

2013-10-31 14:30:44

CloudaAPI

2023-12-04 08:24:23

2023-01-11 08:22:22

RabbitMQ通信模型

2024-05-31 08:53:56

2021-04-18 21:07:32

門面模式設計

2023-11-07 12:09:44

TopicKafka

2025-03-11 09:30:00

2012-08-30 09:07:33

設計模式

2021-04-14 09:02:22

模式 設計建造者

2023-05-17 08:16:04

RabbitMQ消息傳遞

2012-10-08 11:18:38

企業應用架構工作單元模式
點贊
收藏

51CTO技術棧公眾號

91pron在线| 欧美/亚洲一区| 午夜免费福利在线观看| 国产黄色片在线观看| 欧美日韩123区| 影院在线观看全集免费观看| 日韩欧美一级| 麻豆国产精品官网| 日韩国产精品亚洲а∨天堂免| 一区二区三区精品国产| 国产精品va在线观看视色| 日本视频在线观看一区二区三区| 久久国产精品99精品国产 | 欧美一级淫片videoshd| 色操视频在线| 大桥未久av一区二区三区| 亚洲熟妇无码另类久久久| 久久九九精品| 99久久无色码| 亚洲综合小说图片| 久久97精品久久久久久久不卡| 国产福利在线免费观看| 91国偷自产一区二区三区成为亚洲经典| 日韩精品xxxx| 久久精品国产在热久久| 国产精品国产一区二区| 成人激情开心网| 欧美激情第99页| 日韩美香港a一级毛片| 精品国产污网站| av中文字幕一区二区三区| 精品日韩美女的视频高清| 羞羞视频立即看| 国产日本亚洲高清| 久久综合九色综合88i| 国产成人亚洲综合a∨婷婷| 一区二区三区不卡在线| 日本美女视频一区二区| 少妇免费毛片久久久久久久久| 亚洲第一网站| 国产一区二区三区奇米久涩| 在线电影一区| 久精品国产欧美| 香蕉久久国产| 日韩欧美激情一区二区| 久久aⅴ国产紧身牛仔裤| 精品一区在线播放| 91成人国产| 手机在线看福利| 国产精品国产a| 久久久加勒比| 国产精品久久国产精品| 欧美xxxx性xxxxx高清| 这里是久久伊人| 成年人视频网站在线| 在线国产电影不卡| а√天堂在线官网| 亚洲电影天堂av| 蜜臀久久精品| 国产亚洲欧美另类中文| 日韩一级特黄| 性欧美xxxx| 成人午夜国产| 国产伦精品一区二区三区照片91| 99在线视频免费观看| 日韩在线免费| 日韩在线观看成人| 红杏视频成人| 91久久精品美女高潮| 国产精品最新自拍| 4444在线观看| 国产精品麻豆一区二区| 亚洲一区二区三区成人 | 69堂成人精品视频免费| 亚洲国产午夜| 性欧美18一19内谢| 久久久久久久久久久久久女国产乱| 超碰在线97免费| 亚洲18色成人| 羞羞网站在线免费观看| 日韩在线观看网站| 欧美日韩精品一区二区视频| 精品国产一二| 成人的网站免费观看| 国产美女视频黄a视频免费| 91福利在线免费观看| 涩涩视频在线播放| 1769国产精品| 久久精品一区二区国产| 亚欧在线免费观看| 欧美性生活影院| 高清av一区二区三区| 国产精品户外野外| 久热成人在线视频| 黄色三级高清在线播放| 精品国一区二区三区| 欧州一区二区三区| 黑人另类av| 亚洲国产精品ⅴa在线观看| 调教视频免费在线观看| 久久久精品欧美| 激情成人亚洲| 妞干网在线视频观看| 色视频一区二区| 亚洲成人a级片| 激情小说综合网| 国产精品无遮挡| 日韩精品极品| 99久久精品无码一区二区毛片| 成人禁用看黄a在线| 99青草视频在线播放视| 久久久久久久久久久久久久久久久久av | 日韩在线视频在线| 亚洲一区二区四区蜜桃| 成人免费在线小视频| 性伦欧美刺激片在线观看| 91免费视频黄| 久久男人av| 免费影院在线观看一区| 久久久不卡网国产精品二区| 在线观看国产原创自拍视频| 久久精品国产欧美亚洲人人爽| 综合一区二区三区| 国产精品拍拍拍| 欧美变态tickling挠脚心| 日韩欧美二区| 99视频在线免费| 久久撸在线视频| 777奇米四色成人影色区| 亚洲视频资源| 亚洲精品在线观看网站| 国内不卡的一区二区三区中文字幕| 国产精选在线观看91| 成人激情小说网站| 日本五码在线| 欧美一级高清大全免费观看| 黑人巨大精品欧美一区二区桃花岛| 国产精品香蕉国产| 中文子幕无线码一区tr| 亚洲成人av观看| 91香蕉视频网址| 日韩欧美色综合| 一本久道久久久| 四虎精品成人免费网站| 欧美怡春院一区二区三区| 久久一日本道色综合| ww久久中文字幕| 在线观看网站免费入口在线观看国内| 成人写真福利网| 亚洲成人www| 久久经典视频| 亚洲色成人一区二区三区小说| 中文字幕精品久久久久| 福利91精品一区二区三区| 精品福利在线| 91九色蝌蚪成人| 欧美激情一区二区三区四区| 一级网站免费观看| 国产成人免费91av在线| 91一区一区三区| 亚洲伊人精品酒店| 99re精彩视频| 日韩中文字幕不卡视频| 欧美激情第10页| 伊人222成人综合网| 亚洲黄色av网址| 欧美本精品男人aⅴ天堂| 麻豆亚洲精品| 二区在线播放| 日韩精品伦理第一区| 欧美一级二级三级蜜桃| 久久午夜精品一区二区| 美女精品视频| 日本不卡一区二区三区四区| 亚洲人成在线观| 91一区二区三区在线观看| 欧美日韩卡一| 国产黄视频网站| 国产精品h片在线播放| 欧美视频国产精品| 一本色道久久精品| xxx.xxx欧美| 99热久久这里只有精品| 欧美国产日韩一区二区| 欧美日韩一二| 国产一区在线观看视频| 国产99re66在线视频| 亚洲精品日韩精品| 欧美一级在线观看| 日韩精品免费专区| 色综合视频在线观看| 加勒比中文字幕精品| 九色在线91| 亚洲iv一区二区三区| 欧美精品成人一区二区三区四区| 六月婷婷一区| 亚洲在线资源| 高清欧美精品xxxxx在线看| 久久久免费看| 宅男66日本亚洲欧美视频|