feat: modernize stack with shadcn, struct_patch and msgpack
Some checks failed
Build MIPS Binary / build (push) Failing after 6s
Some checks failed
Build MIPS Binary / build (push) Failing after 6s
This commit is contained in:
@@ -15,6 +15,8 @@ tower = { version = "0.5", features = ["util", "timeout"] }
|
||||
tower-http = { version = "0.6", features = ["fs", "trace", "cors", "compression-full"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
rmp-serde = "1.3"
|
||||
struct_patch = "0.5"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
tokio-stream = "0.1"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use shared::{AppEvent, NotificationLevel, SystemNotification, Torrent, TorrentUpdate};
|
||||
use shared::{AppEvent, NotificationLevel, SystemNotification, Torrent};
|
||||
use struct_patch::traits::Patchable;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DiffResult {
|
||||
@@ -9,70 +10,28 @@ pub enum DiffResult {
|
||||
}
|
||||
|
||||
pub fn diff_torrents(old: &[Torrent], new: &[Torrent]) -> DiffResult {
|
||||
// 1. Structural Check: Eğer torrent sayısı değişmişse (yeni eklenen veya silinen),
|
||||
// şimdilik basitlik adına FullUpdate gönderiyoruz.
|
||||
if old.len() != new.len() {
|
||||
return DiffResult::FullUpdate;
|
||||
}
|
||||
|
||||
// 2. Hash Set Karşılaştırması:
|
||||
// Sıralama değişmiş olabilir ama torrentler aynı mı?
|
||||
let old_map: HashMap<&str, &Torrent> = old.iter().map(|t| (t.hash.as_str(), t)).collect();
|
||||
|
||||
// Eğer yeni listedeki bir hash eski listede yoksa, yapı değişmiş demektir.
|
||||
for new_t in new {
|
||||
if !old_map.contains_key(new_t.hash.as_str()) {
|
||||
return DiffResult::FullUpdate;
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Alan Güncellemeleri (Partial Updates)
|
||||
// Buraya geldiğimizde biliyoruz ki old ve new listelerindeki torrentler (hash olarak) aynı,
|
||||
// sadece sıraları farklı olabilir veya içindeki veriler güncellenmiş olabilir.
|
||||
let mut events = Vec::new();
|
||||
|
||||
for new_t in new {
|
||||
// old_map'ten ilgili torrente hash ile ulaşalım (sıradan bağımsız)
|
||||
let old_t = old_map.get(new_t.hash.as_str()).unwrap();
|
||||
|
||||
let mut update = TorrentUpdate {
|
||||
hash: new_t.hash.clone(),
|
||||
name: None,
|
||||
size: None,
|
||||
down_rate: None,
|
||||
up_rate: None,
|
||||
percent_complete: None,
|
||||
completed: None,
|
||||
eta: None,
|
||||
status: None,
|
||||
error_message: None,
|
||||
label: None,
|
||||
};
|
||||
// struct_patch::diff uses the Patch trait we derived in shared crate
|
||||
let patch = old_t.diff(new_t);
|
||||
|
||||
let mut has_changes = false;
|
||||
|
||||
// Alanları karşılaştır
|
||||
if old_t.name != new_t.name {
|
||||
update.name = Some(new_t.name.clone());
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.size != new_t.size {
|
||||
update.size = Some(new_t.size);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.down_rate != new_t.down_rate {
|
||||
update.down_rate = Some(new_t.down_rate);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.up_rate != new_t.up_rate {
|
||||
update.up_rate = Some(new_t.up_rate);
|
||||
has_changes = true;
|
||||
}
|
||||
if (old_t.percent_complete - new_t.percent_complete).abs() > 0.01 {
|
||||
update.percent_complete = Some(new_t.percent_complete);
|
||||
has_changes = true;
|
||||
|
||||
// Torrent tamamlanma kontrolü
|
||||
if !patch.is_empty() {
|
||||
// If percent_complete jumped to 100, send notification
|
||||
if old_t.percent_complete < 100.0 && new_t.percent_complete >= 100.0 {
|
||||
tracing::info!("Torrent completed: {} ({})", new_t.name, new_t.hash);
|
||||
events.push(AppEvent::Notification(SystemNotification {
|
||||
@@ -80,35 +39,7 @@ pub fn diff_torrents(old: &[Torrent], new: &[Torrent]) -> DiffResult {
|
||||
message: format!("Torrent tamamlandı: {}", new_t.name),
|
||||
}));
|
||||
}
|
||||
}
|
||||
if old_t.completed != new_t.completed {
|
||||
update.completed = Some(new_t.completed);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.eta != new_t.eta {
|
||||
update.eta = Some(new_t.eta);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.status != new_t.status {
|
||||
update.status = Some(new_t.status.clone());
|
||||
has_changes = true;
|
||||
|
||||
tracing::debug!(
|
||||
"Torrent status changed: {} ({}) {:?} -> {:?}",
|
||||
new_t.name, new_t.hash, old_t.status, new_t.status
|
||||
);
|
||||
}
|
||||
if old_t.error_message != new_t.error_message {
|
||||
update.error_message = Some(new_t.error_message.clone());
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.label != new_t.label {
|
||||
update.label = new_t.label.clone();
|
||||
has_changes = true;
|
||||
}
|
||||
|
||||
if has_changes {
|
||||
events.push(AppEvent::Update(update));
|
||||
events.push(AppEvent::Update(patch));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -192,9 +192,25 @@ pub async fn fetch_global_stats(client: &RtorrentClient) -> Result<GlobalStats,
|
||||
})
|
||||
}
|
||||
|
||||
use shared::xmlrpc::{
|
||||
parse_i64_response, parse_multicall_response, RpcParam, RtorrentClient, XmlRpcError,
|
||||
};
|
||||
use crate::AppState;
|
||||
use axum::extract::State;
|
||||
use axum::response::sse::{Event, Sse};
|
||||
use futures::stream::{self, Stream};
|
||||
use shared::{AppEvent, GlobalStats, Torrent, TorrentStatus};
|
||||
use std::convert::Infallible;
|
||||
use tokio_stream::StreamExt;
|
||||
use axum::response::IntoResponse;
|
||||
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
||||
|
||||
|
||||
// ... (fields and other helper functions remain the same)
|
||||
|
||||
pub async fn sse_handler(
|
||||
State(state): State<AppState>,
|
||||
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
||||
) -> impl IntoResponse {
|
||||
// Notify background worker to wake up and poll immediately
|
||||
state.notify_poll.notify_one();
|
||||
|
||||
@@ -213,8 +229,8 @@ pub async fn sse_handler(
|
||||
timestamp,
|
||||
};
|
||||
|
||||
match serde_json::to_string(&event_data) {
|
||||
Ok(json) => Event::default().data(json),
|
||||
match rmp_serde::to_vec(&event_data) {
|
||||
Ok(bytes) => Event::default().data(BASE64.encode(bytes)),
|
||||
Err(_) => Event::default().comment("init_error"),
|
||||
}
|
||||
};
|
||||
@@ -226,10 +242,10 @@ pub async fn sse_handler(
|
||||
let rx = state.event_bus.subscribe();
|
||||
let update_stream = stream::unfold(rx, |mut rx| async move {
|
||||
match rx.recv().await {
|
||||
Ok(event) => match serde_json::to_string(&event) {
|
||||
Ok(json) => Some((Ok::<Event, Infallible>(Event::default().data(json)), rx)),
|
||||
Ok(event) => match rmp_serde::to_vec(&event) {
|
||||
Ok(bytes) => Some((Ok::<Event, Infallible>(Event::default().data(BASE64.encode(bytes))), rx)),
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to serialize SSE event: {}", e);
|
||||
tracing::warn!("Failed to serialize SSE event (MessagePack): {}", e);
|
||||
Some((
|
||||
Ok::<Event, Infallible>(Event::default().comment("error")),
|
||||
rx,
|
||||
@@ -244,6 +260,11 @@ pub async fn sse_handler(
|
||||
}
|
||||
});
|
||||
|
||||
Sse::new(initial_stream.chain(update_stream))
|
||||
.keep_alive(axum::response::sse::KeepAlive::default())
|
||||
let sse = Sse::new(initial_stream.chain(update_stream))
|
||||
.keep_alive(axum::response::sse::KeepAlive::default());
|
||||
|
||||
(
|
||||
[("content-type", "application/x-msgpack")],
|
||||
sse
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user