国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

RabbitMQ工作模式-Routing路由模式

開發(fā) 架構(gòu)
Routing模式要求隊(duì)列在綁定交換機(jī)時要指定Routing key,消息會轉(zhuǎn)發(fā)到符合Routing key的隊(duì)列。

Routing路由模式

1、模式說明

路由模式特點(diǎn):

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)。
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會接收到消息。

圖解:

  • P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時,會指定一個routing key。
  • X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
  • C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
  • C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息

2。案例

在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時候需要指定routing key。

在寫案例之前,我們首先定義一下需求:

  • 生產(chǎn)者:發(fā)送兩條消息,一條消息的用于插入數(shù)據(jù),另一條消息用于更新數(shù)據(jù)。
  • 消費(fèi)者1:接收插入數(shù)據(jù)的消息,進(jìn)行數(shù)據(jù)插入。
  • 消費(fèi)者2:接收更新數(shù)據(jù)的消息,進(jìn)行數(shù)據(jù)更新。

(1)生產(chǎn)者

package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/3 8:16
 */
public class Producer_Routing {
    //交換機(jī)名稱
    static final String DIRECT_EXCHAGE = "direct_exchange";
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設(shè)置參數(shù)
        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost
        factory.setPort(5672); //端口  默認(rèn)值 5672
        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
        factory.setUsername("libai"); // 用戶名 默認(rèn) guest
        factory.setPassword("libai"); //密碼 默認(rèn)值 guest
        //3. 創(chuàng)建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //5. 創(chuàng)建交換機(jī)
        /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           參數(shù):
            1. exchange:交換機(jī)名稱
            2. type:交換機(jī)類型
                DIRECT("direct"):定向
                FANOUT("fanout"):扇形(廣播),發(fā)送消息到每一個與之綁定隊(duì)列。
                TOPIC("topic") 通配符的方式
                HEADERS("headers") 參數(shù)匹配
            3. durable:是否持久化
            4. autoDelete:自動刪除
            5. internal:內(nèi)部使用。 一般false
            6. arguments:參數(shù)
        */
        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 6.聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時候自動刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
        // 7. 綁定隊(duì)列和交換機(jī)
        /*
            queueBind(String queue, String exchange, String routingKey)
            參數(shù):
                1. queue:隊(duì)列名稱
                2. exchange:交換機(jī)名稱
                3. routingKey:路由鍵,綁定規(guī)則
                    如果交換機(jī)的類型為fanout ,routingKey設(shè)置為""
         */
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
        //8. 發(fā)送消息至交換機(jī),由交換機(jī)分發(fā)消息
        // 發(fā)送信息
        String message = "新增了商品。路由模式;routing key 為 insert " ;
        /**
         * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage
         * 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱
         * 參數(shù)3:消息其它屬性
         * 參數(shù)4:消息內(nèi)容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);
        // 發(fā)送信息
        message = "修改了商品。路由模式;routing key 為 update" ;
        /**
         * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage
         * 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱
         * 參數(shù)3:消息其它屬性
         * 參數(shù)4:消息內(nèi)容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);
        //9. 釋放資源
        channel.close();
        connection.close();
    }
}

執(zhí)行發(fā)送消息:

發(fā)送消息之后,我們來看看聲明好的交換機(jī):

(2)消費(fèi)者1:專門接收 insert 的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_Routing1 {

    //隊(duì)列名稱
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設(shè)置參數(shù)
        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost
        factory.setPort(5672); //端口  默認(rèn)值 5672
        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
        factory.setUsername("libai"); // 用戶名 默認(rèn) guest
        factory.setPassword("libai"); //密碼 默認(rèn)值 guest
        //3. 創(chuàng)建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //5. 創(chuàng)建隊(duì)列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. durable:是否持久化,當(dāng)mq重啟之后,還在
            3. exclusive:
                * 是否獨(dú)占。只能有一個消費(fèi)者監(jiān)聽這隊(duì)列
                * 當(dāng)Connection關(guān)閉時,是否刪除隊(duì)列
            4. autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉
            5. arguments:參數(shù)。

         */
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. autoAck:是否自動確認(rèn)
            3. callback:回調(diào)對象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調(diào)方法,當(dāng)收到消息后,會自動執(zhí)行該方法
                1. consumerTag:標(biāo)識
                2. envelope:獲取一些信息,交換機(jī),路由key...
                3. properties:配置信息
                4. body:數(shù)據(jù)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊(duì)列的數(shù)據(jù) body: " + new String(body));
            }
        };
        channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);

        //不需要關(guān)閉資源,因?yàn)橄M(fèi)者需要持續(xù)監(jiān)聽隊(duì)列信息
    }
}

