国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

聊聊 Redis 的發(fā)布訂閱設(shè)計與實現(xiàn)

數(shù)據(jù)庫 Redis 開發(fā)
關(guān)于 Redis 發(fā)布訂閱的設(shè)計與實現(xiàn),本質(zhì)上就是通過一個個鏈表管理訂閱者,通過pub指令定位到channel后將消息遍歷發(fā)送到對應(yīng)客戶端socket上。

借一個午休的時光整理一下關(guān)于redis發(fā)布訂閱源碼的設(shè)計與實現(xiàn),通過本文的閱讀,你將會對發(fā)布訂閱模型的設(shè)計思想以及對哨兵間選舉通信的流程有著更底層的視角。

一、詳解redis中發(fā)布訂閱的設(shè)計

1. channel的設(shè)計

redis服務(wù)端啟動時,會初始化一個記錄channel以及channel訂閱者的鍵值對結(jié)構(gòu),它用channel的名稱作為key,用一個鏈表記錄這個訂閱這個channel的客戶端:

對此我們給出redis初始化函數(shù)initServer的代碼片段,可以看到其內(nèi)部調(diào)用dictCreate方法為pubsub_channels 這個記錄channel和channel訂閱者的指針初始化了一個頻道名稱為key,鏈表為value的字典:

void initServer(void) {
    //......
    // 初始化pubsub_channels存儲頻道信息,keylistDictType用頻道名稱作為key,訂閱者list作為value
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
 //......
}

2. pub/sub的實現(xiàn)

當(dāng)客戶端1通過SUBSCRIBE mychannel訂閱mychannel這個頻道,本質(zhì)上就是redis服務(wù)端解析SUBSCRIBE指令并調(diào)用subscribeCommand函數(shù),該方法會檢查這個channel是否存在,如果不存在則則以channel名稱為key,初始化一個鏈表作為value,將訂閱這個channel的客戶端追加到鏈表中。反之,如果channel存在則直接將客戶端信息存入鏈表即可:

圖片==圖片==

對此我們給出對應(yīng)的源碼實現(xiàn),該函數(shù)subscribeCommand位于pubsub.c可以看到其入口邏輯就是遍歷參數(shù)得到當(dāng)前客戶端想訂閱的頻道,然后調(diào)用pubsubSubscribeChannel將該客戶端追加到這個頻道的鏈表上:

void subscribeCommand(redisClient *c) {
    int j;
    //遍歷頻道將該客戶端存入
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
     //將當(dāng)前客戶端標(biāo)識為做了發(fā)布訂閱   
    c->flags |= REDIS_PUBSUB;
}

我們步入pubsubSubscribeChannel方法即可看到上圖所說明的邏輯,如果對應(yīng)的頻道不存在,則初始化然后將客戶端追加到鏈表中,反之直接追加到鏈表中:

int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    //頻道添加到pubsub_channels中
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        //查看這個頻道的訂閱者鏈表是否存在
        de = dictFind(server.pubsub_channels,channel);
        //如果頻道不存在,則直接初始化鏈表
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        
        //將客戶端追加的鏈表尾巴
        listAddNodeTail(clients,c);
    }
   //......
}

同理,當(dāng)我們通過redis客戶端鍵入publish mychannel "hello"向mychannel 發(fā)送一個hello消息時,redis服務(wù)端會解析這條publish指令并調(diào)用publishCommand完成消息發(fā)布,通知到各個訂閱者:

圖片==圖片==

我們給出publishCommand的源碼,位于pubsub.c這個源代碼文件中,可以看到這段代碼會將channel和對應(yīng)的消息傳入pubsubPublishMessage方法中,并返回接收者數(shù):

void publishCommand(redisClient *c) {
    //發(fā)布消息返回接收者 PUBLISH <channel> <message>,返回接收者的數(shù)量
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
   //......
}

