国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

如何基于Netty實現即時消息下發

開發 前端
采用輪詢拉取消息就會出現上面的場景,而如果采用長連接的方式,服務器可以與客戶端建立一條實時的連接,服務器有新消息可以直接推送給客戶端,不需要等待客戶端請求,這樣既保證了實時性,整個系統的抗壓能力也優于大量輪詢的方式。

想象一個場景,你與女友在網上聊天,她問了一句:你愛我嗎?然后很忐忑地等你回答。可等了好一段時間,你才收到她的消息,趕緊回復了一句:愛,愛你一萬年。又過了好久,你女友才收到你的回復。這時,你說的是什么已經不重要了,準備回去跪鍵盤吧。可以說這樣的用戶體驗非常的糟糕。

采用輪詢拉取消息就會出現上面的場景,而如果采用長連接的方式,服務器可以與客戶端建立一條實時的連接,服務器有新消息可以直接推送給客戶端,不需要等待客戶端請求,這樣既保證了實時性,整個系統的抗壓能力也優于大量輪詢的方式。

什么是長連接通信

那么,什么是長連接呢?我們都知道短連接是什么,比如我們熟悉的 HTTP 協議,就是使用短連接的方式來請求數據的,它先是建立連接,然后進行數據傳輸,最后關閉連接。而且只能由客戶端主動發起請求,數據傳輸之后,連接就關閉了,服務端無法主動給客戶端發送數據。

而長連接是和短連接相對的,它的過程是:建立連接—>數據傳輸...(保持連接)……數據傳輸—>關閉連接。客戶端與服務端建立連接之后,客戶端和服務端保持住連接不斷開,就可以一直在這個連接上傳輸數據,直到一方主動關閉連接。

圖片圖片

如何建立長連接通信

那怎么建立長連接通信呢?我們常見的網絡服務例如 Tomcat、Apache 等主要都是面向短連接的,對長連接支持不是很好。而且長連接需要服務端長期保持連接,如果有大量的連接同時在線,服務端的壓力會非常大,所以,就需要一套高性能的網絡框架來支撐。幸運的是,有 Netty 這樣的異步網絡框架來幫助我們管理連接。

你可能多少了解過 Netty,它是基于事件驅動的,易開發、易維護、高性能,完全滿足長連接通信的需求。我們使用 Netty 實現我們的服務端,當然也可以實現客戶端,但是我們的客戶端一般會根據不同的平臺采用不同的實現方案。

有服務端,客戶端之后,我們還不可以進行通信,因為缺少一個通信協議。我們知道,進行短連接通信的時候采用的是 HTTP 協議,而這次我們要采用 MQTT,一個物聯網的標準信息傳輸協議。它是一個十分輕量級的發布/訂閱模型協議,占用網絡帶寬極小,因為它的固定消息頭只占 2 字節,已經被廣泛應用于電信、汽車、工業制造等領域。

服務端、協議、客戶端,我們都已經知道采用的方案了,來看下整體系統結構:

圖片圖片

現在,我們一起動手實現這樣一個消息下發服務端吧。我們只需要引入 Netty 的 jar 包即可,代碼如下:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.54.Final</version>
</dependency>

啟動一個 Netty 服務,如同我們正常啟動 Java 程序一樣:

public static void main(String[] args) {
    int port = 1883;
    if (args.length >= 1) {
        port = Integer.parseInt(args[1]);
    }
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(MqttEncoder.INSTANCE);
                        pipeline.addLast(new MqttDecoder());
                        //處理MQTT消息
                        pipeline.addLast(MyMqttHandlers.INSTANCE);
                    }
                });
        //啟動服務
        ChannelFuture future = serverBootstrap.bind(port).sync();
        System.out.println("MQTT server start success,port=" + port);
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

通過這段代碼我們啟動了 Netty 長連接服務,服務端口是 1883,客戶端可以通過這個端口連接到服務端,Netty 本身已經實現了 TCP 連接的建立、管理以及 MQTT 協議的編碼和解碼等,我們只需要按照需求實現自己的業務邏輯即可。上述代碼中,我們只需要實現 MyMqttHandlers.INSTANCE,來完成我們對客戶端連接的認證、Topic 訂閱、消息發布、心跳檢測等等。

上面的 MyMqttHandlers 類,是我們自定義的 Netty Handler,用來處理 MQTT 業務數據,它繼承自 Netty 的適配器 SimpleChannelInboundHandler,僅需覆寫我們關心的方法 channelRead0,根據不同的 MQTT 報文類型做處理。