(3)消費(fèi)者2:專門接收 update 的消息

package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_Routing2 {
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設(shè)置參數(shù)
        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost
        factory.setPort(5672); //端口  默認(rèn)值 5672
        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
        factory.setUsername("libai"); // 用戶名 默認(rèn) guest
        factory.setPassword("libai"); //密碼 默認(rèn)值 guest
        //3. 創(chuàng)建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //5. 創(chuàng)建隊(duì)列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. durable:是否持久化,當(dāng)mq重啟之后,還在
            3. exclusive:
                * 是否獨(dú)占。只能有一個消費(fèi)者監(jiān)聽這隊(duì)列
                * 當(dāng)Connection關(guān)閉時,是否刪除隊(duì)列
            4. autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉
            5. arguments:參數(shù)。
         */
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. autoAck:是否自動確認(rèn)
            3. callback:回調(diào)對象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調(diào)方法,當(dāng)收到消息后,會自動執(zhí)行該方法
                1. consumerTag:標(biāo)識
                2. envelope:獲取一些信息,交換機(jī),路由key...
                3. properties:配置信息
                4. body:數(shù)據(jù)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊(duì)列的數(shù)據(jù) body: " + new String(body));
            }
        };
        channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);
        //不需要關(guān)閉資源,因?yàn)橄M(fèi)者需要持續(xù)監(jiān)聽隊(duì)列信息
    }
}

3、測試

啟動所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對應(yīng)的控制臺可以查看到生產(chǎn)者發(fā)送對應(yīng)routing key對應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果。

  • 消費(fèi)者1 收到了 insert 的消息

  • 消費(fèi)者2 收到了 update 的消息

4、小結(jié)

Routing模式要求隊(duì)列在綁定交換機(jī)時要指定routing key,消息會轉(zhuǎn)發(fā)到符合routing key的隊(duì)列。

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2023-11-10 09:22:06

2021-04-18 21:07:32

門面模式設(shè)計

2021-04-14 09:02:22

模式 設(shè)計建造者

2010-08-06 09:17:37

RIP路由協(xié)議

2023-05-17 08:16:04

RabbitMQ消息傳遞

2012-10-08 11:18:38

企業(yè)應(yīng)用架構(gòu)工作單元模式

2023-09-26 01:21:34

2021-08-11 17:22:11

設(shè)計模式單例

2022-08-15 11:21:48

戴爾

2009-12-14 17:49:44

路由選擇協(xié)議

2025-04-21 04:00:00

2021-06-03 09:18:25

裝飾器模式包裝

2025-06-03 01:43:00

2010-08-05 13:04:05

路由器

2021-07-05 12:33:31

混合工作ITPagerDuty

2021-04-19 21:25:48

設(shè)計模式到元

2024-01-01 08:19:32

模式History前端

2023-06-05 08:14:17

RabbitMQ兔子MQ開源

2020-12-07 11:23:22

云計算混合云

2020-12-03 10:51:45

云計算混合云IT
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

