第二十三章:WebSocket 实时通信
博客 v1.0 系列教程 (Dioxus)
第二十三章:WebSocket 实时通信
1. WebSocket vs 传统 HTTP
| 特性 | HTTP 轮询 | WebSocket |
|------|----------|-----------|
| 连接方式 | 每次请求新建 | 一次连接持续复用 |
| 数据流向 | 客户端请求→服务端响应 | 全双工双向 |
| 实时性 | 取决于轮询间隔 | 毫秒级推送 |
| 开销 | 每次请求包含完整 HTTP 头 | 连接后只传业务数据 |
| 典型场景 | REST API | 实时聊天、通知、协作 |
2. 基础 WebSocket 连接
2.1 使用 web-sys 连接
use wasm_bindgen::prelude::*;
use web_sys::{MessageEvent, WebSocket};
// Imports a JavaScript function so the snippets below can write to the
// browser console.
#[wasm_bindgen]
extern "C" {
// `js_namespace = console` makes `log` resolve to `console.log`.
#[wasm_bindgen(js_namespace = console)]
// Prints the string `s` through `console.log`.
fn log(s: &str);
}
fn connect_websocket(url: &str) -> WebSocket {
let ws = WebSocket::new(url).unwrap_throw();
// 连接打开
let onopen = Closure::wrap(Box::new(move |_| {
log("[WS] 连接已建立");
}) as Box<dyn Fn(_)>);
ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
onopen.forget(); // 防止被 GC 回收
// 接收消息
let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Some(text) = e.data().as_string() {
log(&format!("[WS] 收到: {text}"));
}
}) as Box<dyn Fn(_)>);
ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
onmessage.forget();
// 连接关闭
let onclose = Closure::wrap(Box::new(move |_| {
log("[WS] 连接已关闭");
}) as Box<dyn Fn(_)>);
ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
onclose.forget();
// 错误处理
let onerror = Closure::wrap(Box::new(move |e: Event| {
log(&format!("[WS] 错误: {:?}", e));
}) as Box<dyn Fn(_)>);
ws.set_onerror(Some(onerror.as_ref().unchecked_ref()));
onerror.forget();
ws
}
2.2 封装为 Hook
use dioxus::prelude::*;
use futures_channel::mpsc;
use wasm_bindgen::prelude::*;
/// Connection state of a WebSocket as shown to the UI.
///
/// `Disconnected` is the default because every hook in this chapter starts
/// in that state before it attempts a connection.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
enum WsStatus {
    /// No connection is open.
    #[default]
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is open.
    Connected,
    /// The connection failed; the payload is a human-readable reason.
    Error(String),
}
/// One text message received over the WebSocket.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WsMessage {
    /// Text of the message as it is stored for display.
    text: String,
}
fn use_websocket(url: &str) -> (
Signal<WsStatus>,
Signal<Vec<WsMessage>>,
impl Fn(String),
) {
let status = use_signal(|| WsStatus::Disconnected);
let messages = use_signal(|| Vec::<WsMessage>::new());
let mut send_tx = use_signal(|| Option::<mpsc::UnboundedSender<String>>::None);
let url = url.to_string();
use_effect(move || {
status.set(WsStatus::Connecting);
let ws = WebSocket::new(&url).unwrap_throw();
let msgs = messages.clone();
// 发送通道
let (tx, mut rx) = mpsc::unbounded::<String>();
send_tx.set(Some(tx));
// onopen
let st = status.clone();
let onopen = Closure::wrap(Box::new(move |_| {
st.set(WsStatus::Connected);
log("[WS] 已连接");
}) as Box<dyn Fn(_)>);
ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
onopen.forget();
// onmessage
let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Some(text) = e.data().as_string() {
msgs.write().push(WsMessage { text });
}
}) as Box<dyn Fn(_)>);
ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
onmessage.forget();
// onclose(自动重连)
let st2 = status.clone();
let onclose = Closure::wrap(Box::new(move |_| {
st2.set(WsStatus::Disconnected);
log("[WS] 断开,准备重连...");
// 可以实现自动重连逻辑
}) as Box<dyn Fn(_)>);
ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
onclose.forget();
// 发送消息(通过 channel 接收)
let ws_send = ws.clone();
let _send_task = spawn(async move {
use futures_util::StreamExt;
while let Some(msg) = rx.next().await {
let _ = ws_send.send_with_str(&msg);
}
});
});
let send_fn = move |msg: String| {
if let Some(ref tx) = &*send_tx.read() {
let _ = tx.unbounded_send(msg);
}
};
(status, messages, send_fn)
}
3. 实时评论
3.1 评论组件
#[component]
fn LiveComments(article_slug: String) -> Element {
let ws_url = format!("ws://localhost:5051/ws/comments/{article_slug}");
let (status, messages, send) = use_websocket(&ws_url);
let input = use_signal(String::new);
rsx! {
div { class: "space-y-4",
h3 { class: "font-semibold", "实时评论" }
// 连接状态
div { class: "flex items-center gap-2",
match status() {
WsStatus::Connected => rsx! {
span { class: "w-2 h-2 rounded-full bg-green-500" }
span { class: "text-xs", "已连接" }
},
WsStatus::Connecting => rsx! {
span { class: "w-2 h-2 rounded-full bg-yellow-500 animate-pulse" }
span { class: "text-xs", "连接中..." }
},
WsStatus::Error(_) => rsx! {
span { class: "w-2 h-2 rounded-full bg-red-500" }
span { class: "text-xs", "连接失败" }
},
_ => rsx! { span {} },
}
}
// 消息列表
div { class: "space-y-3 max-h-96 overflow-y-auto",
for msg in messages.read().iter() {
CommentBubble { text: msg.text.clone() }
}
}
// 输入框
div { class: "flex gap-2",
input {
class: "flex-1 border rounded-lg px-3 py-2 text-sm",
value: "{input}",
oninput: move |e| input.set(e.value()),
onkeydown: move |e| {
if e.key() == Key::Enter && !input().is_empty() {
send(input());
input.set(String::new());
}
},
placeholder: "输入评论..."
}
button {
class: "px-4 py-2 rounded-lg bg-blue-500 text-white text-sm",
disabled: matches!(status(), WsStatus::Disconnected | WsStatus::Connecting),
onclick: move |_| {
if !input().is_empty() {
send(input());
input.set(String::new());
}
},
"发送"
}
}
}
}
}
4. 在线状态与打字指示器
4.1 用户在线状态
// Heartbeat: on every interval tick, sends a JSON ping stamped with the
// current UTC time through `ws_send`.
// NOTE(review): `use_interval` is not defined in this chapter; 30_000 is
// presumably milliseconds — confirm against its definition.
fn use_heartbeat(ws_send: impl Fn(String)) {
    let send_ping = move || {
        let ping = serde_json::json!({
            "type": "ping",
            "timestamp": chrono::Utc::now().to_rfc3339()
        });
        ws_send(ping.to_string());
    };
    use_interval(30_000, send_ping);
}
/// Extracts the user names from a `presence` payload.
///
/// Returns `None` when `payload` is not a presence message or has no `users`
/// array. Each entry may be a bare string or an object with a `name` field.
fn presence_names(payload: &serde_json::Value) -> Option<Vec<String>> {
    if payload["type"] != "presence" {
        return None;
    }
    let users = payload["users"].as_array()?;
    Some(
        users
            .iter()
            .filter_map(|u| u.as_str().or_else(|| u["name"].as_str()).map(String::from))
            .collect(),
    )
}