.....
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
    switch (msg.fixedHeader().messageType()) {
        case CONNECT:
            connect(ctx, (MqttConnectMessage) msg);
            break;
        case SUBSCRIBE:
            subscribe(ctx, (MqttSubscribeMessage) msg);
            break;
        case PINGREQ:
            pingReq(ctx);
            break;
        //...處理其他報文
        default:
    }
}
....

通信報文處理

怎么處理這些報文呢?MQTT 采用的是發布訂閱模式的消息通信協議,通過交換預定義的 MQTT 控制報文來通信。這里簡單介紹下 MQTT 協議的內容,因為在我們進行編碼的時候需要解析消息內容、回復 ACK 消息、發布消息,了解消息結構,可以更好地編碼。

MQTT 控制報文由固定報頭、可變報頭、有效載荷三部分組成,具體格式如下表:

圖片

根據 MQTT 3.1.1 規定,固定報頭的控制報文類型共有 14 種,我們這次主要使用 CONNECT(連接服務端)、SUBSCRIBE(訂閱主題)、PUBLISH(發布消息)、PINGRESP(心跳響應)這四種報文以及對應的 ACK 報文。

CONNECT 報文如何處理?客戶端到服務端的網絡連接建立后,客戶端發送給服務端的第一個報文必須是 CONNECT 報文,這個報文傳輸設備標識、用戶標識、密碼等信息,通過這個報文,服務端需判斷要不要和客戶端連接,常用的方法就是鑒權。如果校驗失敗,就可以在 ACK 報文中設置狀態碼 CONNECTION_REFUSED_xxx;檢驗成功,則設置為 CONNECTION_ACCEPTED。鑒權成功之后,我們就可以把該設備的信息入庫保存,實際場景中,我們把設備的實時狀態維護在 Redis 中,保證高的吞吐量。

代碼如下:

private void connect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String clientIdentifier = msg.payload().clientIdentifier();
    String userName = msg.payload().userName();
    String password = new String(msg.payload().passwordInBytes());
    //此處可以鑒權
    System.out.println(clientIdentifier + " " + userName + " " + password);
    //此處保存用戶和連接之間的關系
    ChannelManager.saveChannelMapping(clientIdentifier, ctx.channel());
    MqttFixedHeader connAckFixedHeaderRes = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    //連接成功設置為MqttConnectReturnCode.CONNECTION_ACCEPTED,失敗可以返回其他狀態碼
    MqttConnAckVariableHeader connAckVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
    MqttConnAckMessage ackMessage = new MqttConnAckMessage(connAckFixedHeaderRes, connAckVariableHeader);
    ctx.channel().writeAndFlush(ackMessage);
}

SUBSCRIBE 報文一般作為 CONNECT 之后的下一個報文,客戶端上報它需要的 Topic,服務端可以根據客戶端的訂閱情況,針對性地推送消息,這個可以是廣播的 Topic(所有用戶都可以收到同一個消息的副本),也可以是點對點的(只有一個用戶收到此消息)。同時,服務端需要存儲 Topic 到 Channel 的關系。代碼如下:

private void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
    List<MqttTopicSubscription> topics = msg.payload().topicSubscriptions();
    //存儲客戶端訂閱的主題
    ChannelManager.saveTopics(ctx.channel(),
            topics.stream().map(MqttTopicSubscription::topicName).collect(Collectors.toList()));
    System.out.println("訂閱成功:" + topics);
    MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(msg.variableHeader().messageId(), null);
    MqttSubAckPayload payload = new MqttSubAckPayload();
    MqttSubAckMessage ackMessage = new MqttSubAckMessage(header, variableHeader, payload);
    ctx.writeAndFlush(ackMessage);
}

PUBLISH 報文是我們最終的目標報文,服務端需要根據客戶端訂閱的 Topic 發送這個報文,由于在處理訂閱消息時,已經保存了 Topic 和 Channel 的映射,所以推送消息就簡單了,只需要找到 Topic 下所有的 Channel,就可以直接寫消息到 Channel 中即可,代碼如下:

List<Channel> channels = ChannelManager.listChannels(topic);
channels.forEach(channel -> {
    MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, 0);
    ByteBuf payload = Unpooled.copiedBuffer(messageData, StandardCharsets.UTF_8);
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(fixedHeader, variableHeader, payload);
    channel.writeAndFlush(mqttPublishMessage);
});

為何要處理 PINGREQ 報文?鏈路上如果長時間沒有數據傳輸,可能會被運營商把鏈路回收了,所以設備需要在保活期間內至少發送一個報文,如果沒有實際的數據需要傳輸,那么較小的 PINGREQ 就是最佳選擇。連接保活時間的取值范圍一般為 30 秒~1200 秒。這個可以根據實際情況,不斷調整這個值,我的選擇是是 60 秒,如果網絡環境好,可以設置 500 秒以上。

