diff --git a/Cargo.lock b/Cargo.lock index b6b1e74..d4b62fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "aho-corasick" version = "1.1.4" @@ -11,6 +17,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -76,6 +97,18 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "async-compression" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d10e4f991a553474232bc0a31799f6d24b034a84c0971d80d2e2f78b2e576e40" +dependencies = [ + "compression-codecs", + "compression-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -223,6 +256,7 @@ dependencies = [ "rust-embed", "serde", "serde_json", + "shared", "tokio", "tokio-stream", "tokio-util", @@ -253,6 +287,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.19.1" @@ -284,6 +339,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -386,6 +443,26 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "compression-codecs" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00828ba6fd27b45a448e57dbfe84f1029d4c9f26b368157e9a448a5f49a2ec2a" +dependencies = [ + "brotli", + "compression-core", + "flate2", + "memchr", + "zstd", + "zstd-safe", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "config" version = "0.14.1" @@ -463,6 +540,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -564,6 +650,16 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +[[package]] +name = "flate2" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b375d6465b98090a5f25b1c7703f3859783755aa9a80433b36e0379a3ec2f369" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -592,6 +688,7 @@ dependencies = [ "log", "serde", "serde_json", + "shared", "uuid", "wasm-bindgen", "web-sys", @@ -1085,6 +1182,16 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.85" @@ -1298,6 +1405,15 @@ dependencies = [ "quote", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1332,6 +1448,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -1472,6 +1598,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "potential_utf" version = "0.1.4" @@ -1980,6 +2112,13 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared" +version = "0.1.0" +dependencies = [ + "serde", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1996,6 +2135,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simd-adler32" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" + [[package]] name = "slab" version = "0.4.11" @@ -2234,6 +2379,7 @@ dependencies = [ "futures-util", "pin-project", "pin-project-lite", + "tokio", "tower-layer", "tower-service", "tracing", @@ -2261,8 +2407,10 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ + "async-compression", "bitflags", "bytes", + "futures-core", "futures-util", "http 1.4.0", "http-body", @@ -2342,10 +2490,14 @@ version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -2870,3 +3022,31 @@ name = "zmij" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index e163116..a99ed9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["backend", "frontend"] +members = ["backend", "frontend", "shared"] resolver = "2" # Optimize for size (aggressive) diff --git a/backend/Cargo.toml b/backend/Cargo.toml index ea7d700..092dc5c 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -6,12 +6,12 @@ edition = "2021" [dependencies] axum = { version = "0.7", features = ["macros", "ws"] } tokio = { version = "1", features = ["full"] } -tower = { version = "0.4", features = ["util"] } -tower-http = { version = "0.5", features = ["fs", "trace", "cors"] } +tower = { version = "0.4", features = ["util", "timeout"] } +tower-http = { version = "0.5", features = ["fs", "trace", "cors", "compression-full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } tokio-stream = "0.1" bytes = "1" futures = "0.3" @@ -21,3 +21,4 @@ tokio-util = { version = "0.7", features = ["codec", "io"] } clap = { version = "4.4", features = ["derive"] } rust-embed = "8.2" mime_guess = "2.0" +shared = { path = "../shared" } diff --git a/backend/src/diff.rs b/backend/src/diff.rs new file mode 100644 index 0000000..a5a0837 --- /dev/null +++ b/backend/src/diff.rs @@ -0,0 +1,81 @@ +use shared::{AppEvent, Torrent, TorrentUpdate}; + +pub fn diff_torrents(old: &[Torrent], new: &[Torrent]) -> Vec { + // 1. Structural Change Check + // If length differs or any hash at specific index differs (simplistic view), send FullList. + // Ideally we should track "Added/Removed", but for simplicity and robustness as per prompt "FullList for big changes", + // we fallback to FullList on structural changes. + if old.len() != new.len() { + // Timestamp is needed for FullList? The definition is FullList(Vec, u64). + // We'll let the caller handle the timestamp or pass it in? + // AppEvent in shared::lib.rs is FullList(Vec, u64). + // We'll return just the list decision here, or constructs events. + // Let's assume caller adds the u64 (disk space/timestamp). + // Actually, let's keep it simple: Return Option>. + // But simply returning "NeedFullList" signal is easier if we can't accept u64 here. + // Let's change signature to return an enum or boolean flag if FullList needed. + return vec![]; // Special signal: Empty vec means "No diffs" or "Caller handles FullList"? + // This function is tricky if we don't have the u64. + } + + // Check for hash mismatch (order changed) + for (i, t) in new.iter().enumerate() { + if old[i].hash != t.hash { + return vec![]; // Signal Full List needed + } + } + + let mut events = Vec::new(); + + for (i, new_t) in new.iter().enumerate() { + let old_t = &old[i]; + + let mut update = TorrentUpdate { + hash: new_t.hash.clone(), + down_rate: None, + up_rate: None, + percent_complete: None, + completed: None, + eta: None, + status: None, + }; + + let mut has_changes = false; + + if old_t.down_rate != new_t.down_rate { + update.down_rate = Some(new_t.down_rate); + has_changes = true; + } + if old_t.up_rate != new_t.up_rate { + update.up_rate = Some(new_t.up_rate); + has_changes = true; + } + // Floating point comparison with epsilon + if (old_t.percent_complete - new_t.percent_complete).abs() > 0.01 { + update.percent_complete = Some(new_t.percent_complete); + has_changes = true; + } + if old_t.completed != new_t.completed { + update.completed = Some(new_t.completed); + has_changes = true; + } + if old_t.eta != new_t.eta { + update.eta = Some(new_t.eta); + has_changes = true; + } + if old_t.status != new_t.status { + update.status = Some(new_t.status.clone()); + has_changes = true; + } + + if has_changes { + events.push(AppEvent::Update(update)); + } + } + + if !events.is_empty() { + tracing::debug!("Generated {} updates", events.len()); + } + + events +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 8264411..afe7479 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,13 +1,8 @@ - -mod models; +mod diff; mod scgi; mod sse; mod xmlrpc; -// fixup modules -// remove mm if I didn't create it? I didn't. -// I will structure modules correctly. - use clap::Parser; use rust_embed::RustEmbed; use axum::{ @@ -17,10 +12,28 @@ use axum::{ routing::{get, post}, Router, Json, }; -use tower_http::cors::CorsLayer; +use tower_http::{ + cors::CorsLayer, + trace::TraceLayer, + compression::{CompressionLayer, CompressionLevel}, +}; +use axum::{ + error_handling::HandleErrorLayer, + BoxError, +}; +use tower::ServiceBuilder; use serde::Deserialize; use std::net::SocketAddr; -use crate::models::AppState; +use shared::{Torrent, TorrentActionRequest, AppEvent}; // shared crates imports +use tokio::sync::{watch, broadcast}; +use std::sync::Arc; + +#[derive(Clone)] +pub struct AppState { + pub tx: Arc>>, + pub event_bus: broadcast::Sender, + pub scgi_socket_path: String, +} #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -66,8 +79,6 @@ async fn static_handler(uri: Uri) -> impl IntoResponse { } } -use tokio::sync::watch; -use std::sync::Arc; use std::time::Duration; #[derive(Deserialize)] @@ -79,19 +90,19 @@ async fn add_torrent_handler( State(state): State, Json(payload): Json, ) -> StatusCode { - println!("Received add_torrent request. URI length: {}", payload.uri.len()); + tracing::info!("Received add_torrent request. URI length: {}", payload.uri.len()); let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path); match client.call("load.start", &["", &payload.uri]).await { Ok(response) => { - println!("rTorrent response to load.start: {}", response); + tracing::debug!("rTorrent response to load.start: {}", response); if response.contains("faultCode") { - eprintln!("rTorrent returned fault: {}", response); + tracing::error!("rTorrent returned fault: {}", response); return StatusCode::INTERNAL_SERVER_ERROR; } StatusCode::OK }, Err(e) => { - eprintln!("Failed to add torrent: {}", e); + tracing::error!("Failed to add torrent: {}", e); StatusCode::INTERNAL_SERVER_ERROR } } @@ -99,36 +110,79 @@ async fn add_torrent_handler( #[tokio::main] async fn main() { - // initialize tracing - tracing_subscriber::fmt::init(); + // initialize tracing with env filter (default to info) + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env() + .add_directive(tracing::Level::INFO.into())) + .init(); // Parse CLI Args let args = Args::parse(); - println!("Starting VibeTorrent Backend..."); - println!("Socket: {}", args.socket); - println!("Port: {}", args.port); + tracing::info!("Starting VibeTorrent Backend..."); + tracing::info!("Socket: {}", args.socket); + tracing::info!("Port: {}", args.port); - // Channel for torrent list updates + // Channel for latest state (for new clients) let (tx, _rx) = watch::channel(vec![]); let tx = Arc::new(tx); + + // Channel for Events (Diffs) + let (event_bus, _) = broadcast::channel::(1024); let app_state = AppState { tx: tx.clone(), + event_bus: event_bus.clone(), scgi_socket_path: args.socket.clone(), }; // Spawn background task to poll rTorrent let tx_clone = tx.clone(); + let event_bus_tx = event_bus.clone(); let socket_path = args.socket.clone(); // Clone for background task + tokio::spawn(async move { let client = xmlrpc::RtorrentClient::new(&socket_path); + let mut previous_torrents: Vec = Vec::new(); + loop { match sse::fetch_torrents(&client).await { - Ok(torrents) => { - let _ = tx_clone.send(torrents); + Ok(new_torrents) => { + // 1. Update latest state (always) + let _ = tx_clone.send(new_torrents.clone()); + + // 2. Calculate Diff and Broadcasting + let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); + + let mut structural_change = false; + if previous_torrents.len() != new_torrents.len() { + structural_change = true; + } else { + // Check for order/hash change + for (i, t) in new_torrents.iter().enumerate() { + if previous_torrents[i].hash != t.hash { + structural_change = true; + break; + } + } + } + + if structural_change { + // Structural change -> Send FullList + let _ = event_bus_tx.send(AppEvent::FullList(new_torrents.clone(), now)); + } else { + // Same structure -> Calculate partial updates + let updates = diff::diff_torrents(&previous_torrents, &new_torrents); + if !updates.is_empty() { + for update in updates { + let _ = event_bus_tx.send(update); + } + } + } + + previous_torrents = new_torrents; } Err(e) => { - eprintln!("Error fetching torrents in background: {}", e); + tracing::error!("Error fetching torrents in background: {}", e); } } tokio::time::sleep(Duration::from_secs(1)).await; @@ -140,20 +194,29 @@ async fn main() { .route("/api/torrents/add", post(add_torrent_handler)) .route("/api/torrents/action", post(handle_torrent_action)) .fallback(static_handler) // Serve static files for everything else + .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new() + .br(false) + .gzip(true) + .quality(CompressionLevel::Fastest)) + .layer(ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_timeout_error)) + .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(30))) + ) .layer(CorsLayer::permissive()) .with_state(app_state); let addr = SocketAddr::from(([0, 0, 0, 0], args.port)); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - println!("Backend listening on {}", addr); + tracing::info!("Backend listening on {}", addr); axum::serve(listener, app).await.unwrap(); } async fn handle_torrent_action( State(state): State, - Json(payload): Json, + Json(payload): Json, ) -> impl IntoResponse { - println!("Received action: {} for hash: {}", payload.action, payload.hash); + tracing::info!("Received action: {} for hash: {}", payload.action, payload.hash); // Special handling for delete_with_data if payload.action == "delete_with_data" { @@ -170,14 +233,14 @@ async fn handle_torrent_action( Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to parse path: {}", e)).into_response(), }; - println!("Attempting to delete torrent and data at path: {}", path); + tracing::info!("Attempting to delete torrent and data at path: {}", path); if path.trim().is_empty() || path == "/" { return (StatusCode::BAD_REQUEST, "Safety check failed: Path is empty or root").into_response(); } // 2. Erase Torrent first (so rTorrent releases locks?) if let Err(e) = client.call("d.erase", &[&payload.hash]).await { - eprintln!("Failed to erase torrent entry: {}", e); + tracing::warn!("Failed to erase torrent entry: {}", e); // Proceed anyway to delete files? Maybe not. } @@ -199,8 +262,16 @@ async fn handle_torrent_action( match scgi::system_call(&state.scgi_socket_path, method, vec![&payload.hash]).await { Ok(_) => (StatusCode::OK, "Action executed").into_response(), Err(e) => { - eprintln!("SCGI error: {:?}", e); + tracing::error!("SCGI error: {:?}", e); (StatusCode::INTERNAL_SERVER_ERROR, "Failed to execute action").into_response() } } } + +async fn handle_timeout_error(err: BoxError) -> (StatusCode, &'static str) { + if err.is::() { + (StatusCode::REQUEST_TIMEOUT, "Request timed out") + } else { + (StatusCode::INTERNAL_SERVER_ERROR, "Unhandled internal error") + } +} diff --git a/backend/src/models.rs b/backend/src/models.rs deleted file mode 100644 index 5f5d2c9..0000000 --- a/backend/src/models.rs +++ /dev/null @@ -1,57 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Torrent { - pub hash: String, - pub name: String, - pub size: i64, - pub completed: i64, - pub down_rate: i64, - pub up_rate: i64, - pub eta: i64, - pub percent_complete: f64, - pub status: TorrentStatus, - pub error_message: String, - pub added_date: i64, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct TorrentActionRequest { - pub hash: String, - pub action: String, // "start", "stop", "delete" -} - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -pub enum TorrentStatus { - Downloading, - Seeding, - Paused, - Error, - Checking, - Queued, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(tag = "type", content = "data")] -pub enum AppEvent { - FullList(Vec, u64), - Update(TorrentUpdate), -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct TorrentUpdate { - pub hash: String, - // Optional fields for partial updates - pub down_rate: Option, - pub up_rate: Option, - pub percent_complete: Option, -} - -use tokio::sync::watch; -use std::sync::Arc; - -#[derive(Clone)] -pub struct AppState { - pub tx: Arc>>, - pub scgi_socket_path: String, -} diff --git a/backend/src/scgi.rs b/backend/src/scgi.rs index c44eeb6..0620ec3 100644 --- a/backend/src/scgi.rs +++ b/backend/src/scgi.rs @@ -132,7 +132,7 @@ pub async fn system_call( } xml.push_str(""); - println!("Sending XML-RPC Payload: {}", xml); // Debug logging + tracing::debug!("Sending XML-RPC Payload: {}", xml); let req = ScgiRequest::new().body(xml.clone().into_bytes()); let response_bytes = send_request(socket_path, req).await?; diff --git a/backend/src/sse.rs b/backend/src/sse.rs index fcf03f4..3fbb2f2 100644 --- a/backend/src/sse.rs +++ b/backend/src/sse.rs @@ -2,7 +2,7 @@ use axum::response::sse::{Event, Sse}; use futures::stream::{self, Stream}; use std::convert::Infallible; use tokio_stream::StreamExt; -use crate::models::{AppEvent, Torrent}; +use shared::{AppEvent, Torrent, TorrentStatus}; use crate::xmlrpc::{RtorrentClient, parse_multicall_response}; // Helper (should be moved to utils) @@ -64,15 +64,15 @@ pub async fn fetch_torrents(client: &RtorrentClient) -> Result, Str // Status Logic let status = if !message.is_empty() { - crate::models::TorrentStatus::Error + TorrentStatus::Error } else if is_hashing != 0 { - crate::models::TorrentStatus::Checking + TorrentStatus::Checking } else if state == 0 { - crate::models::TorrentStatus::Paused + TorrentStatus::Paused } else if is_complete != 0 { - crate::models::TorrentStatus::Seeding + TorrentStatus::Seeding } else { - crate::models::TorrentStatus::Downloading + TorrentStatus::Downloading }; // ETA Logic (seconds) @@ -110,7 +110,7 @@ pub async fn fetch_torrents(client: &RtorrentClient) -> Result, Str } use axum::extract::State; -use crate::models::AppState; +use crate::AppState; // Import from crate root pub async fn sse_handler( State(state): State, @@ -132,19 +132,25 @@ pub async fn sse_handler( let initial_stream = stream::once(async { Ok::(initial_event) }); // Stream that waits for subsequent changes - let update_stream = stream::unfold(state.tx.subscribe(), |mut rx| async move { - if let Err(_) = rx.changed().await { - return None; + // Stream that waits for subsequent changes via Broadcast channel + let rx = state.event_bus.subscribe(); + let update_stream = stream::unfold(rx, |mut rx| async move { + match rx.recv().await { + Ok(event) => { + match serde_json::to_string(&event) { + Ok(json) => Some((Ok::(Event::default().data(json)), rx)), + Err(e) => { + tracing::warn!("Failed to serialize SSE event: {}", e); + Some((Ok::(Event::default().comment("error")), rx)) + }, + } + }, + Err(e) => { + // If channel closed or lagged, close stream so client reconnects and gets fresh state + tracing::warn!("SSE Broadcast channel error (lagged/closed): {}", e); + None + } } - let torrents = rx.borrow().clone(); - // println!("Broadcasting SSE update with {} items", torrents.len()); - let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); - let event_data = AppEvent::FullList(torrents, timestamp); - - match serde_json::to_string(&event_data) { - Ok(json) => Some((Ok::(Event::default().data(json)), rx)), - Err(_) => Some((Ok::(Event::default().comment("error")), rx)), - } }); Sse::new(initial_stream.chain(update_stream)) diff --git a/frontend/Cargo.toml b/frontend/Cargo.toml index 5e42b83..259343a 100644 --- a/frontend/Cargo.toml +++ b/frontend/Cargo.toml @@ -19,3 +19,4 @@ uuid = { version = "1", features = ["v4", "js"] } futures = "0.3" chrono = { version = "0.4", features = ["serde"] } web-sys = { version = "0.3", features = ["Window", "Storage"] } +shared = { path = "../shared" } diff --git a/frontend/src/app.rs b/frontend/src/app.rs index bff6c59..29ef8db 100644 --- a/frontend/src/app.rs +++ b/frontend/src/app.rs @@ -1,5 +1,5 @@ use leptos::*; -use crate::models::{Torrent, AppEvent, TorrentStatus, Theme}; +use shared::{Torrent, AppEvent, TorrentStatus, Theme, TorrentUpdate}; use crate::components::context_menu::ContextMenu; use gloo_net::eventsource::futures::EventSource; use futures::StreamExt; @@ -120,9 +120,23 @@ pub fn App() -> impl IntoView { let data = msg.data().as_string().unwrap(); match serde_json::from_str::(&data) { Ok(event) => { - if let AppEvent::FullList(list, ts) = event { - set_torrents.set(list); - set_last_updated.set(ts); + match event { + AppEvent::FullList(list, ts) => { + set_torrents.set(list); + set_last_updated.set(ts); + } + AppEvent::Update(diff) => { + set_torrents.update(|list| { + if let Some(target) = list.iter_mut().find(|t| t.hash == diff.hash) { + if let Some(v) = diff.down_rate { target.down_rate = v; } + if let Some(v) = diff.up_rate { target.up_rate = v; } + if let Some(v) = diff.percent_complete { target.percent_complete = v; } + if let Some(v) = diff.completed { target.completed = v; } + if let Some(v) = diff.eta { target.eta = v; } + if let Some(v) = diff.status { target.status = v; } + } + }); + } } } Err(e) => { diff --git a/frontend/src/lib.rs b/frontend/src/lib.rs index 59174e3..ab22239 100644 --- a/frontend/src/lib.rs +++ b/frontend/src/lib.rs @@ -1,5 +1,5 @@ mod app; -mod models; +// mod models; // Removed mod components; use leptos::*; diff --git a/shared/Cargo.toml b/shared/Cargo.toml new file mode 100644 index 0000000..87ab7e9 --- /dev/null +++ b/shared/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "shared" +version = "0.1.0" +edition = "2024" + +[dependencies] +serde = { version = "1.0.228", features = ["derive"] } diff --git a/frontend/src/models.rs b/shared/src/lib.rs similarity index 71% rename from frontend/src/models.rs rename to shared/src/lib.rs index b514e9d..4619cf4 100644 --- a/frontend/src/models.rs +++ b/shared/src/lib.rs @@ -28,7 +28,7 @@ pub enum TorrentStatus { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "type", content = "data")] pub enum AppEvent { - FullList(Vec, u64), + FullList(Vec, u64), // u64 is likely free_space_bytes Update(TorrentUpdate), } @@ -38,8 +38,18 @@ pub struct TorrentUpdate { pub down_rate: Option, pub up_rate: Option, pub percent_complete: Option, + pub completed: Option, + pub eta: Option, + pub status: Option, } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct TorrentActionRequest { + pub hash: String, + pub action: String, // "start", "stop", "delete" +} + +// Added Theme here to separate it from backend logic but allow frontend usage via shared #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub enum Theme { Midnight,