步入pubsubPublishMessage即可看到發(fā)布消息的核心邏輯,可以看到這個方法用receivers來記錄接收的通知者,它會先進(jìn)行精準(zhǔn)匹配,到pubsub_channels找到和channel名字一致的channel并向該channel的訂閱者發(fā)布消息,然后在進(jìn)行模糊匹配,遍歷所有的channel找到模糊匹配上的channel并向訂閱者發(fā)布消息:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    
    //查找名字相同的channel
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
       //......
  //移動至訂閱者鏈表首部
        listRewind(list,&li);
        //遍歷并向這些訂閱者發(fā)布消息
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

          //......
          //發(fā)布消息
            addReplyBulk(c,message);
            //接收數(shù)++
            receivers++;
        }
    }
    
    if (listLength(server.pubsub_patterns)) {
     //移動至channel鏈表首部
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        //遍歷channel
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
   //找到匹配的channel并發(fā)布消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
               //......
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
    }
    return receivers;
}

3. 哨兵如何利用發(fā)布訂閱完成消息通信的

關(guān)于pub/sub模式,redis中的哨兵就很好的利用這種模式進(jìn)行溝通和選舉等各個工作,當(dāng)我們的redis以哨兵的方式啟動時,redis會定期執(zhí)行哨兵的定時任務(wù),該任務(wù)會在檢查連接時檢查發(fā)布訂閱master的連接是否為空,若為空則調(diào)用異步連接綁定的方式訂閱master的"__sentinel__:hello"頻道,而該頻道主要負(fù)責(zé)下面這些工作:

  • Sentinel 實例的發(fā)現(xiàn)與信息交換:每個 Sentinel 實例會定期通過 __sentinel__:hello 頻道發(fā)布自己的信息,包括 Sentinel 的 IP 地址、端口、運行 ID、當(dāng)前配置的紀(jì)元(epoch)等。 其他 Sentinel 實例會訂閱這個頻道,從而感知到其他 Sentinel 的存在,并獲取它們的信息。 監(jiān)控主從節(jié)點的狀態(tài):
  • Sentinel 實例通過 __sentinel__:hello 頻道共享它們對 Redis 主節(jié)點和從節(jié)點的監(jiān)控信息:例如,某個 Sentinel 實例檢測到主節(jié)點不可用時,會通過這個頻道通知其他 Sentinel 實例,以便它們確認(rèn)并共同決定是否進(jìn)行故障轉(zhuǎn)移。 故障轉(zhuǎn)移的協(xié)調(diào):

在故障轉(zhuǎn)移過程中,Sentinel 實例會通過 __sentinel__:hello 頻道交換信息,協(xié)調(diào)誰來執(zhí)行故障轉(zhuǎn)移操作,并確保只有一個 Sentinel 實例負(fù)責(zé)執(zhí)行。

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    //......
   
    //檢查發(fā)布訂閱是否為空
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
     //若為空則pc指針指向異步連接
        ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
        if (ri->pc->err) {
         //......
        } else {//如果沒有報錯,則訂閱__sentinel__:hello頻道
            int retval;

            //......
            //哨兵訂閱 __sentinel__:hello 頻道(也就是下面的常量SENTINEL_HELLO_CHANNEL),通過sentinelReceiveHelloMessages處理回調(diào)
            retval = redisAsyncCommand(ri->pc,
                sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
                    SENTINEL_HELLO_CHANNEL);
          //......
            }
        }
    }
   //......
}

通過master的hello頻道,哨兵會定期publish自己的信息到hello頻道,其他哨兵就可以基于這個頻道發(fā)現(xiàn)其他的哨兵由此完成通信:

對此我們給出哨兵定期發(fā)送hello的函數(shù)入口sentinelSendPeriodicCommands,這個方法會被定期執(zhí)行,其內(nèi)部邏輯一旦檢查到pub/sub時間間隔過長時就會發(fā)送調(diào)用sentinelSendHello向hello頻道發(fā)送當(dāng)前哨兵的信息讓其他哨兵感知:

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    //......
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
       //......
    } else if ((now - ri->last_pong_time) > ping_period) {//超過ping間隔發(fā)ping
     //......
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
    
        //超過pub最大間隔SENTINEL_PUBLISH_PERIOD則發(fā)送發(fā)送哨兵自身ip端口等信息到hello頻道
        sentinelSendHello(ri);
    }
}

步入sentinelSendHello即可看到我們上文所說的邏輯,可以看到當(dāng)前哨兵會組裝個人信息通過異步連接cc指針維護的連接信息PUBLISH 個人信息到hello頻道:

