初步可用

This commit is contained in:
2025-08-18 08:46:39 +08:00
parent 5c6f074ef3
commit 108d4439db
5 changed files with 550 additions and 60 deletions

View File

@@ -79,7 +79,7 @@ impl Default for ConnectionConfig {
fn default() -> Self {
Self {
host: "192.168.1.100".to_string(),
port: 8080,
port: 6666,
timeout: 10,
auto_reconnect: true,
reconnect_interval: 5,

View File

@@ -3,17 +3,20 @@ use futures_util::{SinkExt, StreamExt};
use log::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::JoinHandle;
use tokio::time::{interval, sleep, Duration};
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream, MaybeTlsStream};
pub type WebSocket = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
#[derive(Clone)]
pub struct ConnectionService {
config: Arc<RwLock<ConnectionConfig>>,
state: Arc<RwLock<PlayerState>>,
tx: Option<mpsc::UnboundedSender<PlaybackCommand>>,
websocket: Arc<Mutex<Option<WebSocket>>>,
stop_signal: Arc<AtomicBool>,
task_handle: Option<JoinHandle<()>>,
}
impl ConnectionService {
@@ -23,12 +26,14 @@ impl ConnectionService {
state: Arc::new(RwLock::new(PlayerState::default())),
tx: None,
websocket: Arc::new(Mutex::new(None)),
stop_signal: Arc::new(AtomicBool::new(false)),
task_handle: None,
}
}
pub async fn connect(&mut self) -> Result<(), String> {
let config = self.config.read().await.clone();
let url = format!("ws://{}:{}/ws", config.host, config.port);
let url = format!("ws://{}:{}", config.host, config.port);
info!("尝试连接到视频播放器: {}", url);
@@ -92,12 +97,17 @@ impl ConnectionService {
let websocket = self.websocket.clone();
let state = self.state.clone();
let config = self.config.clone();
let stop_flag = self.stop_signal.clone();
tokio::spawn(async move {
let handle = tokio::spawn(async move {
// 心跳检测
let mut heartbeat_interval = interval(Duration::from_secs(30));
loop {
if stop_flag.load(Ordering::Relaxed) {
info!("停止消息处理任务");
break;
}
tokio::select! {
// 处理发送的命令
command = rx.recv() => {
@@ -124,6 +134,7 @@ impl ConnectionService {
message = Self::receive_message(&websocket) => {
match message {
Ok(msg) => {
if msg.is_empty() { continue; }
if let Err(e) = Self::handle_player_message(&state, msg).await {
error!("处理播放器消息失败: {}", e);
}
@@ -139,6 +150,8 @@ impl ConnectionService {
}
}
});
self.task_handle = Some(handle);
}
async fn send_command(
@@ -148,10 +161,29 @@ impl ConnectionService {
let mut ws_guard = websocket.lock().await;
if let Some(ws) = ws_guard.as_mut() {
let command_json = serde_json::to_string(&command)
// Convert internal command enum to player-compatible wire format
let payload = match command.clone() {
PlaybackCommand::Play => serde_json::json!({"type": "play"}),
PlaybackCommand::Pause => serde_json::json!({"type": "pause"}),
PlaybackCommand::Stop => serde_json::json!({"type": "stop"}),
PlaybackCommand::Seek { position } => serde_json::json!({"type": "seek", "position": position}),
PlaybackCommand::SetVolume { volume } => serde_json::json!({"type": "setVolume", "volume": volume}),
PlaybackCommand::SetLoop { enabled } => serde_json::json!({"type": "setLoop", "enabled": enabled}),
PlaybackCommand::ToggleFullscreen => serde_json::json!({"type": "toggleFullscreen"}),
PlaybackCommand::LoadVideo { path } => serde_json::json!({"type": "loadVideo", "path": path}),
PlaybackCommand::SetPlaylist { videos } => serde_json::json!({"type": "setPlaylist", "videos": videos}),
PlaybackCommand::PlayFromPlaylist { index } => serde_json::json!({"type": "playFromPlaylist", "index": index}),
};
let message = serde_json::json!({
"type": "command",
"data": payload
});
let message_text = serde_json::to_string(&message)
.map_err(|e| format!("序列化命令失败: {}", e))?;
ws.send(Message::Text(command_json))
ws.send(Message::Text(message_text))
.await
.map_err(|e| format!("发送消息失败: {}", e))?;
@@ -203,7 +235,9 @@ impl ConnectionService {
}
}
} else {
Err("WebSocket 连接不可用".to_string())
// 未连接时避免刷屏日志:短暂等待并返回空消息
sleep(Duration::from_millis(300)).await;
Ok(String::new())
}
}
@@ -215,29 +249,83 @@ impl ConnectionService {
return Ok(());
}
// 尝试解析播放器状态更新
match serde_json::from_str::<PlayerState>(&message) {
Ok(new_state) => {
let mut current_state = state.write().await;
current_state.playback_status = new_state.playback_status;
current_state.position = new_state.position;
current_state.duration = new_state.duration;
current_state.volume = new_state.volume;
current_state.is_looping = new_state.is_looping;
current_state.is_fullscreen = new_state.is_fullscreen;
if let Some(video) = new_state.current_video {
current_state.current_video = Some(video);
}
debug!("更新播放器状态");
Ok(())
}
Err(e) => {
warn!("无法解析播放器状态: {} - 消息: {}", e, message);
Ok(()) // 不是致命错误,继续运行
}
// 解析播放器发来的 { type: "status", data: { ... } }
let value: serde_json::Value = serde_json::from_str(&message)
.map_err(|e| format!("解析JSON失败: {}", e))?;
let msg_type = value.get("type").and_then(|v| v.as_str()).unwrap_or("");
if msg_type != "status" {
// 忽略非状态消息
return Ok(());
}
let data = value.get("data").ok_or_else(|| "缺少data字段".to_string())?;
// 将字符串状态映射为内部枚举
let playback_status = match data.get("playback_status").and_then(|v| v.as_str()).unwrap_or("stopped") {
"playing" => crate::models::PlaybackStatus::Playing,
"paused" => crate::models::PlaybackStatus::Paused,
"loading" => crate::models::PlaybackStatus::Loading,
_ => crate::models::PlaybackStatus::Stopped,
};
let connection_status = match data.get("connection_status").and_then(|v| v.as_str()).unwrap_or("disconnected") {
"connected" => crate::models::ConnectionStatus::Connected,
"connecting" => crate::models::ConnectionStatus::Connecting,
"disconnected" => crate::models::ConnectionStatus::Disconnected,
other => crate::models::ConnectionStatus::Error(other.to_string()),
};
let position = data.get("position").and_then(|v| v.as_f64()).unwrap_or(0.0);
let duration = data.get("duration").and_then(|v| v.as_f64()).unwrap_or(0.0);
let volume = data.get("volume").and_then(|v| v.as_f64()).unwrap_or(50.0);
let is_looping = data.get("is_looping").and_then(|v| v.as_bool()).unwrap_or(false);
let is_fullscreen = data.get("is_fullscreen").and_then(|v| v.as_bool()).unwrap_or(false);
let current_video = if let Some(v) = data.get("current_video") {
let path = v.get("path").and_then(|s| s.as_str()).unwrap_or("").to_string();
if path.is_empty() { None } else {
Some(crate::models::VideoInfo {
title: v.get("title").and_then(|s| s.as_str()).unwrap_or("").to_string(),
path,
duration: v.get("duration").and_then(|n| n.as_f64()),
size: v.get("size").and_then(|n| n.as_u64()),
format: v.get("format").and_then(|s| s.as_str()).map(|s| s.to_string()),
})
}
} else { None };
let playlist = if let Some(arr) = data.get("playlist").and_then(|a| a.as_array()) {
arr.iter().filter_map(|v| {
let path = v.get("path").and_then(|s| s.as_str()).unwrap_or("").to_string();
if path.is_empty() { None } else {
Some(crate::models::VideoInfo {
title: v.get("title").and_then(|s| s.as_str()).unwrap_or("").to_string(),
path,
duration: v.get("duration").and_then(|n| n.as_f64()),
size: v.get("size").and_then(|n| n.as_u64()),
format: v.get("format").and_then(|s| s.as_str()).map(|s| s.to_string()),
})
}
}).collect()
} else { vec![] };
let current_playlist_index = data.get("current_playlist_index").and_then(|n| n.as_u64()).map(|n| n as usize);
let mut current_state = state.write().await;
current_state.connection_status = connection_status;
current_state.playback_status = playback_status;
current_state.position = position;
current_state.duration = duration;
current_state.volume = volume;
current_state.is_looping = is_looping;
current_state.is_fullscreen = is_fullscreen;
current_state.current_video = current_video;
current_state.playlist = playlist;
current_state.current_playlist_index = current_playlist_index;
debug!("更新播放器状态");
Ok(())
}
async fn handle_reconnection(
@@ -274,6 +362,9 @@ impl ConnectionService {
pub async fn disconnect(&mut self) {
info!("断开连接");
// 通知后台任务停止并尝试结束
self.stop_signal.store(true, Ordering::Relaxed);
// 关闭 WebSocket 连接
{
let mut ws_guard = self.websocket.lock().await;
@@ -283,6 +374,14 @@ impl ConnectionService {
*ws_guard = None;
}
// 停止后台任务
if let Some(handle) = self.task_handle.take() {
handle.abort();
}
// 重置停止标志,方便下次连接
self.stop_signal.store(false, Ordering::Relaxed);
// 更新状态
{
let mut state = self.state.write().await;