亚洲bt欧美bt日本bt| 欧美13一14另类| 国产精品秘入口| 天堂影院一区二区| 曰韩少妇与小伙激情| 亚洲精选一区| 97热精品视频官网| 韩国成人二区| 亚洲成人av中文| 国产二级片在线观看| 在线亚洲精品| 国产精品视频在线播放| 成人黄色91| 日韩av在线影院| 黄色av网址在线免费观看| 久久久高清一区二区三区| 欧美日韩电影一区二区三区| 国产一区二区三区天码| 久久久精品视频成人| 肉肉视频在线观看| 亚洲第一搞黄网站| 亚洲爆乳无码专区| 狠狠色丁香久久婷婷综| 久久精彩视频| 欧美激情1区2区| 国产精品成人观看视频国产奇米| 91麻豆精品国产综合久久久 | 中国色在线日|韩| 欧美亚洲日本一区| 91在线网站| 亚洲乱码日产精品bd| 色婷婷综合久久久久中文字幕| 精品在线你懂的| 麻豆一区区三区四区产品精品蜜桃| 色爱综合网欧美| 国产精品成人免费视频| 九九在线精品| 欧美精品18videos性欧美| 福利视频一区| 最近2019中文字幕在线高清 | 奇门遁甲1982国语版免费观看高清| 亚洲欧美一级| 中文字幕亚洲自拍| 97精品国产99久久久久久免费| 亚洲国产精品电影在线观看| 麻豆福利在线观看| 日韩欧美国产电影| 欧美hdxxxxx| 日韩av在线资源| 免费观看亚洲| 国产亚洲视频在线观看| 成人毛片免费| 色综合久久88色综合天天看泰| 欧美xxxx黑人又粗又长密月| 忘忧草精品久久久久久久高清| 国产99视频在线观看| 国产91久久精品一区二区| 97精品国产97久久久久久免费| 久久99国产精品久久99大师| 色偷偷9999www| 亚洲一区二区三区久久久| 久久精品99国产精品酒店日本| av成人在线网站| 欧美激情综合色| 九九综合在线| 亚洲va久久久噜噜噜| 国语精品一区| 亚洲精品中文字幕乱码三区不卡| 日韩电影网1区2区| 人妻夜夜添夜夜无码av | 成人xxxxx| 欧美成人一品| 日韩中文一区二区三区| av成人免费| 欧美老肥婆性猛交视频| 久久av免费看| 国产三区二区一区久久| 激情综合色播激情啊| 成人一区二区三| 欧美特级www| 最近在线中文字幕| 欧美精品18videos性欧| 午夜精品999| 欧美日韩大片一区二区三区| 国产成人aaa| 国产福利电影网| 欧美日韩高清一区二区| 黑人巨大精品| 日本精品免费一区二区三区| 狠狠入ady亚洲精品| 欧美一区二区三区综合| 亚洲美女在线国产| а_天堂中文在线| 欧美专区国产专区| 日韩电影在线观看网站| 国产精品自拍视频在线| 欧美日本免费一区二区三区| 综合毛片免费视频| 国产精品99免视看9| 日韩精品成人一区二区在线| 一区二区三区韩国| 91精品麻豆日日躁夜夜躁| 高清一区二区中文字幕| 国产一区二区三区四区hd| 久久久久久电影| 91麻豆一二三四在线| 欧美激情一级二级| 久久综合网络一区二区| 国产视频三区| 尤物九九久久国产精品的特点| 91高清一区| 亚洲熟妇av一区二区三区| 蜜桃av一区二区| 成人免费观看www在线| 日韩三级av在线播放| 国产乱人伦精品一区| 日韩影片在线播放| 亚洲综合一区二区三区| 成人直播视频| 91九色在线免费视频| 欧美日韩久久精品| 超碰成人在线免费观看| 亚洲成av人影院| 婷婷六月激情| 亚洲欧美另类自拍| 亚洲欧美网站在线观看| 在线视频日韩一区| 亚洲精品一区二区三区婷婷月| 伊人久久大香线蕉综合四虎小说 | 色婷婷狠狠五月综合天色拍| 水蜜桃亚洲精品| 91成人国产精品| 国产va免费精品观看精品视频 | 日本一区二区三级电影在线观看| 麻豆映画在线观看| 欧美视频免费在线| 嗯用力啊快一点好舒服小柔久久| 中文字幕色呦呦| 精品国精品自拍自在线| 国内精品久久久久久久影视蜜臀| 成人永久免费网站| 久久全国免费视频| 久久精品水蜜桃av综合天堂| jizzjizz少妇亚洲水多| 成年人黄色在线观看| 精品福利一区二区三区 | 欧美久久久久| 一区二区三区四区在线免费视频| 国内精久久久久久久久久人| 久久精品一区蜜桃臀影院| 老司机精品视频网| 国产69精品久久久久999小说| 亚洲乱码一区二区| 国产成人aaa| 国产精品一区二区精品| 国产精品亚洲αv天堂无码| 久久综合色影院| 欧美极品aⅴ影院| 国产毛片精品| jlzzjlzz欧美| 成人免费在线视频网站| 偷拍日韩校园综合在线| 一二三区不卡| 欧美jizz18性欧美| 色爱区成人综合网| 国产亚洲欧洲高清| 国产亚洲精品中文字幕| 青草久久视频| 先锋影音欧美性受| 久久伊人资源站| 亚洲第一区中文99精品| 国产精品资源在线看| 国产精品一级在线观看| 亚洲四虎av| 96国产粉嫩美女| 欧美一区二区三区日韩| 国产美女娇喘av呻吟久久| 国产午夜亚洲精品一级在线| 91黑丝在线| 99影视tv| 日韩精品久久久久| 久久久久国产成人精品亚洲午夜| 无码日韩精品一区二区免费| 天堂在线中文字幕| 性欧美videosex高清少妇| 亚洲欧美日韩第一区| 久久久91精品国产一区二区精品| 国产一区国产二区国产三区| sese在线视频| 日韩成人午夜影院| 欧美亚洲伦理www| 欧美日韩国产精品成人| 国产99久久久国产精品潘金| 狠狠一区二区三区| www.黄在线观看| 777精品久无码人妻蜜桃| 国产精品午夜视频| 日韩精品视频在线| 最新成人av在线| 久久尤物视频|