feat: implement notification system and backoff strategy for rTorrent polling

This commit is contained in:
spinline
2026-02-05 18:31:42 +03:00
parent 97086bf33a
commit e2a8e17eae
7 changed files with 158 additions and 11 deletions

View File

@@ -85,10 +85,24 @@ pub async fn add_torrent_handler(
tracing::error!("rTorrent returned fault: {}", response); tracing::error!("rTorrent returned fault: {}", response);
return StatusCode::INTERNAL_SERVER_ERROR; return StatusCode::INTERNAL_SERVER_ERROR;
} }
let _ =
state
.event_bus
.send(shared::AppEvent::Notification(shared::SystemNotification {
level: shared::NotificationLevel::Success,
message: "Torrent added successfully".to_string(),
}));
StatusCode::OK StatusCode::OK
} }
Err(e) => { Err(e) => {
tracing::error!("Failed to add torrent: {}", e); tracing::error!("Failed to add torrent: {}", e);
let _ =
state
.event_bus
.send(shared::AppEvent::Notification(shared::SystemNotification {
level: shared::NotificationLevel::Error,
message: format!("Failed to add torrent: {}", e),
}));
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
} }
} }
@@ -121,7 +135,15 @@ pub async fn handle_torrent_action(
// Special handling for delete_with_data // Special handling for delete_with_data
if payload.action == "delete_with_data" { if payload.action == "delete_with_data" {
return match delete_torrent_with_data(&client, &payload.hash).await { return match delete_torrent_with_data(&client, &payload.hash).await {
Ok(msg) => (StatusCode::OK, msg).into_response(), Ok(msg) => {
let _ = state.event_bus.send(shared::AppEvent::Notification(
shared::SystemNotification {
level: shared::NotificationLevel::Success,
message: format!("Torrent deleted with data: {}", payload.hash),
},
));
(StatusCode::OK, msg).into_response()
}
Err((status, msg)) => (status, msg).into_response(), Err((status, msg)) => (status, msg).into_response(),
}; };
} }
@@ -136,7 +158,16 @@ pub async fn handle_torrent_action(
let params = vec![RpcParam::from(payload.hash.as_str())]; let params = vec![RpcParam::from(payload.hash.as_str())];
match client.call(method, &params).await { match client.call(method, &params).await {
Ok(_) => (StatusCode::OK, "Action executed").into_response(), Ok(_) => {
let _ =
state
.event_bus
.send(shared::AppEvent::Notification(shared::SystemNotification {
level: shared::NotificationLevel::Info,
message: format!("Action '{}' executed on torrent", payload.action),
}));
(StatusCode::OK, "Action executed").into_response()
}
Err(e) => { Err(e) => {
tracing::error!("RPC error: {}", e); tracing::error!("RPC error: {}", e);
( (

View File

@@ -147,6 +147,8 @@ async fn main() {
tokio::spawn(async move { tokio::spawn(async move {
let client = xmlrpc::RtorrentClient::new(&socket_path); let client = xmlrpc::RtorrentClient::new(&socket_path);
let mut previous_torrents: Vec<Torrent> = Vec::new(); let mut previous_torrents: Vec<Torrent> = Vec::new();
let mut consecutive_errors = 0;
let mut backoff_duration = Duration::from_secs(1);
loop { loop {
// 1. Fetch Torrents // 1. Fetch Torrents
@@ -158,6 +160,21 @@ async fn main() {
// Handle Torrents // Handle Torrents
match torrents_result { match torrents_result {
Ok(new_torrents) => { Ok(new_torrents) => {
// Check if we recovered from an error state
if consecutive_errors > 0 {
tracing::info!(
"Reconnected to rTorrent after {} failures.",
consecutive_errors
);
let _ =
event_bus_tx.send(AppEvent::Notification(shared::SystemNotification {
level: shared::NotificationLevel::Success,
message: "Reconnected to rTorrent".to_string(),
}));
consecutive_errors = 0;
backoff_duration = Duration::from_secs(1);
}
// Update latest state // Update latest state
let _ = tx_clone.send(new_torrents.clone()); let _ = tx_clone.send(new_torrents.clone());
@@ -186,6 +203,23 @@ async fn main() {
} }
Err(e) => { Err(e) => {
tracing::error!("Error fetching torrents in background: {}", e); tracing::error!("Error fetching torrents in background: {}", e);
consecutive_errors += 1;
// If this is the first error after success (or startup), notify clients
if consecutive_errors == 1 {
let _ =
event_bus_tx.send(AppEvent::Notification(shared::SystemNotification {
level: shared::NotificationLevel::Error,
message: format!("Lost connection to rTorrent: {}", e),
}));
}
// Exponential backoff with a cap of 30 seconds
backoff_duration = std::cmp::min(backoff_duration * 2, Duration::from_secs(30));
tracing::warn!(
"Backoff: Sleeping for {:?} due to rTorrent error.",
backoff_duration
);
} }
} }
@@ -199,7 +233,7 @@ async fn main() {
} }
} }
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(backoff_duration).await;
} }
}); });

View File

@@ -1,9 +1,10 @@
use crate::components::layout::sidebar::Sidebar;
use crate::components::layout::statusbar::StatusBar;
use crate::components::layout::toolbar::Toolbar;
use crate::components::toast::ToastContainer;
use crate::components::torrent::table::TorrentTable;
use leptos::*; use leptos::*;
use leptos_router::*; use leptos_router::*;
use crate::components::layout::sidebar::Sidebar;
use crate::components::layout::toolbar::Toolbar;
use crate::components::layout::statusbar::StatusBar;
use crate::components::torrent::table::TorrentTable;
#[component] #[component]
pub fn App() -> impl IntoView { pub fn App() -> impl IntoView {
@@ -36,6 +37,8 @@ pub fn App() -> impl IntoView {
<Sidebar /> <Sidebar />
</div> </div>
</div> </div>
<ToastContainer />
</div> </div>
} }
} }

View File

@@ -1,4 +1,5 @@
pub mod modal;
pub mod context_menu; pub mod context_menu;
pub mod layout; pub mod layout;
pub mod modal;
pub mod toast;
pub mod torrent; pub mod torrent;

View File

@@ -0,0 +1,35 @@
use leptos::*;
use shared::NotificationLevel;
#[component]
pub fn ToastContainer() -> impl IntoView {
let store = use_context::<crate::store::TorrentStore>().expect("store not provided");
let notifications = store.notifications;
view! {
<div class="toast toast-end toast-bottom z-[9999]">
{move || notifications.get().into_iter().map(|item| {
let alert_class = match item.notification.level {
NotificationLevel::Info => "alert-info",
NotificationLevel::Success => "alert-success",
NotificationLevel::Warning => "alert-warning",
NotificationLevel::Error => "alert-error",
};
let icon = match item.notification.level {
NotificationLevel::Info => view! { <svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" class="stroke-current shrink-0 w-6 h-6"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"></path></svg> },
NotificationLevel::Success => view! { <svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z" /></svg> },
NotificationLevel::Warning => view! { <svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" /></svg> },
NotificationLevel::Error => view! { <svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 14l2-2m0 0l2-2m-2 2l-2-2m2 2l2 2m7-2a9 9 0 11-18 0 9 9 0 0118 0z" /></svg> },
};
view! {
<div class={format!("alert {} shadow-lg transition-all duration-300 animate-in slide-in-from-bottom-5 fade-in", alert_class)}>
{icon}
<span>{item.notification.message}</span>
</div>
}
}).collect::<Vec<_>>()}
</div>
}
}

View File

@@ -1,7 +1,13 @@
use futures::StreamExt; use futures::StreamExt;
use gloo_net::eventsource::futures::EventSource; use gloo_net::eventsource::futures::EventSource;
use leptos::*; use leptos::*;
use shared::{AppEvent, GlobalStats, Torrent}; use shared::{AppEvent, GlobalStats, SystemNotification, Torrent};
#[derive(Clone, Debug, PartialEq)]
pub struct NotificationItem {
pub id: u64,
pub notification: SystemNotification,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FilterStatus { pub enum FilterStatus {
@@ -36,6 +42,7 @@ pub struct TorrentStore {
pub filter: RwSignal<FilterStatus>, pub filter: RwSignal<FilterStatus>,
pub search_query: RwSignal<String>, pub search_query: RwSignal<String>,
pub global_stats: RwSignal<GlobalStats>, pub global_stats: RwSignal<GlobalStats>,
pub notifications: RwSignal<Vec<NotificationItem>>,
} }
pub fn provide_torrent_store() { pub fn provide_torrent_store() {
@@ -43,12 +50,14 @@ pub fn provide_torrent_store() {
let filter = create_rw_signal(FilterStatus::All); let filter = create_rw_signal(FilterStatus::All);
let search_query = create_rw_signal(String::new()); let search_query = create_rw_signal(String::new());
let global_stats = create_rw_signal(GlobalStats::default()); let global_stats = create_rw_signal(GlobalStats::default());
let notifications = create_rw_signal(Vec::<NotificationItem>::new());
let store = TorrentStore { let store = TorrentStore {
torrents, torrents,
filter, filter,
search_query, search_query,
global_stats, global_stats,
notifications,
}; };
provide_context(store); provide_context(store);
@@ -105,6 +114,25 @@ pub fn provide_torrent_store() {
AppEvent::Stats(stats) => { AppEvent::Stats(stats) => {
global_stats.set(stats); global_stats.set(stats);
} }
AppEvent::Notification(n) => {
let id = js_sys::Date::now() as u64;
let item = NotificationItem {
id,
notification: n,
};
notifications.update(|list| list.push(item));
// Auto-remove after 5 seconds
let notifications = notifications;
let _ = set_timeout(
move || {
notifications.update(|list| {
list.retain(|i| i.id != id);
});
},
std::time::Duration::from_secs(5),
);
}
} }
} }
} }

View File

@@ -36,6 +36,21 @@ pub enum AppEvent {
}, },
Update(TorrentUpdate), Update(TorrentUpdate),
Stats(GlobalStats), Stats(GlobalStats),
Notification(SystemNotification),
}
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub struct SystemNotification {
pub level: NotificationLevel,
pub message: String,
}
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema, PartialEq, Eq)]
pub enum NotificationLevel {
Info,
Success,
Warning,
Error,
} }
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema, Default)] #[derive(Debug, Serialize, Deserialize, Clone, ToSchema, Default)]