int sentinelSendHello(sentinelRedisInstance *ri) {
   //......
   //獲取當(dāng)前哨兵ip
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
    } else {
        if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
            return REDIS_ERR;
        announce_ip = ip;
    }
    //獲取當(dāng)前哨兵端口
    announce_port = sentinel.announce_port ?
                    sentinel.announce_port : server.port;

    //將數(shù)據(jù)拼接到payload中
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, server.runid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    //將組裝的哨兵信息publish到hello頻道(SENTINEL_HELLO_CHANNEL就是hello頻道的常量變量值)   
    retval = redisAsyncCommand(ri->cc,
        sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
            SENTINEL_HELLO_CHANNEL,payload);
    //......
}

二、小結(jié)

自此我們將發(fā)現(xiàn)redis發(fā)布訂閱的設(shè)計與實現(xiàn),本質(zhì)上就是通過一個個鏈表管理訂閱者,通過pub指令定位到channel后將消息遍歷發(fā)送到對應(yīng)客戶端socket上,這里筆者也簡單的補充一句,從源碼中我們可以看到redis的發(fā)布訂閱模型沒有持久化機制,所以對于可靠性要求高的場景筆者還是不太建議使用pub/sub。

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2025-03-20 09:54:47

2025-05-22 08:15:00

2023-05-26 08:24:17

短信渠道模型

2020-09-15 10:25:13

Redis命令Java

2022-10-18 08:28:38

運營活動實現(xiàn)邏輯整體協(xié)作

2024-01-10 08:16:08

Redis集成JMS

2025-02-19 10:27:48

哨兵Redis故障轉(zhuǎn)移

2025-01-23 08:53:15

2024-07-02 11:42:53

SpringRedis自定義

2021-08-05 06:54:05

觀察者訂閱設(shè)計

2020-01-02 09:57:09

Redis訂閱發(fā)布

2022-12-02 07:28:58

Event訂閱模式Spring

2009-11-05 10:07:37

WCF設(shè)計模式

2024-05-14 08:03:51

C#EventArgs?屬性

2023-12-14 10:10:09

pythonRedis開發(fā)

2024-11-04 08:00:00

Netty客戶端

2024-10-11 11:50:05

Redis適用場景

2023-02-10 08:59:42

業(yè)務(wù)技術(shù)核心

2022-08-15 09:02:22

Redis模式訂閱消息

2021-06-01 06:59:58

運維Jira設(shè)計
點贊
收藏

51CTO技術(shù)棧公眾號

