第二十三章:WebSocket 实时通信

博客v1.0系列教程(Dioxus)博客 v1.0 系列教程 (Dioxus)

第二十三章:WebSocket 实时通信

1. WebSocket vs 传统 HTTP

| 特性 | HTTP 轮询 | WebSocket | |------|----------|-----------| | 连接方式 | 每次请求新建 | 一次连接持续复用 | | 数据流向 | 客户端请求→服务端响应 | 全双工双向 | | 实时性 | 取决于轮询间隔 | 毫秒级推送 | | 开销 | 每次请求包含完整 HTTP 头 | 连接后只传业务数据 | | 典型场景 | REST API | 实时聊天、通知、协作 |

2. 基础 WebSocket 连接

2.1 使用 web-sys 连接

use wasm_bindgen::prelude::*;
use web_sys::{MessageEvent, WebSocket};

#[wasm_bindgen]
extern "C" {
    #[wasm_bindgen(js_namespace = console)]
    fn log(s: &str);
}

fn connect_websocket(url: &str) -> WebSocket {
    let ws = WebSocket::new(url).unwrap_throw();

    // 连接打开
    let onopen = Closure::wrap(Box::new(move |_| {
        log("[WS] 连接已建立");
    }) as Box<dyn Fn(_)>);
    ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
    onopen.forget(); // 防止被 GC 回收

    // 接收消息
    let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| {
        if let Some(text) = e.data().as_string() {
            log(&format!("[WS] 收到: {text}"));
        }
    }) as Box<dyn Fn(_)>);
    ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
    onmessage.forget();

    // 连接关闭
    let onclose = Closure::wrap(Box::new(move |_| {
        log("[WS] 连接已关闭");
    }) as Box<dyn Fn(_)>);
    ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
    onclose.forget();

    // 错误处理
    let onerror = Closure::wrap(Box::new(move |e: Event| {
        log(&format!("[WS] 错误: {:?}", e));
    }) as Box<dyn Fn(_)>);
    ws.set_onerror(Some(onerror.as_ref().unchecked_ref()));
    onerror.forget();

    ws
}

2.2 封装为 Hook

use dioxus::prelude::*;
use futures_channel::mpsc;
use wasm_bindgen::prelude::*;

#[derive(Clone, PartialEq)]
enum WsStatus {
    Disconnected,
    Connecting,
    Connected,
    Error(String),
}

struct WsMessage {
    text: String,
}

fn use_websocket(url: &str) -> (
    Signal<WsStatus>,
    Signal<Vec<WsMessage>>,
    impl Fn(String),
) {
    let status = use_signal(|| WsStatus::Disconnected);
    let messages = use_signal(|| Vec::<WsMessage>::new());
    let mut send_tx = use_signal(|| Option::<mpsc::UnboundedSender<String>>::None);

    let url = url.to_string();

    use_effect(move || {
        status.set(WsStatus::Connecting);

        let ws = WebSocket::new(&url).unwrap_throw();
        let msgs = messages.clone();

        // 发送通道
        let (tx, mut rx) = mpsc::unbounded::<String>();
        send_tx.set(Some(tx));

        // onopen
        let st = status.clone();
        let onopen = Closure::wrap(Box::new(move |_| {
            st.set(WsStatus::Connected);
            log("[WS] 已连接");
        }) as Box<dyn Fn(_)>);
        ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
        onopen.forget();

        // onmessage
        let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| {
            if let Some(text) = e.data().as_string() {
                msgs.write().push(WsMessage { text });
            }
        }) as Box<dyn Fn(_)>);
        ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
        onmessage.forget();

        // onclose(自动重连)
        let st2 = status.clone();
        let onclose = Closure::wrap(Box::new(move |_| {
            st2.set(WsStatus::Disconnected);
            log("[WS] 断开,准备重连...");
            // 可以实现自动重连逻辑
        }) as Box<dyn Fn(_)>);
        ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
        onclose.forget();

        // 发送消息(通过 channel 接收)
        let ws_send = ws.clone();
        let _send_task = spawn(async move {
            use futures_util::StreamExt;
            while let Some(msg) = rx.next().await {
                let _ = ws_send.send_with_str(&msg);
            }
        });
    });

    let send_fn = move |msg: String| {
        if let Some(ref tx) = &*send_tx.read() {
            let _ = tx.unbounded_send(msg);
        }
    };

    (status, messages, send_fn)
}

3. 实时评论

3.1 评论组件