// List of users currently online, fed by the presence WebSocket.
#[component]
fn OnlineUsers() -> Element {
    // Only the message stream is used; status and sender are ignored.
    let (_status, messages, _send) = use_websocket("ws://localhost:5051/ws/presence");
    let mut online_users = use_signal(Vec::<String>::new);

    // Only the most recent presence message matters, so search from the end
    // and update the signal at most once per run.
    use_effect(move || {
        let latest = messages.read().iter().rev().find_map(|msg| {
            let payload = serde_json::from_str::<serde_json::Value>(&msg.text).ok()?;
            presence_names(&payload)
        });
        if let Some(names) = latest {
            online_users.set(names);
        }
    });

    rsx! {
        div { class: "space-y-2",
            h4 { class: "text-sm font-medium",
                "在线 ({online_users.read().len()})"
            }
            for user in online_users.read().iter() {
                div { class: "flex items-center gap-2 text-sm",
                    span { class: "w-2 h-2 rounded-full bg-green-500" }
                    span { "{user}" }
                }
            }
        }
    }
}
4.2 打字指示器
#[component]
fn TypingIndicator(article_slug: String) -> Element {
let (_, messages, send) = use_websocket(&format!("ws://localhost:5051/ws/typing/{article_slug}"));
let typing_users = use_signal(|| Vec::<String>::new());
let last_typing = use_signal(|| Instant::now());
// 发送打字状态
let send_typing = move || {
if last_typing().elapsed() > std::time::Duration::from_secs(2) {
send(serde_json::json!({"type": "typing"}).to_string());
last_typing.set(Instant::now());
}
};
// 解析收到的打字状态
use_effect(move || {
for msg in messages.read().iter() {
// 更新正在输入的用户列表
}
});
rsx! {
if !typing_users.read().is_empty() {
p { class: "text-xs", style: "color: var(--tertiary);",
"{typing_users.read().join(\", \")} 正在输入..."
}
}
}
}
5. 连接管理
5.1 自动重连
fn use_reconnecting_websocket(url: impl Fn() -> String) -> (
Signal<WsStatus>,
Signal<Vec<WsMessage>>,
impl Fn(String),
) {
let status = use_signal(|| WsStatus::Disconnected);
let messages = use_signal(|| Vec::<WsMessage>::new());
let reconnect_count = use_signal(|| 0u32);
let max_retries = 5;
let url = url();
use_effect(move || {
if matches!(*status.read(), WsStatus::Connected) {
return;
}
let retries = reconnect_count();
if retries >= max_retries {
status.set(WsStatus::Error("重试次数已用完".to_string()));
return;
}
// 指数退避
let delay = std::time::Duration::from_secs(
2u64.pow(retries.min(4)) // 2, 4, 8, 16, 16...
);
spawn(async move {
tokio::time::sleep(delay).await;
// 重新连接
reconnect_count += 1;
});
});
(status, messages, |_| {})
}
5.2 连接健康检查
fn use_ws_healthcheck(ws_send: impl Fn(String)) -> Signal<bool> {
let healthy = use_signal(|| true);
let last_pong = use_signal(|| Instant::now());
// 每 30 秒发送 ping
use_interval(30_000, move || {
ws_send(serde_json::json!({"type": "ping"}).to_string());
});
// 检查是否收到 pong
use_interval(10_000, move || {
if last_pong().elapsed() > std::time::Duration::from_secs(60) {
healthy.set(false);
}
});
healthy
}
6. 消息序列化
use serde::{Deserialize, Serialize};
/// Messages received from the server. The JSON `type` field selects the
/// variant; `rename_all = "lowercase"` maps each variant name to its tag
/// (`Message` -> "message", `Pong` -> "pong", and so on).
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "lowercase")]
enum WsIncoming {
    /// A chat or comment message.
    Message {
        id: String,
        user: String,
        content: String,
        timestamp: String,
    },
    /// The list of online users.
    Presence { users: Vec<String> },
    /// A user is typing.
    Typing { user: String },
    /// Reply to a ping.
    Pong,
    /// An error reported by the server.
    Error { code: u16, message: String },
}
/// Messages sent to the server. The JSON `type` field carries the
/// lowercased variant name ("message", "ping", "typing").
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
enum WsOutgoing {
    /// A new message for the given article.
    Message { content: String, article_slug: String },
    /// Keep-alive probe.
    Ping,
    /// The local user is typing.
    Typing,
}
// 解析消息
fn handle_message(text: &str, messages: &Signal<Vec<WsMessage>>) {
if let Ok(msg) = serde_json::from_str::<WsIncoming>(text) {
match msg {
WsIncoming::Message { content, user, .. } => {
messages.write().push(WsMessage { text: format!("{user}: {content}") });
}
WsIncoming::Pong => {
// 更新健康检查
}
WsIncoming::Error { message, .. } => {
console_log!("[WS] 服务器错误: {message}");
}
_ => {}
}
}
}
7. 完整集成示例
#[component]
fn LiveArticlePage(slug: String) -> Element {
let ws_url = format!("ws://localhost:5051/ws/article/{slug}");
let (status, messages, send) = use_reconnecting_websocket(move || ws_url.clone());
rsx! {
div { class: "max-w-4xl mx-auto",
// 连接状态栏
div { class: "sticky top-0 z-10 px-4 py-1",
style: "background: var(--card); border-bottom: 1px solid var(--border);",
div { class: "flex items-center justify-between",
match status() {
WsStatus::Connected => rsx! {
span { class: "text-xs text-green-600",
"🟢 实时已连接"
}
},
WsStatus::Connecting => rsx! {
span { class: "text-xs text-yellow-600",
"🟡 连接中..."
}
},
WsStatus::Error(ref e) => rsx! {
span { class: "text-xs text-red-600",
"🔴 {e}"
}
},
_ => rsx! { span {} },
}
// 在线人数
span { class: "text-xs", style: "color: var(--tertiary);",
"👥 6 人在线"
}
}
}
// 文章内容
ArticleContent { slug: slug.clone() }
// 实时评论区
LiveComments { article_slug: slug.clone() }
}
}
}
8. 安全与注意事项
| 问题 | 解决方案 |
|------|---------|
| 跨域连接 | WebSocket 不受 CORS 约束,服务端需校验 Origin 头,或使用同源地址 |
| 连接泄露 | 组件卸载时主动关闭 WebSocket |
| 消息频率 | 客户端和服务端都做频率限制 |
| 鉴权 | 在 URL 或首次消息中传递 token |
| 断线重连 | 指数退避,限制最大重试次数 |
| 内存占用 | 限制消息列表长度,定期清理旧消息 |
// 组件卸载时清理
use_effect(move || {
let ws = connect_websocket(&url);
// 返回清理函数
|| {
ws.close().ok();
log("[WS] 连接已清理");
}
});
9. 小结
- WebSocket 实现全双工实时通信,适合评论、通知、在线状态等场景
- `web_sys::WebSocket` 是 WASM 环境的底层 API
- 封装为 `use_websocket` Hook,组件只需关心消息收发
- 自动重连、健康检查、心跳机制确保连接稳定
- JSON 序列化消息,用 `#[serde(tag = "type")]` 区分消息类型
- 组件卸载时主动关闭连接,防止内存泄漏
- 服务端和客户端都需要做频率限制和鉴权
dioxus websocket realtime live-chat push