refactor: move SSE logic to spawn_local with continuous user check
Some checks failed
Build MIPS Binary / build (push) Failing after 1m15s
Some checks failed
Build MIPS Binary / build (push) Failing after 1m15s
This commit is contained in:
@@ -11,10 +11,6 @@ pub struct NotificationItem {
|
|||||||
pub notification: SystemNotification,
|
pub notification: SystemNotification,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
|
||||||
// Toast Helper Functions
|
|
||||||
// ============================================================================
|
|
||||||
|
|
||||||
pub fn show_toast_with_signal(
|
pub fn show_toast_with_signal(
|
||||||
notifications: RwSignal<Vec<NotificationItem>>,
|
notifications: RwSignal<Vec<NotificationItem>>,
|
||||||
level: NotificationLevel,
|
level: NotificationLevel,
|
||||||
@@ -29,7 +25,6 @@ pub fn show_toast_with_signal(
|
|||||||
|
|
||||||
notifications.update(|list| list.push(item));
|
notifications.update(|list| list.push(item));
|
||||||
|
|
||||||
// Auto-remove after 5 seconds
|
|
||||||
leptos::prelude::set_timeout(
|
leptos::prelude::set_timeout(
|
||||||
move || {
|
move || {
|
||||||
notifications.update(|list| list.retain(|i| i.id != id));
|
notifications.update(|list| list.retain(|i| i.id != id));
|
||||||
@@ -47,10 +42,6 @@ pub fn show_toast(level: NotificationLevel, message: impl Into<String>) {
|
|||||||
pub fn toast_success(message: impl Into<String>) { show_toast(NotificationLevel::Success, message); }
|
pub fn toast_success(message: impl Into<String>) { show_toast(NotificationLevel::Success, message); }
|
||||||
pub fn toast_error(message: impl Into<String>) { show_toast(NotificationLevel::Error, message); }
|
pub fn toast_error(message: impl Into<String>) { show_toast(NotificationLevel::Error, message); }
|
||||||
|
|
||||||
// ============================================================================
|
|
||||||
// Action Message Mapping
|
|
||||||
// ============================================================================
|
|
||||||
|
|
||||||
pub fn get_action_messages(action: &str) -> (&'static str, &'static str) {
|
pub fn get_action_messages(action: &str) -> (&'static str, &'static str) {
|
||||||
match action {
|
match action {
|
||||||
"start" => ("Torrent başlatıldı", "Torrent başlatılamadı"),
|
"start" => ("Torrent başlatıldı", "Torrent başlatılamadı"),
|
||||||
@@ -75,10 +66,6 @@ pub struct PushKeys {
|
|||||||
pub auth: String,
|
pub auth: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
|
||||||
// Store Definition
|
|
||||||
// ============================================================================
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
pub enum FilterStatus {
|
pub enum FilterStatus {
|
||||||
All, Downloading, Seeding, Completed, Paused, Inactive, Active, Error,
|
All, Downloading, Seeding, Completed, Paused, Inactive, Active, Error,
|
||||||
@@ -107,47 +94,40 @@ pub fn provide_torrent_store() {
|
|||||||
let store = TorrentStore { torrents, filter, search_query, global_stats, notifications, user };
|
let store = TorrentStore { torrents, filter, search_query, global_stats, notifications, user };
|
||||||
provide_context(store);
|
provide_context(store);
|
||||||
|
|
||||||
let notifications_for_effect = notifications;
|
let user_for_sse = user;
|
||||||
let global_stats_for_effect = global_stats;
|
let notifications_for_sse = notifications;
|
||||||
let torrents_for_effect = torrents;
|
let global_stats_for_sse = global_stats;
|
||||||
|
let torrents_for_sse = torrents;
|
||||||
let show_browser_notification = show_browser_notification.clone();
|
let show_browser_notification = show_browser_notification.clone();
|
||||||
|
|
||||||
Effect::new(move |_| {
|
spawn_local(async move {
|
||||||
let user_val = user.get();
|
|
||||||
log::debug!("SSE Effect: user = {:?}", user_val);
|
|
||||||
if user_val.is_none() {
|
|
||||||
log::debug!("SSE Effect: user is None, skipping connection");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let notifications = notifications_for_effect;
|
|
||||||
let global_stats = global_stats_for_effect;
|
|
||||||
let torrents = torrents_for_effect;
|
|
||||||
let show_browser_notification = show_browser_notification.clone();
|
|
||||||
|
|
||||||
log::info!("SSE: Starting connection (user logged in)");
|
|
||||||
|
|
||||||
leptos::task::spawn_local(async move {
|
|
||||||
let mut backoff_ms: u32 = 1000;
|
let mut backoff_ms: u32 = 1000;
|
||||||
let mut was_connected = false;
|
let mut was_connected = false;
|
||||||
let mut disconnect_notified = false;
|
let mut disconnect_notified = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
let user_val = user_for_sse.get();
|
||||||
|
if user_val.is_none() {
|
||||||
|
log::debug!("SSE: User not authenticated, waiting...");
|
||||||
|
gloo_timers::future::TimeoutFuture::new(1000).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
log::debug!("SSE: Creating EventSource...");
|
log::debug!("SSE: Creating EventSource...");
|
||||||
let es_result = EventSource::new("/api/events");
|
let es_result = EventSource::new("/api/events");
|
||||||
match es_result {
|
match es_result {
|
||||||
Ok(mut es) => {
|
Ok(mut es) => {
|
||||||
log::debug!("SSE: EventSource created, subscribing to message channel...");
|
log::debug!("SSE: EventSource created, subscribing...");
|
||||||
if let Ok(mut stream) = es.subscribe("message") {
|
if let Ok(mut stream) = es.subscribe("message") {
|
||||||
log::debug!("SSE: Subscribed to message channel");
|
log::debug!("SSE: Subscribed to message channel");
|
||||||
let mut got_first_message = false;
|
let mut got_first_message = false;
|
||||||
while let Some(Ok((_, msg))) = stream.next().await {
|
while let Some(Ok((_, msg))) = stream.next().await {
|
||||||
log::debug!("SSE: Received message: {:?}", msg.data());
|
log::debug!("SSE: Received message");
|
||||||
if !got_first_message {
|
if !got_first_message {
|
||||||
got_first_message = true;
|
got_first_message = true;
|
||||||
backoff_ms = 1000;
|
backoff_ms = 1000;
|
||||||
if was_connected && disconnect_notified {
|
if was_connected && disconnect_notified {
|
||||||
show_toast_with_signal(notifications, NotificationLevel::Success, "Sunucu bağlantısı yeniden kuruldu");
|
show_toast_with_signal(notifications_for_sse, NotificationLevel::Success, "Sunucu bağlantısı yeniden kuruldu");
|
||||||
disconnect_notified = false;
|
disconnect_notified = false;
|
||||||
}
|
}
|
||||||
was_connected = true;
|
was_connected = true;
|
||||||
@@ -159,17 +139,17 @@ pub fn provide_torrent_store() {
|
|||||||
match event {
|
match event {
|
||||||
AppEvent::FullList { torrents: list, .. } => {
|
AppEvent::FullList { torrents: list, .. } => {
|
||||||
log::info!("SSE: Received FullList with {} torrents", list.len());
|
log::info!("SSE: Received FullList with {} torrents", list.len());
|
||||||
torrents.update(|map| {
|
torrents_for_sse.update(|map| {
|
||||||
let new_hashes: std::collections::HashSet<String> = list.iter().map(|t| t.hash.clone()).collect();
|
let new_hashes: std::collections::HashSet<String> = list.iter().map(|t| t.hash.clone()).collect();
|
||||||
map.retain(|hash, _| new_hashes.contains(hash));
|
map.retain(|hash, _| new_hashes.contains(hash));
|
||||||
for new_torrent in list {
|
for new_torrent in list {
|
||||||
map.insert(new_torrent.hash.clone(), new_torrent);
|
map.insert(new_torrent.hash.clone(), new_torrent);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
log::debug!("SSE: torrents map now has {} entries", torrents.with(|m| m.len()));
|
log::debug!("SSE: torrents map now has {} entries", torrents_for_sse.with(|m| m.len()));
|
||||||
}
|
}
|
||||||
AppEvent::Update(update) => {
|
AppEvent::Update(update) => {
|
||||||
torrents.update(|map| {
|
torrents_for_sse.update(|map| {
|
||||||
if let Some(t) = map.get_mut(&update.hash) {
|
if let Some(t) = map.get_mut(&update.hash) {
|
||||||
if let Some(v) = update.name { t.name = v; }
|
if let Some(v) = update.name { t.name = v; }
|
||||||
if let Some(v) = update.size { t.size = v; }
|
if let Some(v) = update.size { t.size = v; }
|
||||||
@@ -184,9 +164,9 @@ pub fn provide_torrent_store() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
AppEvent::Stats(stats) => { global_stats.set(stats); }
|
AppEvent::Stats(stats) => { global_stats_for_sse.set(stats); }
|
||||||
AppEvent::Notification(n) => {
|
AppEvent::Notification(n) => {
|
||||||
show_toast_with_signal(notifications, n.level.clone(), n.message.clone());
|
show_toast_with_signal(notifications_for_sse, n.level.clone(), n.message.clone());
|
||||||
if n.message.contains("tamamlandı") || n.level == shared::NotificationLevel::Error {
|
if n.message.contains("tamamlandı") || n.level == shared::NotificationLevel::Error {
|
||||||
show_browser_notification("VibeTorrent", &n.message);
|
show_browser_notification("VibeTorrent", &n.message);
|
||||||
}
|
}
|
||||||
@@ -196,23 +176,23 @@ pub fn provide_torrent_store() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if was_connected && !disconnect_notified {
|
if was_connected && !disconnect_notified {
|
||||||
show_toast_with_signal(notifications, NotificationLevel::Warning, "Sunucu bağlantısı kesildi, yeniden bağlanılıyor...");
|
show_toast_with_signal(notifications_for_sse, NotificationLevel::Warning, "Sunucu bağlantısı kesildi, yeniden bağlanılıyor...");
|
||||||
disconnect_notified = true;
|
disconnect_notified = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
if was_connected && !disconnect_notified {
|
if was_connected && !disconnect_notified {
|
||||||
show_toast_with_signal(notifications, NotificationLevel::Warning, "Sunucu bağlantısı kurulamıyor...");
|
show_toast_with_signal(notifications_for_sse, NotificationLevel::Warning, "Sunucu bağlantısı kurulamıyor...");
|
||||||
disconnect_notified = true;
|
disconnect_notified = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log::debug!("SSE: Reconnecting in {}ms...", backoff_ms);
|
||||||
gloo_timers::future::TimeoutFuture::new(backoff_ms).await;
|
gloo_timers::future::TimeoutFuture::new(backoff_ms).await;
|
||||||
backoff_ms = std::cmp::min(backoff_ms * 2, 30000);
|
backoff_ms = std::cmp::min(backoff_ms * 2, 30000);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn subscribe_to_push_notifications() {
|
pub async fn subscribe_to_push_notifications() {
|
||||||
|
|||||||
Reference in New Issue
Block a user