實際在線上運行的時候,我發現有些客戶端就是一直無法連接上,這時可以再結合短輪詢做個備用方案,當多次嘗試之后,無法連接上 MQTT 服務,可以暫時啟動短輪詢,保證用戶可以收到消息。

好了,到目前我們已經處理完核心功能了,其它類型的控制報文和處理流程也類似,這里就不再贅述了,總體報文交換流程如下圖:

圖片圖片

長連接通信測試

我們來試一下效果吧,首先我們需要模擬一個客戶端,同樣的,也可以使用 Netty 實現一個客戶端,主要流程和服務端差不多,有一點需要注意的是,客戶端需要定時發送心跳到服務端,以保證鏈路不會因為長時間空閑被系統斷開。

測試流程如下:

  1. 啟動服務端,端口在 1883
  2. 啟動客戶端,連接到服務端 127.0.0.1:1883
  3. 客戶端訂閱 Topic,名稱為 demo
  4. 服務端每隔 1 秒向 demo 發送一個當前時間的消息

看下運行效果:

圖片圖片

圖片

總結

以上就是我今天的分享,通過 Netty 和 MQTT,我們可以實現一個高性能的消息下發系統,當然,我今天講的是最基本的功能實現,當連接數超過一臺機器的上限時,就需要設計一個可擴展的架構。我把整體的知識點匯總成一張思維導圖,供你參考。

圖片圖片

責任編輯:武曉燕 來源: 程序員技術充電站
相關推薦

2023-08-14 08:01:12

websocket8g用戶

2021-03-25 08:29:33

SpringBootWebSocket即時消息

2020-10-09 12:45:19

創建消息即時消息編程語言

2020-10-09 15:00:56

實時消息編程語言

2020-03-31 12:21:20

JSON即時消息編程語言

2019-09-29 15:25:13

CockroachDBGoJavaScript

2019-10-28 20:12:40

OAuthGuard中間件編程語言

2020-10-16 14:40:20

即時消息Home頁面編程語言

2020-10-19 16:20:38

即時消息Conversatio編程語言

2020-10-12 09:20:13

即時消息Access頁面編程語言

2015-03-18 15:37:19

社交APP場景

2020-10-10 20:51:10

即時消息編程語言

2009-06-29 09:06:42

微軟Web版MSN

2010-05-24 09:51:37

System Cent

2021-02-05 07:28:11

SpringbootNettyWebsocke

2010-05-20 17:45:46

OCS 2007 R2

2021-12-03 00:02:01

通訊工具即時

2021-11-24 08:55:38

代理網關Netty

2022-08-30 11:41:53

網絡攻擊木馬

2015-03-09 10:33:14

即時通信管道過濾
點贊
收藏

51CTO技術棧公眾號

