feat(frontend): add SSE auto-reconnect with toast notifications

- Automatic reconnection with exponential backoff (1s to 30s max)
- Toast when connection is lost: 'Sunucu bağlantısı kesildi'
- Toast when reconnected: 'Sunucu bağlantısı yeniden kuruldu'
- Backend rTorrent notifications still work via SSE
This commit is contained in:
spinline
2026-02-05 21:46:20 +03:00
parent 3bc4b0f364
commit ce6c3e01aa
3 changed files with 122 additions and 65 deletions

13
Cargo.lock generated
View File

@@ -695,6 +695,7 @@ dependencies = [
"console_log", "console_log",
"futures", "futures",
"gloo-net 0.5.0", "gloo-net 0.5.0",
"gloo-timers",
"js-sys", "js-sys",
"leptos", "leptos",
"leptos_router", "leptos_router",
@@ -874,6 +875,18 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "gloo-utils" name = "gloo-utils"
version = "0.2.0" version = "0.2.0"

View File

@@ -16,6 +16,7 @@ log = "0.4"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
gloo-net = "0.5" gloo-net = "0.5"
gloo-timers = { version = "0.3", features = ["futures"] }
wasm-bindgen = "0.2" wasm-bindgen = "0.2"
uuid = { version = "1", features = ["v4", "js"] } uuid = { version = "1", features = ["v4", "js"] }
futures = "0.3" futures = "0.3"

View File

@@ -138,81 +138,124 @@ pub fn provide_torrent_store() {
}; };
provide_context(store); provide_context(store);
// Initialize SSE connection // Initialize SSE connection with auto-reconnect
create_effect(move |_| { create_effect(move |_| {
spawn_local(async move { spawn_local(async move {
let mut es = EventSource::new("/api/events").unwrap(); let mut backoff_ms: u32 = 1000; // Start with 1 second
let mut stream = es.subscribe("message").unwrap(); let max_backoff_ms: u32 = 30000; // Max 30 seconds
let mut was_connected = false;
while let Some(Ok((_, msg))) = stream.next().await { loop {
if let Some(data_str) = msg.data().as_string() { let es_result = EventSource::new("/api/events");
if let Ok(event) = serde_json::from_str::<AppEvent>(&data_str) {
match event { match es_result {
AppEvent::FullList { torrents: list, .. } => { Ok(mut es) => {
torrents.set(list); match es.subscribe("message") {
} Ok(mut stream) => {
AppEvent::Update(update) => { // Connection established
torrents.update(|list| { if was_connected {
if let Some(t) = list.iter_mut().find(|t| t.hash == update.hash) // We were previously connected and lost connection, now reconnected
{ show_toast_with_signal(
if let Some(name) = update.name { notifications,
t.name = name; NotificationLevel::Success,
} "Sunucu bağlantısı yeniden kuruldu",
if let Some(size) = update.size { );
t.size = size; }
} was_connected = true;
if let Some(down_rate) = update.down_rate { backoff_ms = 1000; // Reset backoff on successful connection
t.down_rate = down_rate;
} // Process messages
if let Some(up_rate) = update.up_rate { while let Some(Ok((_, msg))) = stream.next().await {
t.up_rate = up_rate; if let Some(data_str) = msg.data().as_string() {
} if let Ok(event) = serde_json::from_str::<AppEvent>(&data_str) {
if let Some(percent_complete) = update.percent_complete { match event {
t.percent_complete = percent_complete; AppEvent::FullList { torrents: list, .. } => {
} torrents.set(list);
if let Some(completed) = update.completed { }
t.completed = completed; AppEvent::Update(update) => {
} torrents.update(|list| {
if let Some(eta) = update.eta { if let Some(t) = list.iter_mut().find(|t| t.hash == update.hash)
t.eta = eta; {
} if let Some(name) = update.name {
if let Some(status) = update.status { t.name = name;
t.status = status; }
} if let Some(size) = update.size {
if let Some(error_message) = update.error_message { t.size = size;
t.error_message = error_message; }
} if let Some(down_rate) = update.down_rate {
if let Some(label) = update.label { t.down_rate = down_rate;
t.label = Some(label); }
if let Some(up_rate) = update.up_rate {
t.up_rate = up_rate;
}
if let Some(percent_complete) = update.percent_complete {
t.percent_complete = percent_complete;
}
if let Some(completed) = update.completed {
t.completed = completed;
}
if let Some(eta) = update.eta {
t.eta = eta;
}
if let Some(status) = update.status {
t.status = status;
}
if let Some(error_message) = update.error_message {
t.error_message = error_message;
}
if let Some(label) = update.label {
t.label = Some(label);
}
}
});
}
AppEvent::Stats(stats) => {
global_stats.set(stats);
}
AppEvent::Notification(n) => {
show_toast_with_signal(notifications, n.level, n.message);
}
}
} }
} }
}); }
}
AppEvent::Stats(stats) => {
global_stats.set(stats);
}
AppEvent::Notification(n) => {
let id = js_sys::Date::now() as u64;
let item = NotificationItem {
id,
notification: n,
};
notifications.update(|list| list.push(item));
// Auto-remove after 5 seconds // Stream ended - connection lost
let notifications = notifications; if was_connected {
let _ = set_timeout( show_toast_with_signal(
move || { notifications,
notifications.update(|list| { NotificationLevel::Warning,
list.retain(|i| i.id != id); "Sunucu bağlantısı kesildi, yeniden bağlanılıyor...",
}); );
}, }
std::time::Duration::from_secs(5), }
); Err(_) => {
// Failed to subscribe
if was_connected {
show_toast_with_signal(
notifications,
NotificationLevel::Warning,
"Sunucu bağlantısı kesildi, yeniden bağlanılıyor...",
);
}
} }
} }
} }
Err(_) => {
// Failed to create EventSource
if was_connected {
show_toast_with_signal(
notifications,
NotificationLevel::Warning,
"Sunucu bağlantısı kesildi, yeniden bağlanılıyor...",
);
}
}
} }
// Wait before reconnecting (exponential backoff)
gloo_timers::future::TimeoutFuture::new(backoff_ms).await;
backoff_ms = std::cmp::min(backoff_ms * 2, max_backoff_ms);
} }
}); });
}); });