国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

異步Rust:構建實時消息代理服務器

開發 前端
我們已經探索了在Rust中創建一個簡單的消息代理,并使用WebSocket客戶端對其進行測試。這個例子突出了Rust在構建高效、并發的網絡應用程序方面的能力。

在本文中,我們將深入研究使用Rust構建實時消息代理服務器,展示其強大的并發特性。我們將使用Warp作為web服務器,并使用Tokio來管理異步任務。此外,我們將創建一個WebSocket客戶端來測試代理服務器的功能。

設計圖如下:

圖片圖片

構建消息代理服務器

消息代理服務器允許客戶端為主題生成事件并訂閱它們。它使用Warp作為HTTP和WebSocket服務器,使用Tokio作為異步運行時。

使用以下命令創建一個Rust項目:

cargo new real-ime-message

在Cargo.toml文件中加入以下依賴項:

[dependencies]
futures-util = "0.3.30"
tokio = {version = "1.35.1", features = ["full"]}
tokio-tungstenite = "0.21.0"
url = "2.5.0"
warp = "0.3.6"

在src/main.rs文件中定義一個Broker結構體:

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
};

use futures_util::{SinkExt, StreamExt};
use tokio::sync::{
    mpsc::{self, UnboundedSender},
    RwLock,
};
use warp::{filters::ws::Message, Filter};

type Topic = String;
type Event = String;
type WsSender = UnboundedSender<warp::ws::Message>;

struct Broker {
    events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,
    subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,
}
  • events:存儲每個主題的事件。
  • subscribers:跟蹤每個主題的訂閱者。

創建一個新的Broker實例:

impl Broker {
    fn new() -> Self {
        Broker {
            events: Arc::new(RwLock::new(HashMap::new())),
            subscribers: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

定義發布事件的方法produce:

impl Broker {
    ......

    async fn produce(&self, topic: Topic, event: Event) {
        let mut events = self.events.write().await;
        events
            .entry(topic.clone())
            .or_default()
            .push_back(event.clone());

        // 異步通知所有訂閱者
        let subscribers_list;
        {
            let subscribers = self.subscribers.read().await;
            subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
        }

        for ws_sender in subscribers_list {
            // 將事件發送到WebSocket客戶端
            let _ = ws_sender.send(warp::ws::Message::text(event.clone()));
        }
    }
}

這個方法主要是將事件添加到相應的主題,然后將新事件通知所有訂閱者。

定義subscribe方法,來管理新的訂閱:

impl Broker {
    ......

    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
        let (ws_sender, mut ws_receiver) = socket.split();

        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

        {
            let mut subs = self.subscribers.write().await;
            subs.entry(topic).or_default().push(tx);
        }

        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 處理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 處理錯誤
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
        });

        tokio::task::spawn(async move {
            let mut sender = ws_sender;

            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
        });
    }
}

這個方法主要是將WebSocket拆分為發送方和接收方,將訂閱者添加到訂閱者列表中,處理傳入的WebSocket消息。

main函數代碼如下:

#[tokio::main]
async fn main() {
    let broker = Arc::new(Broker::new());
    let broker_clone1 = Arc::clone(&broker);
    let broker_clone2 = Arc::clone(&broker);

    let produce = warp::path!("produce" / String)
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || Arc::clone(&broker_clone1)))
        .and_then(
            move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {
                broker_clone2.produce(topic, event).await;
                Ok::<_, warp::Rejection>(warp::reply())
            },
        );

    let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(
        move |topic: String, ws: warp::ws::Ws| {
            let broker_clone3 = Arc::clone(&broker_clone2);
            ws.on_upgrade(move |socket| async move {
                broker_clone3.subscribe(topic.clone(), socket).await;
            })
        },
    );

    let routes = produce.or(subscribe);

    println!("Broker server running at http://127.0.0.1:3030");
    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

實現WebSocket客戶端

WebSocket客戶端將模擬一個訂閱主題和接收消息的真實用戶。

在src/bin目錄下,創建一個ws_cli.rs文件。在文件中定義websocket_client函數,建立WebSocket連接并管理消息:

use futures_util::{sink::SinkExt, stream::StreamExt};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use url::Url;

async fn websocket_client(topic_url: &str) {
    // 解析要連接WebSocket服務器的URL
    let url = Url::parse(topic_url).expect("Invalid URL");

    // 連接到WebSocket服務器
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    println!("WebSocket client connected");

    let (mut write, mut read) = ws_stream.split();
    let message = Arc::new(RwLock::new(String::new()));
    let message_1 = message.clone();
    // 生成一個任務來處理傳入的消息
    tokio::spawn(async move {
        let msg_lock = message_1.clone();
        while let Some(message) = read.next().await {
            match message {
                Ok(msg) => {
                    let mut ms = msg_lock.write().await;
                    *ms = msg.to_text().unwrap().to_string();
                    println!("Received message: {}", msg.to_text().unwrap());
                }
                Err(e) => {
                    eprintln!("Error receiving message: {:?}", e);
                    break;
                }
            }
        }
    });

    // 發送消息
    loop {
        let msg_lock = message.clone();
        let ms = msg_lock.read().await;
        if let Err(e) = write.send(Message::Text(ms.to_string())).await {
            eprintln!("Error sending message: {:?}", e);
            break;
        }
        sleep(Duration::from_secs(5)).await;
    }
}

main函數代碼如下:

#[tokio::main]
async fn main() {
    websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;
}

測試

執行如下命令運行消息代理服務器:

cargo run --bin real-ime-message

執行結果:

Broker server running at http://127.0.0.1:3030

然后打開一個新的命令行,執行如下命令運行WebSocket客戶端:

cargo run --bin ws_cli

執行結果:

WebSocket client connected

向http://127.0.0.1:3030/produce/newtopic接口發送post請求,如圖:

圖片圖片

客戶端接收到消息:

WebSocket client connected
Received message: This is a new event

總結

我們已經探索了在Rust中創建一個簡單的消息代理,并使用WebSocket客戶端對其進行測試。這個例子突出了Rust在構建高效、并發的網絡應用程序方面的能力。

責任編輯:武曉燕 來源: coding到燈火闌珊
相關推薦

2024-11-21 09:18:08

2009-02-10 15:42:00

代理服務器代理服務器設置

2024-02-20 14:53:01

2009-08-18 11:04:50

代理服務器設置代理服務器地址

2018-11-05 09:34:43

2009-12-16 16:41:44

Linux代理服務器

2012-09-18 09:55:28

2010-09-17 10:07:17

SIP協議SIP代理服務器

2009-02-27 13:13:00

代理服務器代理服務器軟件代理服務器設置

2011-08-17 11:26:10

2009-02-12 15:43:00

CCProxy代理服務器

2009-10-10 16:50:33

2009-11-24 19:36:34

代理服務器

2019-04-08 08:39:47

Nginx代理服務器

2010-03-09 11:21:24

代理服務器工作原理域名服務器工作原理

2018-04-17 12:10:40

2009-12-03 18:07:48

Squid代理服務器

2009-02-06 11:12:00

代理服務器代理服務器應用

2009-02-12 14:04:00

代理服務器LINUX架設服務器

2010-03-12 16:33:12

Python抓站
點贊
收藏

51CTO技術棧公眾號

