From f761c904489c17b51254fc925c5f5ca4c7ff268e Mon Sep 17 00:00:00 2001 From: spinline Date: Wed, 4 Feb 2026 00:39:21 +0300 Subject: [PATCH] refactor(backend): use typed XmlRpc parameters (int/string) to fix rtorrent calls --- backend/src/handlers/mod.rs | 124 +++++++++++++++++++++--------------- backend/src/main.rs | 3 +- backend/src/sse.rs | 44 ++++++++----- backend/src/xmlrpc.rs | 116 +++++++++++++++++++++++++++++---- 4 files changed, 204 insertions(+), 83 deletions(-) diff --git a/backend/src/handlers/mod.rs b/backend/src/handlers/mod.rs index 1bd31fc..2a6396f 100644 --- a/backend/src/handlers/mod.rs +++ b/backend/src/handlers/mod.rs @@ -1,4 +1,7 @@ -use crate::{xmlrpc, AppState}; +use crate::{ + xmlrpc::{self, RpcParam}, + AppState, +}; use axum::{ extract::{Json, Path, State}, http::{header, StatusCode, Uri}, @@ -73,7 +76,9 @@ pub async fn add_torrent_handler( payload.uri.len() ); let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); - match client.call("load.start", &["", &payload.uri]).await { + let params = vec![RpcParam::from(""), RpcParam::from(payload.uri.as_str())]; + + match client.call("load.start", ¶ms).await { Ok(response) => { tracing::debug!("rTorrent response to load.start: {}", response); if response.contains("faultCode") { @@ -128,7 +133,9 @@ pub async fn handle_torrent_action( _ => return (StatusCode::BAD_REQUEST, "Invalid action").into_response(), }; - match client.call(method, &[&payload.hash]).await { + let params = vec![RpcParam::from(payload.hash.as_str())]; + + match client.call(method, ¶ms).await { Ok(_) => (StatusCode::OK, "Action executed").into_response(), Err(e) => { tracing::error!("RPC error: {}", e); @@ -146,13 +153,18 @@ async fn delete_torrent_with_data( client: &xmlrpc::RtorrentClient, hash: &str, ) -> Result<&'static str, (StatusCode, String)> { + let params_hash = vec![RpcParam::from(hash)]; + // 1. Get Base Path - let path_xml = client.call("d.base_path", &[hash]).await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to call rTorrent: {}", e), - ) - })?; + let path_xml = client + .call("d.base_path", ¶ms_hash) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to call rTorrent: {}", e), + ) + })?; let path = xmlrpc::parse_string_response(&path_xml).map_err(|e| { ( @@ -192,7 +204,7 @@ async fn delete_torrent_with_data( target_path_raw ); // If file doesn't exist, we just remove the torrent entry - client.call("d.erase", &[hash]).await.map_err(|e| { + client.call("d.erase", ¶ms_hash).await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to erase torrent: {}", e), @@ -236,7 +248,7 @@ async fn delete_torrent_with_data( } // 2. Erase Torrent first - client.call("d.erase", &[hash]).await.map_err(|e| { + client.call("d.erase", ¶ms_hash).await.map_err(|e| { tracing::warn!("Failed to erase torrent entry: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -306,12 +318,12 @@ pub async fn get_files_handler( ) -> impl IntoResponse { let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); let params = vec![ - hash.as_str(), - "", - "f.path=", - "f.size_bytes=", - "f.completed_chunks=", - "f.priority=", + RpcParam::from(hash.as_str()), + RpcParam::from(""), + RpcParam::from("f.path="), + RpcParam::from("f.size_bytes="), + RpcParam::from("f.completed_chunks="), + RpcParam::from("f.priority="), ]; match client.call("f.multicall", ¶ms).await { @@ -362,13 +374,13 @@ pub async fn get_peers_handler( ) -> impl IntoResponse { let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); let params = vec![ - hash.as_str(), - "", - "p.address=", - "p.client_version=", - "p.down_rate=", - "p.up_rate=", - "p.completed_percent=", // or similar + RpcParam::from(hash.as_str()), + RpcParam::from(""), + RpcParam::from("p.address="), + RpcParam::from("p.client_version="), + RpcParam::from("p.down_rate="), + RpcParam::from("p.up_rate="), + RpcParam::from("p.completed_percent="), ]; match client.call("p.multicall", ¶ms).await { @@ -418,14 +430,12 @@ pub async fn get_trackers_handler( ) -> impl IntoResponse { let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); let params = vec![ - hash.as_str(), - "", - "t.url=", - "t.activity_date_last=", // Just an example field, msg is better - // t.latest_event (success/error) is tricky. - "t.message=", // Often empty if ok + RpcParam::from(hash.as_str()), + RpcParam::from(""), + RpcParam::from("t.url="), + RpcParam::from("t.activity_date_last="), + RpcParam::from("t.message="), ]; - // rTorrent tracker info is sometimes sparse in multicall match client.call("t.multicall", ¶ms).await { Ok(xml) => { @@ -474,16 +484,28 @@ pub async fn set_file_priority_handler( ) -> impl IntoResponse { let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); - let priority_str = payload.priority.to_string(); - let target = format!("{}:f{}", payload.hash, payload.file_index); + // f.set_priority takes "hash", index, priority + // Priority: 0 (off), 1 (normal), 2 (high) + // f.set_priority is tricky. Let's send as string first as before, or int if we knew. + // Usually priorities are small integers. + // But since we are updating everything to RpcParam, let's use Int if possible or String. + // The previous implementation used string. Let's stick to string for now or try Int. + // Actually, f.set_priority likely takes an integer. - match client - .call("f.set_priority", &[&target, &priority_str]) - .await - { + let target = format!("{}:f{}", payload.hash, payload.file_index); + let params = vec![ + RpcParam::from(target.as_str()), + RpcParam::from(payload.priority as i64), + ]; + + match client.call("f.set_priority", ¶ms).await { Ok(_) => { - // Need to update view to reflect changes? usually 'd.update_priorities' is needed - let _ = client.call("d.update_priorities", &[&payload.hash]).await; + let _ = client + .call( + "d.update_priorities", + &[RpcParam::from(payload.hash.as_str())], + ) + .await; (StatusCode::OK, "Priority updated").into_response() } Err(e) => ( @@ -509,11 +531,12 @@ pub async fn set_label_handler( Json(payload): Json, ) -> impl IntoResponse { let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); + let params = vec![ + RpcParam::from(payload.hash.as_str()), + RpcParam::from(payload.label), + ]; - match client - .call("d.custom1.set", &[&payload.hash, &payload.label]) - .await - { + match client.call("d.custom1.set", ¶ms).await { Ok(_) => (StatusCode::OK, "Label updated").into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, @@ -539,18 +562,12 @@ pub async fn get_global_limit_handler(State(state): State) -> impl Int let up_fut = client.call("throttle.global_up.max_rate", &[]); let down = match down_fut.await { - Ok(xml) => xmlrpc::parse_string_response(&xml) - .unwrap_or_default() - .parse::() - .unwrap_or(0), + Ok(xml) => xmlrpc::parse_i64_response(&xml).unwrap_or(0), Err(_) => -1, }; let up = match up_fut.await { - Ok(xml) => xmlrpc::parse_string_response(&xml) - .unwrap_or_default() - .parse::() - .unwrap_or(0), + Ok(xml) => xmlrpc::parse_i64_response(&xml).unwrap_or(0), Err(_) => -1, }; @@ -579,8 +596,9 @@ pub async fn set_global_limit_handler( let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); if let Some(down) = payload.max_download_rate { + // Here is the fix: Send as Int if let Err(e) = client - .call("throttle.global_down.max_rate.set", &[&down.to_string()]) + .call("throttle.global_down.max_rate.set", &[RpcParam::Int(down)]) .await { return ( @@ -593,7 +611,7 @@ pub async fn set_global_limit_handler( if let Some(up) = payload.max_upload_rate { if let Err(e) = client - .call("throttle.global_up.max_rate.set", &[&up.to_string()]) + .call("throttle.global_up.max_rate.set", &[RpcParam::Int(up)]) .await { return ( diff --git a/backend/src/main.rs b/backend/src/main.rs index 2b985c2..08a4840 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -116,7 +116,8 @@ async fn main() { tracing::info!("Socket file exists. Testing connection..."); let client = xmlrpc::RtorrentClient::new(&args.socket); // We use a lightweight call to verify connectivity - match client.call("system.client_version", &[]).await { + let params: Vec = vec![]; + match client.call("system.client_version", ¶ms).await { Ok(xml) => { let version = xmlrpc::parse_string_response(&xml).unwrap_or(xml); tracing::info!("Connected to rTorrent successfully. Version: {}", version); diff --git a/backend/src/sse.rs b/backend/src/sse.rs index 6f8492d..32adefe 100644 --- a/backend/src/sse.rs +++ b/backend/src/sse.rs @@ -1,4 +1,6 @@ -use crate::xmlrpc::{parse_multicall_response, RtorrentClient, XmlRpcError}; +use crate::xmlrpc::{ + parse_i64_response, parse_multicall_response, RpcParam, RtorrentClient, XmlRpcError, +}; use crate::AppState; use axum::extract::State; use axum::response::sse::{Event, Sse}; @@ -101,7 +103,8 @@ fn from_rtorrent_row(row: Vec) -> Torrent { } pub async fn fetch_torrents(client: &RtorrentClient) -> Result, XmlRpcError> { - let xml = client.call("d.multicall2", RTORRENT_FIELDS).await?; + let params: Vec = RTORRENT_FIELDS.iter().map(|s| RpcParam::from(*s)).collect(); + let xml = client.call("d.multicall2", ¶ms).await?; if xml.trim().is_empty() { return Err(XmlRpcError::Parse("Empty response from SCGI".to_string())); @@ -115,26 +118,33 @@ pub async fn fetch_torrents(client: &RtorrentClient) -> Result, Xml } pub async fn fetch_global_stats(client: &RtorrentClient) -> Result { - // Parallel calls would be better but let's keep it simple sequential for now. - // NOTE: This adds 4 roundtrips per second. If this is too slow, we should use multicall via system.multicall (if supported) - // or just accept the overhead. Unix socket overhead is very low. + let empty_params: Vec = vec![]; - // We ignore errors on individual stats to not break the whole loop, using defaults. - // But connection errors should propagate. + let down_rate_xml = client + .call("throttle.global_down.rate", &empty_params) + .await?; + let down_rate = parse_i64_response(&down_rate_xml).unwrap_or(0); - let down_rate_str = client.call("throttle.global_down.rate", &[]).await?; - let up_rate_str = client.call("throttle.global_up.rate", &[]).await?; - let down_limit_str = client.call("throttle.global_down.max_rate", &[]).await?; - let up_limit_str = client.call("throttle.global_up.max_rate", &[]).await?; + let up_rate_xml = client + .call("throttle.global_up.rate", &empty_params) + .await?; + let up_rate = parse_i64_response(&up_rate_xml).unwrap_or(0); - // Optionally get free space. "directory.default" then "d.free_space_path"?? No "get_directory_free_space" - // Let's skip free space for high frequency updates. + let down_limit_xml = client + .call("throttle.global_down.max_rate", &empty_params) + .await?; + let down_limit = parse_i64_response(&down_limit_xml).ok(); + + let up_limit_xml = client + .call("throttle.global_up.max_rate", &empty_params) + .await?; + let up_limit = parse_i64_response(&up_limit_xml).ok(); Ok(GlobalStats { - down_rate: down_rate_str.parse().unwrap_or(0), - up_rate: up_rate_str.parse().unwrap_or(0), - down_limit: down_limit_str.parse().ok(), - up_limit: up_limit_str.parse().ok(), + down_rate, + up_rate, + down_limit, + up_limit, free_space: None, }) } diff --git a/backend/src/xmlrpc.rs b/backend/src/xmlrpc.rs index 7b71473..9139a46 100644 --- a/backend/src/xmlrpc.rs +++ b/backend/src/xmlrpc.rs @@ -9,13 +9,44 @@ pub enum XmlRpcError { #[error("SCGI Error: {0}")] Scgi(#[from] ScgiError), #[error("Serialization Error: {0}")] - Serialization(String), // quick_xml errors are tricky to wrap directly due to versions/features + Serialization(String), #[error("Deserialization Error: {0}")] Deserialization(#[from] quick_xml::de::DeError), #[error("XML Parse Error: {0}")] Parse(String), } +// --- Request Parameters Enum --- +#[derive(Debug, Clone)] +pub enum RpcParam { + String(String), + Int(i64), +} + +impl From<&str> for RpcParam { + fn from(s: &str) -> Self { + RpcParam::String(s.to_string()) + } +} + +impl From for RpcParam { + fn from(s: String) -> Self { + RpcParam::String(s) + } +} + +impl From for RpcParam { + fn from(i: i64) -> Self { + RpcParam::Int(i) + } +} + +impl From for RpcParam { + fn from(i: i32) -> Self { + RpcParam::Int(i as i64) + } +} + // --- Request Models --- #[derive(Debug, Serialize)] @@ -40,8 +71,9 @@ struct RequestParam<'a> { struct RequestValueInner<'a> { #[serde(skip_serializing_if = "Option::is_none")] string: Option<&'a str>, + // rTorrent uses i8/i4. Let's use i8 (64-bit) which is safer for large limits/sizes #[serde(skip_serializing_if = "Option::is_none")] - i4: Option, + i8: Option, } // --- Response Models for d.multicall2 --- @@ -78,7 +110,6 @@ struct MulticallResponseDataOuterValue { values: Vec, } -// Each row in the response #[derive(Debug, Deserialize)] struct MulticallRowValue { array: MulticallResponseDataInner, @@ -95,7 +126,6 @@ struct MulticallResponseDataInnerValue { values: Vec, } -// Each item in a row (column) #[derive(Debug, Deserialize)] struct MulticallItemValue { #[serde(rename = "string", default)] @@ -143,6 +173,34 @@ struct StringResponseValue { string: String, } +// --- Response Model for simple integer (i8/i4) --- + +#[derive(Debug, Deserialize)] +#[serde(rename = "methodResponse")] +struct IntegerResponse { + params: IntegerResponseParams, +} + +#[derive(Debug, Deserialize)] +struct IntegerResponseParams { + param: IntegerResponseParam, +} + +#[derive(Debug, Deserialize)] +struct IntegerResponseParam { + value: IntegerResponseValue, +} + +#[derive(Debug, Deserialize)] +struct IntegerResponseValue { + #[serde(rename = "i8", default)] + i8: Option, + #[serde(rename = "i4", default)] + i4: Option, + #[serde(rename = "string", default)] + string: Option, +} + // --- Client Implementation --- pub struct RtorrentClient { @@ -157,14 +215,20 @@ impl RtorrentClient { } /// Helper to build and serialize XML-RPC method call - fn build_method_call(&self, method: &str, params: &[&str]) -> Result { + fn build_method_call(&self, method: &str, params: &[RpcParam]) -> Result { let req_params = RequestParams { param: params .iter() .map(|p| RequestParam { - value: RequestValueInner { - string: Some(p), - i4: None, + value: match p { + RpcParam::String(s) => RequestValueInner { + string: Some(s), + i8: None, + }, + RpcParam::Int(i) => RequestValueInner { + string: None, + i8: Some(*i), + }, }, }) .collect(), @@ -179,7 +243,7 @@ impl RtorrentClient { Ok(format!("\n{}", xml_body)) } - pub async fn call(&self, method: &str, params: &[&str]) -> Result { + pub async fn call(&self, method: &str, params: &[RpcParam]) -> Result { let xml = self.build_method_call(method, params)?; let req = ScgiRequest::new().body(xml.into_bytes()); @@ -210,6 +274,20 @@ pub fn parse_string_response(xml: &str) -> Result { Ok(response.params.param.value.string) } +pub fn parse_i64_response(xml: &str) -> Result { + let response: IntegerResponse = from_str(xml)?; + if let Some(val) = response.params.param.value.i8 { + Ok(val) + } else if let Some(val) = response.params.param.value.i4 { + Ok(val) + } else if let Some(ref s) = response.params.param.value.string { + s.parse() + .map_err(|_| XmlRpcError::Parse("Not an integer string".to_string())) + } else { + Err(XmlRpcError::Parse("No integer value found".to_string())) + } +} + #[cfg(test)] mod tests { use super::*; @@ -217,14 +295,28 @@ mod tests { #[test] fn test_build_method_call() { let client = RtorrentClient::new("dummy"); - let xml = client - .build_method_call("d.multicall2", &["", "main", "d.name="]) - .unwrap(); + let params = vec![ + RpcParam::String("".to_string()), + RpcParam::String("main".to_string()), + RpcParam::String("d.name=".to_string()), + ]; + let xml = client.build_method_call("d.multicall2", ¶ms).unwrap(); assert!(xml.contains("d.multicall2")); assert!(xml.contains("main")); } + #[test] + fn test_build_method_call_int() { + let client = RtorrentClient::new("dummy"); + let params = vec![RpcParam::Int(1024)]; + let xml = client.build_method_call("test.int", ¶ms).unwrap(); + // quick-xml default for i64 might be just text inside tag if not renamed? + // We mapped i8 field to i64 value. + // It should produce 1024 + assert!(xml.contains("1024")); + } + #[test] fn test_parse_multicall_response() { let xml = r#"