diff --git a/frontend/src/store.rs b/frontend/src/store.rs index 3c8ad00..ae63365 100644 --- a/frontend/src/store.rs +++ b/frontend/src/store.rs @@ -11,10 +11,6 @@ pub struct NotificationItem { pub notification: SystemNotification, } -// ============================================================================ -// Toast Helper Functions -// ============================================================================ - pub fn show_toast_with_signal( notifications: RwSignal>, level: NotificationLevel, @@ -29,7 +25,6 @@ pub fn show_toast_with_signal( notifications.update(|list| list.push(item)); - // Auto-remove after 5 seconds leptos::prelude::set_timeout( move || { notifications.update(|list| list.retain(|i| i.id != id)); @@ -47,10 +42,6 @@ pub fn show_toast(level: NotificationLevel, message: impl Into) { pub fn toast_success(message: impl Into) { show_toast(NotificationLevel::Success, message); } pub fn toast_error(message: impl Into) { show_toast(NotificationLevel::Error, message); } -// ============================================================================ -// Action Message Mapping -// ============================================================================ - pub fn get_action_messages(action: &str) -> (&'static str, &'static str) { match action { "start" => ("Torrent başlatıldı", "Torrent başlatılamadı"), @@ -75,10 +66,6 @@ pub struct PushKeys { pub auth: String, } -// ============================================================================ -// Store Definition -// ============================================================================ - #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum FilterStatus { All, Downloading, Seeding, Completed, Paused, Inactive, Active, Error, @@ -107,111 +94,104 @@ pub fn provide_torrent_store() { let store = TorrentStore { torrents, filter, search_query, global_stats, notifications, user }; provide_context(store); - let notifications_for_effect = notifications; - let global_stats_for_effect = global_stats; - let torrents_for_effect = torrents; + let user_for_sse = user; + let notifications_for_sse = notifications; + let global_stats_for_sse = global_stats; + let torrents_for_sse = torrents; let show_browser_notification = show_browser_notification.clone(); - Effect::new(move |_| { - let user_val = user.get(); - log::debug!("SSE Effect: user = {:?}", user_val); - if user_val.is_none() { - log::debug!("SSE Effect: user is None, skipping connection"); - return; - } + spawn_local(async move { + let mut backoff_ms: u32 = 1000; + let mut was_connected = false; + let mut disconnect_notified = false; - let notifications = notifications_for_effect; - let global_stats = global_stats_for_effect; - let torrents = torrents_for_effect; - let show_browser_notification = show_browser_notification.clone(); + loop { + let user_val = user_for_sse.get(); + if user_val.is_none() { + log::debug!("SSE: User not authenticated, waiting..."); + gloo_timers::future::TimeoutFuture::new(1000).await; + continue; + } - log::info!("SSE: Starting connection (user logged in)"); - - leptos::task::spawn_local(async move { - let mut backoff_ms: u32 = 1000; - let mut was_connected = false; - let mut disconnect_notified = false; - - loop { - log::debug!("SSE: Creating EventSource..."); - let es_result = EventSource::new("/api/events"); - match es_result { - Ok(mut es) => { - log::debug!("SSE: EventSource created, subscribing to message channel..."); - if let Ok(mut stream) = es.subscribe("message") { - log::debug!("SSE: Subscribed to message channel"); - let mut got_first_message = false; - while let Some(Ok((_, msg))) = stream.next().await { - log::debug!("SSE: Received message: {:?}", msg.data()); - if !got_first_message { - got_first_message = true; - backoff_ms = 1000; - if was_connected && disconnect_notified { - show_toast_with_signal(notifications, NotificationLevel::Success, "Sunucu bağlantısı yeniden kuruldu"); - disconnect_notified = false; - } - was_connected = true; + log::debug!("SSE: Creating EventSource..."); + let es_result = EventSource::new("/api/events"); + match es_result { + Ok(mut es) => { + log::debug!("SSE: EventSource created, subscribing..."); + if let Ok(mut stream) = es.subscribe("message") { + log::debug!("SSE: Subscribed to message channel"); + let mut got_first_message = false; + while let Some(Ok((_, msg))) = stream.next().await { + log::debug!("SSE: Received message"); + if !got_first_message { + got_first_message = true; + backoff_ms = 1000; + if was_connected && disconnect_notified { + show_toast_with_signal(notifications_for_sse, NotificationLevel::Success, "Sunucu bağlantısı yeniden kuruldu"); + disconnect_notified = false; } + was_connected = true; + } - if let Some(data_str) = msg.data().as_string() { - log::debug!("SSE: Parsing JSON: {}", data_str); - if let Ok(event) = serde_json::from_str::(&data_str) { - match event { - AppEvent::FullList { torrents: list, .. } => { - log::info!("SSE: Received FullList with {} torrents", list.len()); - torrents.update(|map| { - let new_hashes: std::collections::HashSet = list.iter().map(|t| t.hash.clone()).collect(); - map.retain(|hash, _| new_hashes.contains(hash)); - for new_torrent in list { - map.insert(new_torrent.hash.clone(), new_torrent); - } - }); - log::debug!("SSE: torrents map now has {} entries", torrents.with(|m| m.len())); - } - AppEvent::Update(update) => { - torrents.update(|map| { - if let Some(t) = map.get_mut(&update.hash) { - if let Some(v) = update.name { t.name = v; } - if let Some(v) = update.size { t.size = v; } - if let Some(v) = update.down_rate { t.down_rate = v; } - if let Some(v) = update.up_rate { t.up_rate = v; } - if let Some(v) = update.percent_complete { t.percent_complete = v; } - if let Some(v) = update.completed { t.completed = v; } - if let Some(v) = update.eta { t.eta = v; } - if let Some(v) = update.status { t.status = v; } - if let Some(v) = update.error_message { t.error_message = v; } - if let Some(v) = update.label { t.label = Some(v); } - } - }); - } - AppEvent::Stats(stats) => { global_stats.set(stats); } - AppEvent::Notification(n) => { - show_toast_with_signal(notifications, n.level.clone(), n.message.clone()); - if n.message.contains("tamamlandı") || n.level == shared::NotificationLevel::Error { - show_browser_notification("VibeTorrent", &n.message); + if let Some(data_str) = msg.data().as_string() { + log::debug!("SSE: Parsing JSON: {}", data_str); + if let Ok(event) = serde_json::from_str::(&data_str) { + match event { + AppEvent::FullList { torrents: list, .. } => { + log::info!("SSE: Received FullList with {} torrents", list.len()); + torrents_for_sse.update(|map| { + let new_hashes: std::collections::HashSet = list.iter().map(|t| t.hash.clone()).collect(); + map.retain(|hash, _| new_hashes.contains(hash)); + for new_torrent in list { + map.insert(new_torrent.hash.clone(), new_torrent); } + }); + log::debug!("SSE: torrents map now has {} entries", torrents_for_sse.with(|m| m.len())); + } + AppEvent::Update(update) => { + torrents_for_sse.update(|map| { + if let Some(t) = map.get_mut(&update.hash) { + if let Some(v) = update.name { t.name = v; } + if let Some(v) = update.size { t.size = v; } + if let Some(v) = update.down_rate { t.down_rate = v; } + if let Some(v) = update.up_rate { t.up_rate = v; } + if let Some(v) = update.percent_complete { t.percent_complete = v; } + if let Some(v) = update.completed { t.completed = v; } + if let Some(v) = update.eta { t.eta = v; } + if let Some(v) = update.status { t.status = v; } + if let Some(v) = update.error_message { t.error_message = v; } + if let Some(v) = update.label { t.label = Some(v); } + } + }); + } + AppEvent::Stats(stats) => { global_stats_for_sse.set(stats); } + AppEvent::Notification(n) => { + show_toast_with_signal(notifications_for_sse, n.level.clone(), n.message.clone()); + if n.message.contains("tamamlandı") || n.level == shared::NotificationLevel::Error { + show_browser_notification("VibeTorrent", &n.message); } } } } } - if was_connected && !disconnect_notified { - show_toast_with_signal(notifications, NotificationLevel::Warning, "Sunucu bağlantısı kesildi, yeniden bağlanılıyor..."); - disconnect_notified = true; - } } - } - Err(_) => { if was_connected && !disconnect_notified { - show_toast_with_signal(notifications, NotificationLevel::Warning, "Sunucu bağlantısı kurulamıyor..."); + show_toast_with_signal(notifications_for_sse, NotificationLevel::Warning, "Sunucu bağlantısı kesildi, yeniden bağlanılıyor..."); disconnect_notified = true; } } } - gloo_timers::future::TimeoutFuture::new(backoff_ms).await; - backoff_ms = std::cmp::min(backoff_ms * 2, 30000); + Err(_) => { + if was_connected && !disconnect_notified { + show_toast_with_signal(notifications_for_sse, NotificationLevel::Warning, "Sunucu bağlantısı kurulamıyor..."); + disconnect_notified = true; + } + } } - }); + log::debug!("SSE: Reconnecting in {}ms...", backoff_ms); + gloo_timers::future::TimeoutFuture::new(backoff_ms).await; + backoff_ms = std::cmp::min(backoff_ms * 2, 30000); + } }); }