欧美激情专区| 色综合久久中文字幕| 精品av综合导航| 在线成人激情黄色| 国产精品欧美久久| 91精品国产品国语在线不卡| 欧美色图亚洲自拍| 亚洲日本一区二区三区在线| 亚洲一区二区三区四区在线观看| 欧美精品一区在线发布| 电影一区二区在线观看| 日韩午夜电影在线观看| 免费观看v片在线观看| 国产91精品精华液一区二区三区| 成人中心免费视频| 久久69成人| 亚洲欧美国产精品久久久久久久 | 欧美日韩一区二区视频在线 | 欧美浪妇xxxx高跟鞋交| 樱桃视频免费看| 国产人妖乱国产精品人妖| 成人黄色片免费| 日本中文在线一区| 91夜夜未满十八勿入爽爽影院| 亚洲日本va| 欧美成人第一页| 日韩精品视频一区二区三区| 精品国产美女在线| 成人性生交大片免费观看网站| 91麻豆精品国产91| 黄色精品在线观看| 日韩一二三区视频| 多野结衣av一区| 精品久久久久久久久久久久久久久久久| 明星裸体视频一区二区| 精品二区久久| 午夜精品一区二区在线观看| 另类国产ts人妖高潮视频| 亚洲伊人久久大香线蕉av| 香蕉视频国产精品| 91精品黄色| 国产精品外国| 激情小视频网站| 久久久久国产精品人| 久草在线在线视频| 午夜精品福利久久久| 日本视频在线观看| 亚洲色图13p| 欧美女王vk| 鲁鲁视频www一区二区| 国产成人免费高清| 国产美女免费观看| 欧美一区国产二区| 香蕉成人在线| 国产精品xxxxx| 免费看黄色91| 国产一级视频| 精品国内二区三区| 麻豆精品久久| 国产综合第一页| 久久亚洲综合色| 国产视频第一页在线观看| 一本色道久久综合狠狠躁篇的优点| 美国成人xxx| 亚洲无玛一区| 在线观看视频91| 国产精品毛片久久久| 正在播放91九色| 欧美日韩国产中字| 性感美女一区二区在线观看| 日韩高清一级| 一区二区三区av| 午夜精品久久久久久久久久| 精品69视频一区二区三区| 国产精品国产三级欧美二区| 欧美国产日韩亚洲一区| 欧美wwww| 国产精品视频色| 国产精品久久免费看| 老司机成人影院| 欧美亚州在线观看| 欧美日韩一卡二卡三卡| 亚洲欧美精品午睡沙发| 国产亚洲欧美在线| 日本在线中文字幕一区二区三区| 成人免费黄色网| 天天亚洲美女在线视频| 免费看av成人| 成人au免费视频影院| 欧美伊久线香蕉线新在线| 久久综合色之久久综合| 日韩av影片| 国产资源第一页| 亚洲国产成人一区| 丝瓜av网站精品一区二区| 噜噜噜在线观看播放视频| 成人高清在线观看| 欧美日韩美女一区二区| 久久中文在线| 蜜桃视频在线观看免费视频| 丝袜足脚交91精品| 亚洲欧美精品在线| 亚洲国产裸拍裸体视频在线观看乱了中文| 黄色成人免费看| 久久天天躁狠狠躁夜夜躁2014| 在线女人免费视频| 国产91在线亚洲| 欧美在线亚洲在线| 在线播放国产精品二区一二区四区| 秋霞电影网一区二区| xvideos.蜜桃一区二区| 95在线视频| 午夜欧美福利视频| 国产精品久久国产三级国电话系列| 亚洲欧美激情视频| 欧美激情中文字幕一区二区| 成人羞羞在线观看网站| 精品无人乱码一区二区三区| 色网在线视频| 人人妻人人添人人爽欧美一区| 国产日韩在线一区二区三区| 久久手机免费视频| 欧美精品一区二区久久久| 亚洲激情图片qvod| 日韩欧美精品在线| 91超碰免费在线| 日本1区2区| 日本在线xxx| 国产精品一区二区三区四区五区| 日韩视频在线免费| 欧美日韩欧美一区二区| 久久精品一级爱片| 麻豆成人av在线| 亚洲三级国产| 亚洲情侣在线| 欧美一区三区| 日本久久成人网| 99久久亚洲国产日韩美女| 中文字幕在线播放网址| 欧美白人做受xxxx视频| 国产黄色影视| 777.av| 欧美狂欢多p性派对| 国产三区在线视频| 美女黄色片网站| 日韩av电影免费在线| 欧美福利一区二区三区| 国产乱码精品一区二区三区不卡| 91情侣在线视频| 国产精品久久77777| 国产精品一区二区久久| 国产成人涩涩涩视频在线观看| 国产精品高清在线观看| 国产精品v片在线观看不卡| 国产成人高潮免费观看精品| 国内精品视频久久| 国产91ⅴ在线精品免费观看| 亚洲18私人小影院| 97婷婷涩涩精品一区| 国产免费一区二区三区在线观看| 国产精品影片在线观看| 国产欧美日韩在线播放| 一区二区免费在线观看| 国产精品12345| 成年人在线播放| 国产黄色在线网站| 成人国产一区| 亚洲最大av| 国产91在线观看| 天天综合色天天综合| 亚洲二区在线播放视频| 久久久久国产精品www| 国产传媒一区二区三区| 亚洲日本精品一区| 369你懂的电影天堂| 精品众筹模特私拍视频| 国产亚洲一区| 粉嫩嫩av羞羞动漫久久久| 亚洲一区视频在线| 色诱女教师一区二区三区| 成人日韩在线电影| 桥本有菜av在线| 久久精品蜜桃| 日韩精品一区二区三区中文字幕 | 性久久久久久| 国产欧美精品一区二区三区四区 | 一级女性全黄久久生活片免费| 一区二区在线视频播放| 日韩精品久久久久久久电影99爱| 污视频在线免费观看网站| 超碰在线亚洲| 国产激情精品久久久第一区二区 | 欧美极品色图| 国内av免费| 久久99国产精品二区高清软件| 亚洲调教一区| 成人精品国产福利| 精品国产免费视频| 久久99精品久久久久久久青青日本 | √最新版天堂资源网在线|