refactor(backend): clean up sse mapping and handler logic

This commit is contained in:
spinline
2026-02-03 21:45:24 +03:00
parent c3431db35f
commit 251da58a82
2 changed files with 213 additions and 232 deletions

View File

@@ -71,77 +71,48 @@ pub async fn add_torrent_handler(
} }
} }
pub async fn handle_torrent_action( /// Helper function to handle secure deletion of torrent data
State(state): State<AppState>, async fn delete_torrent_with_data(
Json(payload): Json<TorrentActionRequest>, client: &xmlrpc::RtorrentClient,
) -> impl IntoResponse { hash: &str,
tracing::info!( ) -> Result<&'static str, (StatusCode, String)> {
"Received action: {} for hash: {}",
payload.action,
payload.hash
);
// Special handling for delete_with_data
if payload.action == "delete_with_data" {
let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path);
// 1. Get Base Path // 1. Get Base Path
let path_xml = match client.call("d.base_path", &[&payload.hash]).await { let path_xml = client.call("d.base_path", &[hash]).await.map_err(|e| {
Ok(xml) => xml, (
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to call rTorrent: {}", e), format!("Failed to call rTorrent: {}", e),
) )
.into_response() })?;
}
};
let path = match xmlrpc::parse_string_response(&path_xml) { let path = xmlrpc::parse_string_response(&path_xml).map_err(|e| {
Ok(p) => p, (
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to parse path: {}", e), format!("Failed to parse path: {}", e),
) )
.into_response() })?;
}
};
// 1.5 Get Default Download Directory (Sandbox Root) // 1.5 Get Default Download Directory (Sandbox Root)
let root_xml = match client.call("directory.default", &[]).await { let root_xml = client.call("directory.default", &[]).await.map_err(|e| {
Ok(xml) => xml, (
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to get valid download root: {}", e), format!("Failed to get valid download root: {}", e),
) )
.into_response() })?;
}
};
let root_path_str = match xmlrpc::parse_string_response(&root_xml) { let root_path_str = xmlrpc::parse_string_response(&root_xml).map_err(|e| {
Ok(p) => p, (
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to parse root path: {}", e), format!("Failed to parse root path: {}", e),
) )
.into_response() })?;
}
};
// Resolve Paths (Canonicalize) to prevent .. traversal and symlink attacks // Resolve Paths (Canonicalize) to prevent .. traversal and symlink attacks
let root_path = match std::fs::canonicalize(std::path::Path::new(&root_path_str)) { let root_path = std::fs::canonicalize(std::path::Path::new(&root_path_str)).map_err(|e| {
Ok(p) => p, (
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Invalid download root configuration (on server): {}", e), format!("Invalid download root configuration (on server): {}", e),
) )
.into_response() })?;
}
};
// Check if target path exists before trying to resolve it // Check if target path exists before trying to resolve it
let target_path_raw = std::path::Path::new(&path); let target_path_raw = std::path::Path::new(&path);
@@ -151,26 +122,22 @@ pub async fn handle_torrent_action(
target_path_raw target_path_raw
); );
// If file doesn't exist, we just remove the torrent entry // If file doesn't exist, we just remove the torrent entry
if let Err(e) = client.call("d.erase", &[&payload.hash]).await { client.call("d.erase", &[hash]).await.map_err(|e| {
return ( (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to erase torrent: {}", e), format!("Failed to erase torrent: {}", e),
) )
.into_response(); })?;
}
return (StatusCode::OK, "Torrent removed (Data not found)").into_response(); return Ok("Torrent removed (Data not found)");
} }
let target_path = match std::fs::canonicalize(target_path_raw) { let target_path = std::fs::canonicalize(target_path_raw).map_err(|e| {
Ok(p) => p, (
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Invalid data path: {}", e), format!("Invalid data path: {}", e),
) )
.into_response() })?;
}
};
tracing::info!( tracing::info!(
"Delete request: Target='{:?}', Root='{:?}'", "Delete request: Target='{:?}', Root='{:?}'",
@@ -184,31 +151,28 @@ pub async fn handle_torrent_action(
"Security Risk: Attempted to delete path outside download directory: {:?}", "Security Risk: Attempted to delete path outside download directory: {:?}",
target_path target_path
); );
return ( return Err((
StatusCode::FORBIDDEN, StatusCode::FORBIDDEN,
"Security Error: Cannot delete files outside default download directory", "Security Error: Cannot delete files outside default download directory".to_string(),
) ));
.into_response();
} }
// SECURITY CHECK: Ensure we are not deleting the root itself // SECURITY CHECK: Ensure we are not deleting the root itself
if target_path == root_path { if target_path == root_path {
return ( return Err((
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
"Security Error: Cannot delete the download root directory itself", "Security Error: Cannot delete the download root directory itself".to_string(),
) ));
.into_response();
} }
// 2. Erase Torrent first // 2. Erase Torrent first
if let Err(e) = client.call("d.erase", &[&payload.hash]).await { client.call("d.erase", &[hash]).await.map_err(|e| {
tracing::warn!("Failed to erase torrent entry: {}", e); tracing::warn!("Failed to erase torrent entry: {}", e);
return ( (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to erase torrent: {}", e), format!("Failed to erase torrent: {}", e),
) )
.into_response(); })?;
}
// 3. Delete Files via Native FS // 3. Delete Files via Native FS
let delete_result = if target_path.is_dir() { let delete_result = if target_path.is_dir() {
@@ -218,16 +182,35 @@ pub async fn handle_torrent_action(
}; };
match delete_result { match delete_result {
Ok(_) => return (StatusCode::OK, "Torrent and data deleted").into_response(), Ok(_) => Ok("Torrent and data deleted"),
Err(e) => { Err(e) => {
tracing::error!("Failed to delete data at {:?}: {}", target_path, e); tracing::error!("Failed to delete data at {:?}: {}", target_path, e);
return ( Err((
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to delete data: {}", e), format!("Failed to delete data: {}", e),
) ))
.into_response();
} }
} }
}
pub async fn handle_torrent_action(
State(state): State<AppState>,
Json(payload): Json<TorrentActionRequest>,
) -> impl IntoResponse {
tracing::info!(
"Received action: {} for hash: {}",
payload.action,
payload.hash
);
let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path);
// Special handling for delete_with_data
if payload.action == "delete_with_data" {
return match delete_torrent_with_data(&client, &payload.hash).await {
Ok(msg) => (StatusCode::OK, msg).into_response(),
Err((status, msg)) => (status, msg).into_response(),
};
} }
let method = match payload.action.as_str() { let method = match payload.action.as_str() {
@@ -237,7 +220,6 @@ pub async fn handle_torrent_action(
_ => return (StatusCode::BAD_REQUEST, "Invalid action").into_response(), _ => return (StatusCode::BAD_REQUEST, "Invalid action").into_response(),
}; };
let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path);
match client.call(method, &[&payload.hash]).await { match client.call(method, &[&payload.hash]).await {
Ok(_) => (StatusCode::OK, "Action executed").into_response(), Ok(_) => (StatusCode::OK, "Action executed").into_response(),
Err(e) => { Err(e) => {

View File

@@ -1,62 +1,54 @@
use crate::xmlrpc::{parse_multicall_response, RtorrentClient, XmlRpcError}; use crate::xmlrpc::{parse_multicall_response, RtorrentClient, XmlRpcError};
use crate::AppState;
use axum::extract::State;
use axum::response::sse::{Event, Sse}; use axum::response::sse::{Event, Sse};
use futures::stream::{self, Stream}; use futures::stream::{self, Stream};
use shared::{AppEvent, Torrent, TorrentStatus}; use shared::{AppEvent, Torrent, TorrentStatus};
use std::convert::Infallible; use std::convert::Infallible;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
// Helper (should be moved to utils) // Constants for rTorrent fields to ensure query and parser stay in sync
fn parse_size(s: &str) -> i64 { const RTORRENT_FIELDS: &[&str] = &[
s.parse().unwrap_or(0) "", // 0: default (ignored)
} "main", // 1: view
"d.hash=", // 0 -> row index starts after view
pub async fn fetch_torrents(client: &RtorrentClient) -> Result<Vec<Torrent>, XmlRpcError> { "d.name=", // 1
// d.multicall2("", "main", ...) "d.size_bytes=", // 2
let params = vec![ "d.bytes_done=", // 3
"", "d.down.rate=", // 4
"main", "d.up.rate=", // 5
"d.hash=",
"d.name=",
"d.size_bytes=",
"d.bytes_done=",
"d.down.rate=",
"d.up.rate=",
"d.state=", // 6 "d.state=", // 6
"d.complete=", // 7 "d.complete=", // 7
"d.message=", // 8 "d.message=", // 8
"d.left_bytes=", // 9 "d.left_bytes=", // 9
"d.creation_date=", // 10 "d.creation_date=", // 10
"d.hashing=", // 11 "d.hashing=", // 11
]; ];
let xml = client.call("d.multicall2", &params).await?; fn parse_long(s: Option<&String>) -> i64 {
s.map(|v| v.parse().unwrap_or(0)).unwrap_or(0)
}
if xml.trim().is_empty() { fn parse_string(s: Option<&String>) -> String {
return Err(XmlRpcError::Parse("Empty response from SCGI".to_string())); s.cloned().unwrap_or_default()
} }
let rows = parse_multicall_response(&xml)?; /// Converts a raw row of strings from rTorrent XML-RPC into a generic Torrent struct
fn from_rtorrent_row(row: Vec<String>) -> Torrent {
// Indexes correspond to the params list below (excluding the first two view/target args)
let hash = parse_string(row.get(0));
let name = parse_string(row.get(1));
let size = parse_long(row.get(2));
let completed = parse_long(row.get(3));
let down_rate = parse_long(row.get(4));
let up_rate = parse_long(row.get(5));
let torrents = rows let state = parse_long(row.get(6));
.into_iter() let is_complete = parse_long(row.get(7));
.map(|row| { let message = parse_string(row.get(8));
// row map indexes: let left_bytes = parse_long(row.get(9));
// 0: hash, 1: name, 2: size, 3: completed, 4: down_rate, 5: up_rate let added_date = parse_long(row.get(10));
// 6: state, 7: complete, 8: message, 9: left_bytes, 10: added, 11: hashing let is_hashing = parse_long(row.get(11));
let hash = row.get(0).cloned().unwrap_or_default();
let name = row.get(1).cloned().unwrap_or_default();
let size = parse_size(row.get(2).unwrap_or(&"0".to_string()));
let completed = parse_size(row.get(3).unwrap_or(&"0".to_string()));
let down_rate = parse_size(row.get(4).unwrap_or(&"0".to_string()));
let up_rate = parse_size(row.get(5).unwrap_or(&"0".to_string()));
let state = parse_size(row.get(6).unwrap_or(&"0".to_string()));
let is_complete = parse_size(row.get(7).unwrap_or(&"0".to_string()));
let message = row.get(8).cloned().unwrap_or_default();
let left_bytes = parse_size(row.get(9).unwrap_or(&"0".to_string()));
let added_date = parse_size(row.get(10).unwrap_or(&"0".to_string()));
let is_hashing = parse_size(row.get(11).unwrap_or(&"0".to_string()));
let percent_complete = if size > 0 { let percent_complete = if size > 0 {
(completed as f64 / size as f64) * 100.0 (completed as f64 / size as f64) * 100.0
@@ -97,15 +89,22 @@ pub async fn fetch_torrents(client: &RtorrentClient) -> Result<Vec<Torrent>, Xml
error_message: message, error_message: message,
added_date, added_date,
} }
}) }
.collect();
pub async fn fetch_torrents(client: &RtorrentClient) -> Result<Vec<Torrent>, XmlRpcError> {
let xml = client.call("d.multicall2", RTORRENT_FIELDS).await?;
if xml.trim().is_empty() {
return Err(XmlRpcError::Parse("Empty response from SCGI".to_string()));
}
let rows = parse_multicall_response(&xml)?;
let torrents = rows.into_iter().map(from_rtorrent_row).collect();
Ok(torrents) Ok(torrents)
} }
use crate::AppState;
use axum::extract::State; // Import from crate root
pub async fn sse_handler( pub async fn sse_handler(
State(state): State<AppState>, State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {