diff --git a/Cargo.lock b/Cargo.lock index 4fb80b3..abfdcaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,6 +695,7 @@ dependencies = [ "console_log", "futures", "gloo-net 0.5.0", + "gloo-timers", "js-sys", "leptos", "leptos_router", @@ -874,6 +875,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "gloo-utils" version = "0.2.0" diff --git a/frontend/Cargo.toml b/frontend/Cargo.toml index 36b3ddd..ecd9fa2 100644 --- a/frontend/Cargo.toml +++ b/frontend/Cargo.toml @@ -16,6 +16,7 @@ log = "0.4" serde = { version = "1", features = ["derive"] } serde_json = "1" gloo-net = "0.5" +gloo-timers = { version = "0.3", features = ["futures"] } wasm-bindgen = "0.2" uuid = { version = "1", features = ["v4", "js"] } futures = "0.3" diff --git a/frontend/src/store.rs b/frontend/src/store.rs index fef66dd..a6f4e90 100644 --- a/frontend/src/store.rs +++ b/frontend/src/store.rs @@ -138,81 +138,124 @@ pub fn provide_torrent_store() { }; provide_context(store); - // Initialize SSE connection + // Initialize SSE connection with auto-reconnect create_effect(move |_| { spawn_local(async move { - let mut es = EventSource::new("/api/events").unwrap(); - let mut stream = es.subscribe("message").unwrap(); + let mut backoff_ms: u32 = 1000; // Start with 1 second + let max_backoff_ms: u32 = 30000; // Max 30 seconds + let mut was_connected = false; - while let Some(Ok((_, msg))) = stream.next().await { - if let Some(data_str) = msg.data().as_string() { - if let Ok(event) = serde_json::from_str::(&data_str) { - match event { - AppEvent::FullList { torrents: list, .. } => { - torrents.set(list); - } - AppEvent::Update(update) => { - torrents.update(|list| { - if let Some(t) = list.iter_mut().find(|t| t.hash == update.hash) - { - if let Some(name) = update.name { - t.name = name; - } - if let Some(size) = update.size { - t.size = size; - } - if let Some(down_rate) = update.down_rate { - t.down_rate = down_rate; - } - if let Some(up_rate) = update.up_rate { - t.up_rate = up_rate; - } - if let Some(percent_complete) = update.percent_complete { - t.percent_complete = percent_complete; - } - if let Some(completed) = update.completed { - t.completed = completed; - } - if let Some(eta) = update.eta { - t.eta = eta; - } - if let Some(status) = update.status { - t.status = status; - } - if let Some(error_message) = update.error_message { - t.error_message = error_message; - } - if let Some(label) = update.label { - t.label = Some(label); + loop { + let es_result = EventSource::new("/api/events"); + + match es_result { + Ok(mut es) => { + match es.subscribe("message") { + Ok(mut stream) => { + // Connection established + if was_connected { + // We were previously connected and lost connection, now reconnected + show_toast_with_signal( + notifications, + NotificationLevel::Success, + "Sunucu bağlantısı yeniden kuruldu", + ); + } + was_connected = true; + backoff_ms = 1000; // Reset backoff on successful connection + + // Process messages + while let Some(Ok((_, msg))) = stream.next().await { + if let Some(data_str) = msg.data().as_string() { + if let Ok(event) = serde_json::from_str::(&data_str) { + match event { + AppEvent::FullList { torrents: list, .. } => { + torrents.set(list); + } + AppEvent::Update(update) => { + torrents.update(|list| { + if let Some(t) = list.iter_mut().find(|t| t.hash == update.hash) + { + if let Some(name) = update.name { + t.name = name; + } + if let Some(size) = update.size { + t.size = size; + } + if let Some(down_rate) = update.down_rate { + t.down_rate = down_rate; + } + if let Some(up_rate) = update.up_rate { + t.up_rate = up_rate; + } + if let Some(percent_complete) = update.percent_complete { + t.percent_complete = percent_complete; + } + if let Some(completed) = update.completed { + t.completed = completed; + } + if let Some(eta) = update.eta { + t.eta = eta; + } + if let Some(status) = update.status { + t.status = status; + } + if let Some(error_message) = update.error_message { + t.error_message = error_message; + } + if let Some(label) = update.label { + t.label = Some(label); + } + } + }); + } + AppEvent::Stats(stats) => { + global_stats.set(stats); + } + AppEvent::Notification(n) => { + show_toast_with_signal(notifications, n.level, n.message); + } + } } } - }); - } - AppEvent::Stats(stats) => { - global_stats.set(stats); - } - AppEvent::Notification(n) => { - let id = js_sys::Date::now() as u64; - let item = NotificationItem { - id, - notification: n, - }; - notifications.update(|list| list.push(item)); + } - // Auto-remove after 5 seconds - let notifications = notifications; - let _ = set_timeout( - move || { - notifications.update(|list| { - list.retain(|i| i.id != id); - }); - }, - std::time::Duration::from_secs(5), - ); + // Stream ended - connection lost + if was_connected { + show_toast_with_signal( + notifications, + NotificationLevel::Warning, + "Sunucu bağlantısı kesildi, yeniden bağlanılıyor...", + ); + } + } + Err(_) => { + // Failed to subscribe + if was_connected { + show_toast_with_signal( + notifications, + NotificationLevel::Warning, + "Sunucu bağlantısı kesildi, yeniden bağlanılıyor...", + ); + } } } } + Err(_) => { + // Failed to create EventSource + if was_connected { + show_toast_with_signal( + notifications, + NotificationLevel::Warning, + "Sunucu bağlantısı kesildi, yeniden bağlanılıyor...", + ); + } + } } + + // Wait before reconnecting (exponential backoff) + gloo_timers::future::TimeoutFuture::new(backoff_ms).await; + backoff_ms = std::cmp::min(backoff_ms * 2, max_backoff_ms); } }); });