#[component]
fn LiveComments(article_slug: String) -> Element {
    let ws_url = format!("ws://localhost:5051/ws/comments/{article_slug}");
    let (status, messages, send) = use_websocket(&ws_url);
    let input = use_signal(String::new);

    rsx! {
        div { class: "space-y-4",
            h3 { class: "font-semibold", "实时评论" }

            // 连接状态
            div { class: "flex items-center gap-2",
                match status() {
                    WsStatus::Connected => rsx! {
                        span { class: "w-2 h-2 rounded-full bg-green-500" }
                        span { class: "text-xs", "已连接" }
                    },
                    WsStatus::Connecting => rsx! {
                        span { class: "w-2 h-2 rounded-full bg-yellow-500 animate-pulse" }
                        span { class: "text-xs", "连接中..." }
                    },
                    WsStatus::Error(_) => rsx! {
                        span { class: "w-2 h-2 rounded-full bg-red-500" }
                        span { class: "text-xs", "连接失败" }
                    },
                    _ => rsx! { span {} },
                }
            }

            // 消息列表
            div { class: "space-y-3 max-h-96 overflow-y-auto",
                for msg in messages.read().iter() {
                    CommentBubble { text: msg.text.clone() }
                }
            }

            // 输入框
            div { class: "flex gap-2",
                input {
                    class: "flex-1 border rounded-lg px-3 py-2 text-sm",
                    value: "{input}",
                    oninput: move |e| input.set(e.value()),
                    onkeydown: move |e| {
                        if e.key() == Key::Enter && !input().is_empty() {
                            send(input());
                            input.set(String::new());
                        }
                    },
                    placeholder: "输入评论..."
                }
                button {
                    class: "px-4 py-2 rounded-lg bg-blue-500 text-white text-sm",
                    disabled: matches!(status(), WsStatus::Disconnected | WsStatus::Connecting),
                    onclick: move |_| {
                        if !input().is_empty() {
                            send(input());
                            input.set(String::new());
                        }
                    },
                    "发送"
                }
            }
        }
    }
}

4. 在线状态与打字指示器

4.1 用户在线状态

// 心跳检测
fn use_heartbeat(ws_send: impl Fn(String)) {
    use_interval(30_000, move || {
        ws_send(serde_json::json!({
            "type": "ping",
            "timestamp": chrono::Utc::now().to_rfc3339()
        }).to_string());
    });
}

// 在线用户列表
#[component]
fn OnlineUsers() -> Element {
    let (status, messages, send) = use_websocket("ws://localhost:5051/ws/presence");
    let online_users = use_signal(|| Vec::<String>::new());

    // 解析消息中的在线用户列表
    use_effect(move || {
        for msg in messages.read().iter() {
            if let Ok(data) = serde_json::from_str::<serde_json::Value>(&msg.text) {
                if data["type"] == "presence" {
                    if let Some(users) = data["users"].as_array() {
                        let names: Vec<String> = users
                            .iter()
                            .filter_map(|u| u["name"].as_str().map(String::from))
                            .collect();
                        online_users.set(names);
                    }
                }
            }
        }
    });

    rsx! {
        div { class: "space-y-2",
            h4 { class: "text-sm font-medium",
                "在线 ({online_users.read().len()})"
            }
            for user in online_users.read().iter() {
                div { class: "flex items-center gap-2 text-sm",
                    span { class: "w-2 h-2 rounded-full bg-green-500" }
                    span { "{user}" }
                }
            }
        }
    }
}

4.2 打字指示器

#[component]
fn TypingIndicator(article_slug: String) -> Element {
    let (_, messages, send) = use_websocket(&format!("ws://localhost:5051/ws/typing/{article_slug}"));
    let typing_users = use_signal(|| Vec::<String>::new());
    let last_typing = use_signal(|| Instant::now());

    // 发送打字状态
    let send_typing = move || {
        if last_typing().elapsed() > std::time::Duration::from_secs(2) {
            send(serde_json::json!({"type": "typing"}).to_string());
            last_typing.set(Instant::now());
        }
    };

    // 解析收到的打字状态
    use_effect(move || {
        for msg in messages.read().iter() {
            // 更新正在输入的用户列表
        }
    });

    rsx! {
        if !typing_users.read().is_empty() {
            p { class: "text-xs", style: "color: var(--tertiary);",
                "{typing_users.read().join(\", \")} 正在输入..."
            }
        }
    }
}

5. 连接管理

5.1 自动重连

fn use_reconnecting_websocket(url: impl Fn() -> String) -> (
    Signal<WsStatus>,
    Signal<Vec<WsMessage>>,
    impl Fn(String),
) {
    let status = use_signal(|| WsStatus::Disconnected);
    let messages = use_signal(|| Vec::<WsMessage>::new());
    let reconnect_count = use_signal(|| 0u32);
    let max_retries = 5;

    let url = url();

    use_effect(move || {
        if matches!(*status.read(), WsStatus::Connected) {
            return;
        }

        let retries = reconnect_count();
        if retries >= max_retries {
            status.set(WsStatus::Error("重试次数已用完".to_string()));
            return;
        }

        // 指数退避
        let delay = std::time::Duration::from_secs(
            2u64.pow(retries.min(4)) // 2, 4, 8, 16, 16...
        );

        spawn(async move {
            tokio::time::sleep(delay).await;
            // 重新连接
            reconnect_count += 1;
        });
    });

    (status, messages, |_| {})
}