欧美在线国产精品| 国产精品免费观看视频| 欧美videos大乳护士334| 自拍偷拍21p| 国产精品一区二区久激情瑜伽| 国产专区欧美专区| 日韩视频在线直播| 91精品国产综合久久精品app | 色婷婷在线播放| 欧美性xxxxx极品| 成人手机在线电影| 99精品久久只有精品| 国产精品国产三级国产aⅴ入口 | 日本大臀精品| 一区二区三区四区视频精品免费 | 亚洲欧美日韩精品久久久| 亚洲国产欧美日韩在线观看第一区| 精品偷拍各种wc美女嘘嘘| 国产免费av在线| 一本一道波多野结衣一区二区| 手机在线免费观看毛片| 懂色av一区二区夜夜嗨| 无码毛片aaa在线| 精品一区二区在线视频| 在线视频不卡一区二区三区| 丝袜美腿亚洲综合| 日韩一区免费观看| 日韩电影在线免费| 亚洲黄色成人久久久| 韩国三级电影一区二区| 裸体裸乳免费看| 99久久久久免费精品国产| aaa毛片在线观看| 亚洲品质自拍视频| 国产区av在线| 国产视频亚洲视频| xxxx日韩| 国产一区二区中文字幕免费看| 嫩草成人www欧美| 精品少妇人妻av免费久久洗澡| 国产午夜精品久久久久久久 | 欧美伊人久久久久久久久影院| 欧美另类tv| 九九热精品视频在线播放| 国产精品成人一区二区不卡| 99久久综合狠狠综合久久止| 蜜桃视频在线一区| 成人天堂av| 日韩高清av一区二区三区| 亚洲素人在线| 日本精品免费视频| 欧美日韩亚洲国产一区| 在线天堂资源www在线污| 啪一啪鲁一鲁2019在线视频| 国产精品免费看| 99riav视频| 亚洲精品suv精品一区二区| 国产尤物久久久| 青青在线免费视频| 精品久久久久国产| 国产一区二区在线观| 午夜视频久久久| 一区二区三区高清在线视频 | 粉嫩av四季av绯色av第一区| 国产亚洲精品一区二555| 精品制服美女久久| 国产午夜亚洲精品一级在线| 欧美在线3区| 亚洲综合视频在线| 里番在线播放| 久久国产精品 国产精品| 色综合天天综合色综合av | 小早川怜子影音先锋在线观看| 亚洲成人精品一区二区三区| 黄色av免费在线播放| 日本在线视频www色| 日韩激情久久| 精品亚洲欧美日韩| 国产日韩欧美一区二区三区四区| 91欧美视频网站| 成人高清视频观看www| 国产精品久久久久久久久久久久久| 2019av中文字幕| 国产精品成人v| 国产精品爱久久久久久久| 日本中文字幕成人| 成人亚洲激情网| 国产一区二区视频在线免费观看| 欧美日韩国产第一页| 奇门遁甲1982国语版免费观看高清| 欧美亚洲国产日本| 成人在线中文字幕| 国产高清自拍99| 日韩欧美一区二区在线观看| 色哺乳xxxxhd奶水米仓惠香| 国产黄色激情视频| 369你懂的电影天堂| 在线看片你懂得| 激情在线视频播放| 97久久超碰| 一区二区三区毛片免费| 日韩高清不卡在线| 成人国产精品免费观看视频| 国产精品国产三级国产有无不卡 | 国产精品久久二区二区| 都市激情亚洲色图| 欧美日韩另类一区| 国产一区二区三区视频在线观看| 欧美国产精品日韩| 成人免费视频网站入口| 91黄色在线看| 青春有你2免费观看完整版在线播放高清 | 欧美人与动牲交xxxxbbbb| 99re免费99re在线视频手机版| 久草视频在线看| 九色精品蝌蚪| 一区二区三区国产盗摄| 久久精品水蜜桃av综合天堂| 色呦呦网站一区| 欧美精品情趣视频| 国产在线精品日韩| 亚洲精品久久久中文字幕| missav|免费高清av在线看| 综合亚洲自拍| 26uuu精品一区二区在线观看| 精品国产a毛片| 亚洲最大的免费| 国产aa视频| 91精品店在线| 久久精品国产亚洲高清剧情介绍 | 成人在线综合网| 欧美老人xxxx18| 亚洲成人动漫精品| 日韩欧美高清一区| 日韩av在线资源| 亚洲成人免费在线视频| 亚洲成人av电影| 欧美日本精品一区二区三区| 不卡视频免费播放| 黄色成人在线网址| 综合久久综合久久| 日韩亚洲欧美中文高清在线| av在线不卡观看| 亚洲美女电影在线| 九色精品91| 福利微拍一区二区| 成人免费午夜电影| 羞羞网www| 手机在线一区二区三区| 亚洲一区视频在线观看视频| 51久久精品夜色国产麻豆| 青青青国产在线观看| 日本不卡一二三| 久久成人精品无人区| 精品福利一区二区三区 | 亚洲欧美精品一区二区| 亚洲图片欧洲图片日韩av| 蜜臀av在线播放| 日韩国产精品久久久久久亚洲| 欧美日韩中文字幕精品| 久久五月天婷婷| 激情影院在线| 成人18视频在线播放| 欧美激情综合亚洲一二区| 国内av免费| 狠狠综合久久| 亚洲精品wwwww| 日韩精品―中文字幕| 亚洲视频精选| 亚洲图片一区二区| 欧美一区二区在线视频观看| 在线观看v片| 最新久久zyz资源站| 国产精品一区二区三区观看| 国模私拍视频在线播放| aaa欧美日韩| 99re资源| 美女久久精品| 欧美日韩免费视频| 丰满少妇被猛烈进入高清播放| 欧美日韩国产免费观看视频| 日韩精品一区在线| 妞干网免费视频| 男女羞羞视频教学| 国产欧美一区| 亚洲精选一区二区| 成人丁香基地| 99精品欧美一区二区三区综合在线| 国产欧美精品在线播放| 99久久久国产精品免费调教网站| 欧美日韩激情小视频| 亚洲色精品三区二区一区| 在线亚洲成人| 国产精品ⅴa在线观看h| 懂色av一区| heyzo一本久久综合| 精品无码久久久久久久动漫| 国内视频在线精品| 一区二区在线电影| 一本一本久久a久久精品综合妖精|