日本亚洲欧洲色α| 久久免费视频网站| 国产视频一区二区三区在线播放| 日韩av黄色在线| 婷婷久久综合九色国产成人| 日本一区二区三区视频在线观看| 欧美一区二区三区红桃小说| 亚洲裸体xxxx| 色综合久久久| 国产欧美精品区一区二区三区| 国产91社区| 欧美美女啪啪| 亚洲色图18p| 一区二区高清不卡| 中文字幕一区日韩精品欧美| 亚洲 欧洲 日韩| 欧美精品福利| 欧美亚洲国产日本| 亚洲精品三区| 精品国内亚洲2022精品成人| 国产精品7m凸凹视频分类| 在线a欧美视频| 欧美四级在线| 欧美美女喷水视频| 国产va在线| 久久品道一品道久久精品| 中文字幕一区二区三区有限公司| 精品二区视频| 97夜夜澡人人双人人人喊| 国产精品免费大片| 欧美精品videosex性欧美| 久久天天久久| 色婷婷久久一区二区| 性欧美18~19sex高清播放| 亚洲高清av在线| 在线观看av网站永久| 久久精品久久精品| 国产日韩欧美一区二区| 日韩一级毛片| 国产精品看片资源| 日韩在线麻豆| 2024亚洲男人天堂| 国产欧美一区二区三区米奇| 欧美大秀在线观看| 亚洲免费一区三区| 欧美日本国产在线| 日韩欧美中文字幕一区二区三区| 中文字幕日韩欧美| 欧美日韩破处视频| 久久在精品线影院精品国产| www.久久热| 欧美激情精品久久久久久久变态| 欧美影院视频| 国a精品视频大全| 粉嫩的18在线观看极品精品| 隔壁老王国产在线精品| 国产传媒欧美日韩成人精品大片| 国产成人精品综合| 羞羞色午夜精品一区二区三区| 亚洲va久久久噜噜噜| 亚洲黄页一区| 夜夜爽www精品| 大白屁股一区二区视频| mm1313亚洲国产精品无码试看| 国产精品视频yy9299一区| 国产一级二级在线| 欧美日韩一区 二区 三区 久久精品| 国产欧美久久久久久久久| 日韩电影大片中文字幕| 国产精品1区在线| 国产97在线观看| 欧美1区视频| 亚洲第一精品区| 91一区二区三区在线播放| av黄色免费| 欧美久久久久中文字幕| 亚洲欧美在线成人| 日韩av成人在线| 性欧美xxxx大乳国产app| 国产精品69久久久| 一区二区三区四区中文字幕| 在线a人片免费观看视频| 亚洲最新av在线| 九九视频精品全部免费播放| 精品国产电影| 成人精品小蝌蚪| av在线电影网站| 91精品视频网| 久久伊人久久| 高清视频一区| 国产在线看一区| 韩国版免费三体| 精品欧美乱码久久久久久1区2区| 欧美成人福利| 国产成人精品自拍| 99精品国产一区二区三区不卡| 视频黄页在线| 日韩黄色高清视频| 欧美日韩xxxx| 亚洲精品在线免费看| 国产精品久久久久久久久久免费看 | 亚洲第一福利网| 精品国产三区在线| 国产精品久久久对白| 国产成人亚洲综合a∨婷婷图片| 免费一级淫片| 一道本无吗dⅴd在线播放一区| 久久一区91| 亚洲人成无码网站久久99热国产 | 国产99久久久久久免费看农村| 裸体av在线| 亚洲图片制服诱惑| 欧美日韩国内| 日韩av卡一卡二| 欧美不卡一二三| 日本久久一二三四| 日本黄色片一级片| 欧美亚洲日本国产| 久久亚洲道色| 欧美日韩福利在线| 欧美精品黑人性xxxx| 日韩精品免费一区二区夜夜嗨| 警花观音坐莲激情销魂小说| 岛国av一区二区| 粉嫩的18在线观看极品精品| 欧美一级特黄aaaaaa在线看片| 在线免费av一区| 欧美午夜寂寞| 国产91在线视频观看| 亚洲国产精品yw在线观看 | 亚洲精品成人av| 亚洲91视频| www.com黄色片| 欲色天天网综合久久| 视频一区二区欧美| 欧美孕妇孕交xxⅹ孕妇交| 2019中文字幕在线观看| 成人av一区二区三区| 97在线超碰| 免费在线观看91| 欧美在线免费视屏| 在线亚洲a色| 激情综合色综合啪啪开心| 久久久久久久久91| 高清不卡一区二区在线| av免费看在线| 国产成人精品福利一区二区三区 | 国产福利片在线| 国产综合在线视频| 国产亚洲精品bt天堂精选| 中文字幕日本一区二区| 中文字幕一区二区三区四区五区六区| 欧美高清视频一二三区 | 色妞www精品视频| 93在线视频精品免费观看| 日本xxxx高清色视频| 91国产高清在线| 国产精品久久久久久亚洲伦| 亚洲图色一区二区三区| 日韩av综合在线观看| 久久精品视频导航| 国产亚洲综合性久久久影院| 综合成人在线| 一区二区三区韩国| 2025国产精品视频| 亚洲综合免费观看高清完整版 | 亚洲成人你懂的| 国产亚洲一卡2卡3卡4卡新区| 亚洲四虎av| 国产精品免费久久久久久| 亚洲国产欧美日韩另类综合| 国产欧美日韩| 神马久久久久| 鲁丝一区二区三区免费| 精品国产精品网麻豆系列| 麻豆国产精品一区二区三区| 综合另类专区| 免费黄色福利视频| 91a在线视频| 五月激情丁香一区二区三区| 国一区二区在线观看| 成人a在线视频免费观看| 亚洲人成77777| 中文字幕日韩精品有码视频| 国产精品色呦呦| 在线观看免费一区二区| 秋霞在线视频| 丝袜老师办公室里做好紧好爽| 国产91|九色| 欧美午夜精品伦理| 麻豆精品在线看| 91丨精品丨国产| 国内福利写真片视频在线 | 亚洲一区二区在线| 欧美mv日韩mv亚洲| 91免费看`日韩一区二区| 国产精品免费大片| 国产三区视频在线观看| 黄色成人在线看| 91中文字幕一区|