5.2 连接健康检查

fn use_ws_healthcheck(ws_send: impl Fn(String)) -> Signal<bool> {
    let healthy = use_signal(|| true);
    let last_pong = use_signal(|| Instant::now());

    // 每 30 秒发送 ping
    use_interval(30_000, move || {
        ws_send(serde_json::json!({"type": "ping"}).to_string());
    });

    // 检查是否收到 pong
    use_interval(10_000, move || {
        if last_pong().elapsed() > std::time::Duration::from_secs(60) {
            healthy.set(false);
        }
    });

    healthy
}

6. 消息序列化

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
enum WsIncoming {
    #[serde(rename = "message")]
    Message {
        id: String,
        user: String,
        content: String,
        timestamp: String,
    },
    #[serde(rename = "presence")]
    Presence {
        users: Vec<String>,
    },
    #[serde(rename = "typing")]
    Typing {
        user: String,
    },
    #[serde(rename = "pong")]
    Pong,
    #[serde(rename = "error")]
    Error { code: u16, message: String },
}

#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
enum WsOutgoing {
    #[serde(rename = "message")]
    Message { content: String, article_slug: String },
    #[serde(rename = "ping")]
    Ping,
    #[serde(rename = "typing")]
    Typing,
}

// 解析消息
fn handle_message(text: &str, messages: &Signal<Vec<WsMessage>>) {
    if let Ok(msg) = serde_json::from_str::<WsIncoming>(text) {
        match msg {
            WsIncoming::Message { content, user, .. } => {
                messages.write().push(WsMessage { text: format!("{user}: {content}") });
            }
            WsIncoming::Pong => {
                // 更新健康检查
            }
            WsIncoming::Error { message, .. } => {
                console_log!("[WS] 服务器错误: {message}");
            }
            _ => {}
        }
    }
}

7. 完整集成示例

#[component]
fn LiveArticlePage(slug: String) -> Element {
    let ws_url = format!("ws://localhost:5051/ws/article/{slug}");
    let (status, messages, send) = use_reconnecting_websocket(move || ws_url.clone());

    rsx! {
        div { class: "max-w-4xl mx-auto",
            // 连接状态栏
            div { class: "sticky top-0 z-10 px-4 py-1",
                style: "background: var(--card); border-bottom: 1px solid var(--border);",
                div { class: "flex items-center justify-between",
                    match status() {
                        WsStatus::Connected => rsx! {
                            span { class: "text-xs text-green-600",
                                "🟢 实时已连接"
                            }
                        },
                        WsStatus::Connecting => rsx! {
                            span { class: "text-xs text-yellow-600",
                                "🟡 连接中..."
                            }
                        },
                        WsStatus::Error(ref e) => rsx! {
                            span { class: "text-xs text-red-600",
                                "🔴 {e}"
                            }
                        },
                        _ => rsx! { span {} },
                    }
                    // 在线人数
                    span { class: "text-xs", style: "color: var(--tertiary);",
                        "👥 6 人在线"
                    }
                }
            }
            // 文章内容
            ArticleContent { slug: slug.clone() }
            // 实时评论区
            LiveComments { article_slug: slug.clone() }
        }
    }
}

8. 安全与注意事项

| 问题 | 解决方案 | |------|---------| | 跨域连接 | 服务端配置 CORS 或使用同源地址 | | 连接泄露 | 组件卸载时主动关闭 WebSocket | | 消息频率 | 客户端和服务端都做频率限制 | | 鉴权 | 在 URL 或首次消息中传递 token | | 断线重连 | 指数退避,限制最大重试次数 | | 内存占用 | 限制消息列表长度,定期清理旧消息 |

// 组件卸载时清理
use_effect(move || {
    let ws = connect_websocket(&url);

    // 返回清理函数
    || {
        ws.close().ok();
        log("[WS] 连接已清理");
    }
});

9. 小结

  • WebSocket 实现全双工实时通信,适合评论、通知、在线状态等场景
  • web-sys::WebSocket 是 WASM 环境的底层 API
  • 封装为 use_websocket Hook,组件只需关心消息收发
  • 自动重连、健康检查、心跳机制确保连接稳定
  • JSON 序列化消息,用 #[serde(tag = "type")] 区分消息类型
  • 组件卸载时主动关闭连接,防止内存泄漏
  • 服务端和客户端都需要做频率限制和鉴权
dioxuswebsocketrealtimelive-chatpush