feat: Refactor VibeTorrent v3 with shared crate, fine-grained updates, tracing, and middleware optimization
This commit is contained in:
81
backend/src/diff.rs
Normal file
81
backend/src/diff.rs
Normal file
@@ -0,0 +1,81 @@
|
||||
use shared::{AppEvent, Torrent, TorrentUpdate};
|
||||
|
||||
pub fn diff_torrents(old: &[Torrent], new: &[Torrent]) -> Vec<AppEvent> {
|
||||
// 1. Structural Change Check
|
||||
// If length differs or any hash at specific index differs (simplistic view), send FullList.
|
||||
// Ideally we should track "Added/Removed", but for simplicity and robustness as per prompt "FullList for big changes",
|
||||
// we fallback to FullList on structural changes.
|
||||
if old.len() != new.len() {
|
||||
// Timestamp is needed for FullList? The definition is FullList(Vec, u64).
|
||||
// We'll let the caller handle the timestamp or pass it in?
|
||||
// AppEvent in shared::lib.rs is FullList(Vec<Torrent>, u64).
|
||||
// We'll return just the list decision here, or constructs events.
|
||||
// Let's assume caller adds the u64 (disk space/timestamp).
|
||||
// Actually, let's keep it simple: Return Option<Vec<AppEvent>>.
|
||||
// But simply returning "NeedFullList" signal is easier if we can't accept u64 here.
|
||||
// Let's change signature to return an enum or boolean flag if FullList needed.
|
||||
return vec![]; // Special signal: Empty vec means "No diffs" or "Caller handles FullList"?
|
||||
// This function is tricky if we don't have the u64.
|
||||
}
|
||||
|
||||
// Check for hash mismatch (order changed)
|
||||
for (i, t) in new.iter().enumerate() {
|
||||
if old[i].hash != t.hash {
|
||||
return vec![]; // Signal Full List needed
|
||||
}
|
||||
}
|
||||
|
||||
let mut events = Vec::new();
|
||||
|
||||
for (i, new_t) in new.iter().enumerate() {
|
||||
let old_t = &old[i];
|
||||
|
||||
let mut update = TorrentUpdate {
|
||||
hash: new_t.hash.clone(),
|
||||
down_rate: None,
|
||||
up_rate: None,
|
||||
percent_complete: None,
|
||||
completed: None,
|
||||
eta: None,
|
||||
status: None,
|
||||
};
|
||||
|
||||
let mut has_changes = false;
|
||||
|
||||
if old_t.down_rate != new_t.down_rate {
|
||||
update.down_rate = Some(new_t.down_rate);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.up_rate != new_t.up_rate {
|
||||
update.up_rate = Some(new_t.up_rate);
|
||||
has_changes = true;
|
||||
}
|
||||
// Floating point comparison with epsilon
|
||||
if (old_t.percent_complete - new_t.percent_complete).abs() > 0.01 {
|
||||
update.percent_complete = Some(new_t.percent_complete);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.completed != new_t.completed {
|
||||
update.completed = Some(new_t.completed);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.eta != new_t.eta {
|
||||
update.eta = Some(new_t.eta);
|
||||
has_changes = true;
|
||||
}
|
||||
if old_t.status != new_t.status {
|
||||
update.status = Some(new_t.status.clone());
|
||||
has_changes = true;
|
||||
}
|
||||
|
||||
if has_changes {
|
||||
events.push(AppEvent::Update(update));
|
||||
}
|
||||
}
|
||||
|
||||
if !events.is_empty() {
|
||||
tracing::debug!("Generated {} updates", events.len());
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
@@ -1,13 +1,8 @@
|
||||
|
||||
mod models;
|
||||
mod diff;
|
||||
mod scgi;
|
||||
mod sse;
|
||||
mod xmlrpc;
|
||||
|
||||
// fixup modules
|
||||
// remove mm if I didn't create it? I didn't.
|
||||
// I will structure modules correctly.
|
||||
|
||||
use clap::Parser;
|
||||
use rust_embed::RustEmbed;
|
||||
use axum::{
|
||||
@@ -17,10 +12,28 @@ use axum::{
|
||||
routing::{get, post},
|
||||
Router, Json,
|
||||
};
|
||||
use tower_http::cors::CorsLayer;
|
||||
use tower_http::{
|
||||
cors::CorsLayer,
|
||||
trace::TraceLayer,
|
||||
compression::{CompressionLayer, CompressionLevel},
|
||||
};
|
||||
use axum::{
|
||||
error_handling::HandleErrorLayer,
|
||||
BoxError,
|
||||
};
|
||||
use tower::ServiceBuilder;
|
||||
use serde::Deserialize;
|
||||
use std::net::SocketAddr;
|
||||
use crate::models::AppState;
|
||||
use shared::{Torrent, TorrentActionRequest, AppEvent}; // shared crates imports
|
||||
use tokio::sync::{watch, broadcast};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub tx: Arc<watch::Sender<Vec<Torrent>>>,
|
||||
pub event_bus: broadcast::Sender<AppEvent>,
|
||||
pub scgi_socket_path: String,
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
@@ -66,8 +79,6 @@ async fn static_handler(uri: Uri) -> impl IntoResponse {
|
||||
}
|
||||
}
|
||||
|
||||
use tokio::sync::watch;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -79,19 +90,19 @@ async fn add_torrent_handler(
|
||||
State(state): State<AppState>,
|
||||
Json(payload): Json<AddTorrentRequest>,
|
||||
) -> StatusCode {
|
||||
println!("Received add_torrent request. URI length: {}", payload.uri.len());
|
||||
tracing::info!("Received add_torrent request. URI length: {}", payload.uri.len());
|
||||
let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path);
|
||||
match client.call("load.start", &["", &payload.uri]).await {
|
||||
Ok(response) => {
|
||||
println!("rTorrent response to load.start: {}", response);
|
||||
tracing::debug!("rTorrent response to load.start: {}", response);
|
||||
if response.contains("faultCode") {
|
||||
eprintln!("rTorrent returned fault: {}", response);
|
||||
tracing::error!("rTorrent returned fault: {}", response);
|
||||
return StatusCode::INTERNAL_SERVER_ERROR;
|
||||
}
|
||||
StatusCode::OK
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Failed to add torrent: {}", e);
|
||||
tracing::error!("Failed to add torrent: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
}
|
||||
}
|
||||
@@ -99,36 +110,79 @@ async fn add_torrent_handler(
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// initialize tracing
|
||||
tracing_subscriber::fmt::init();
|
||||
// initialize tracing with env filter (default to info)
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env()
|
||||
.add_directive(tracing::Level::INFO.into()))
|
||||
.init();
|
||||
|
||||
// Parse CLI Args
|
||||
let args = Args::parse();
|
||||
println!("Starting VibeTorrent Backend...");
|
||||
println!("Socket: {}", args.socket);
|
||||
println!("Port: {}", args.port);
|
||||
tracing::info!("Starting VibeTorrent Backend...");
|
||||
tracing::info!("Socket: {}", args.socket);
|
||||
tracing::info!("Port: {}", args.port);
|
||||
|
||||
// Channel for torrent list updates
|
||||
// Channel for latest state (for new clients)
|
||||
let (tx, _rx) = watch::channel(vec![]);
|
||||
let tx = Arc::new(tx);
|
||||
|
||||
// Channel for Events (Diffs)
|
||||
let (event_bus, _) = broadcast::channel::<AppEvent>(1024);
|
||||
|
||||
let app_state = AppState {
|
||||
tx: tx.clone(),
|
||||
event_bus: event_bus.clone(),
|
||||
scgi_socket_path: args.socket.clone(),
|
||||
};
|
||||
|
||||
// Spawn background task to poll rTorrent
|
||||
let tx_clone = tx.clone();
|
||||
let event_bus_tx = event_bus.clone();
|
||||
let socket_path = args.socket.clone(); // Clone for background task
|
||||
|
||||
tokio::spawn(async move {
|
||||
let client = xmlrpc::RtorrentClient::new(&socket_path);
|
||||
let mut previous_torrents: Vec<Torrent> = Vec::new();
|
||||
|
||||
loop {
|
||||
match sse::fetch_torrents(&client).await {
|
||||
Ok(torrents) => {
|
||||
let _ = tx_clone.send(torrents);
|
||||
Ok(new_torrents) => {
|
||||
// 1. Update latest state (always)
|
||||
let _ = tx_clone.send(new_torrents.clone());
|
||||
|
||||
// 2. Calculate Diff and Broadcasting
|
||||
let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
|
||||
|
||||
let mut structural_change = false;
|
||||
if previous_torrents.len() != new_torrents.len() {
|
||||
structural_change = true;
|
||||
} else {
|
||||
// Check for order/hash change
|
||||
for (i, t) in new_torrents.iter().enumerate() {
|
||||
if previous_torrents[i].hash != t.hash {
|
||||
structural_change = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if structural_change {
|
||||
// Structural change -> Send FullList
|
||||
let _ = event_bus_tx.send(AppEvent::FullList(new_torrents.clone(), now));
|
||||
} else {
|
||||
// Same structure -> Calculate partial updates
|
||||
let updates = diff::diff_torrents(&previous_torrents, &new_torrents);
|
||||
if !updates.is_empty() {
|
||||
for update in updates {
|
||||
let _ = event_bus_tx.send(update);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
previous_torrents = new_torrents;
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error fetching torrents in background: {}", e);
|
||||
tracing::error!("Error fetching torrents in background: {}", e);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
@@ -140,20 +194,29 @@ async fn main() {
|
||||
.route("/api/torrents/add", post(add_torrent_handler))
|
||||
.route("/api/torrents/action", post(handle_torrent_action))
|
||||
.fallback(static_handler) // Serve static files for everything else
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.layer(CompressionLayer::new()
|
||||
.br(false)
|
||||
.gzip(true)
|
||||
.quality(CompressionLevel::Fastest))
|
||||
.layer(ServiceBuilder::new()
|
||||
.layer(HandleErrorLayer::new(handle_timeout_error))
|
||||
.layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(30)))
|
||||
)
|
||||
.layer(CorsLayer::permissive())
|
||||
.with_state(app_state);
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], args.port));
|
||||
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
|
||||
println!("Backend listening on {}", addr);
|
||||
tracing::info!("Backend listening on {}", addr);
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
|
||||
async fn handle_torrent_action(
|
||||
State(state): State<AppState>,
|
||||
Json(payload): Json<models::TorrentActionRequest>,
|
||||
Json(payload): Json<TorrentActionRequest>,
|
||||
) -> impl IntoResponse {
|
||||
println!("Received action: {} for hash: {}", payload.action, payload.hash);
|
||||
tracing::info!("Received action: {} for hash: {}", payload.action, payload.hash);
|
||||
|
||||
// Special handling for delete_with_data
|
||||
if payload.action == "delete_with_data" {
|
||||
@@ -170,14 +233,14 @@ async fn handle_torrent_action(
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to parse path: {}", e)).into_response(),
|
||||
};
|
||||
|
||||
println!("Attempting to delete torrent and data at path: {}", path);
|
||||
tracing::info!("Attempting to delete torrent and data at path: {}", path);
|
||||
if path.trim().is_empty() || path == "/" {
|
||||
return (StatusCode::BAD_REQUEST, "Safety check failed: Path is empty or root").into_response();
|
||||
}
|
||||
|
||||
// 2. Erase Torrent first (so rTorrent releases locks?)
|
||||
if let Err(e) = client.call("d.erase", &[&payload.hash]).await {
|
||||
eprintln!("Failed to erase torrent entry: {}", e);
|
||||
tracing::warn!("Failed to erase torrent entry: {}", e);
|
||||
// Proceed anyway to delete files? Maybe not.
|
||||
}
|
||||
|
||||
@@ -199,8 +262,16 @@ async fn handle_torrent_action(
|
||||
match scgi::system_call(&state.scgi_socket_path, method, vec![&payload.hash]).await {
|
||||
Ok(_) => (StatusCode::OK, "Action executed").into_response(),
|
||||
Err(e) => {
|
||||
eprintln!("SCGI error: {:?}", e);
|
||||
tracing::error!("SCGI error: {:?}", e);
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to execute action").into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_timeout_error(err: BoxError) -> (StatusCode, &'static str) {
|
||||
if err.is::<tower::timeout::error::Elapsed>() {
|
||||
(StatusCode::REQUEST_TIMEOUT, "Request timed out")
|
||||
} else {
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, "Unhandled internal error")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Torrent {
|
||||
pub hash: String,
|
||||
pub name: String,
|
||||
pub size: i64,
|
||||
pub completed: i64,
|
||||
pub down_rate: i64,
|
||||
pub up_rate: i64,
|
||||
pub eta: i64,
|
||||
pub percent_complete: f64,
|
||||
pub status: TorrentStatus,
|
||||
pub error_message: String,
|
||||
pub added_date: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TorrentActionRequest {
|
||||
pub hash: String,
|
||||
pub action: String, // "start", "stop", "delete"
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub enum TorrentStatus {
|
||||
Downloading,
|
||||
Seeding,
|
||||
Paused,
|
||||
Error,
|
||||
Checking,
|
||||
Queued,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum AppEvent {
|
||||
FullList(Vec<Torrent>, u64),
|
||||
Update(TorrentUpdate),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TorrentUpdate {
|
||||
pub hash: String,
|
||||
// Optional fields for partial updates
|
||||
pub down_rate: Option<i64>,
|
||||
pub up_rate: Option<i64>,
|
||||
pub percent_complete: Option<f64>,
|
||||
}
|
||||
|
||||
use tokio::sync::watch;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub tx: Arc<watch::Sender<Vec<Torrent>>>,
|
||||
pub scgi_socket_path: String,
|
||||
}
|
||||
@@ -132,7 +132,7 @@ pub async fn system_call(
|
||||
}
|
||||
xml.push_str("</params></methodCall>");
|
||||
|
||||
println!("Sending XML-RPC Payload: {}", xml); // Debug logging
|
||||
tracing::debug!("Sending XML-RPC Payload: {}", xml);
|
||||
|
||||
let req = ScgiRequest::new().body(xml.clone().into_bytes());
|
||||
let response_bytes = send_request(socket_path, req).await?;
|
||||
|
||||
@@ -2,7 +2,7 @@ use axum::response::sse::{Event, Sse};
|
||||
use futures::stream::{self, Stream};
|
||||
use std::convert::Infallible;
|
||||
use tokio_stream::StreamExt;
|
||||
use crate::models::{AppEvent, Torrent};
|
||||
use shared::{AppEvent, Torrent, TorrentStatus};
|
||||
use crate::xmlrpc::{RtorrentClient, parse_multicall_response};
|
||||
|
||||
// Helper (should be moved to utils)
|
||||
@@ -64,15 +64,15 @@ pub async fn fetch_torrents(client: &RtorrentClient) -> Result<Vec<Torrent>, Str
|
||||
|
||||
// Status Logic
|
||||
let status = if !message.is_empty() {
|
||||
crate::models::TorrentStatus::Error
|
||||
TorrentStatus::Error
|
||||
} else if is_hashing != 0 {
|
||||
crate::models::TorrentStatus::Checking
|
||||
TorrentStatus::Checking
|
||||
} else if state == 0 {
|
||||
crate::models::TorrentStatus::Paused
|
||||
TorrentStatus::Paused
|
||||
} else if is_complete != 0 {
|
||||
crate::models::TorrentStatus::Seeding
|
||||
TorrentStatus::Seeding
|
||||
} else {
|
||||
crate::models::TorrentStatus::Downloading
|
||||
TorrentStatus::Downloading
|
||||
};
|
||||
|
||||
// ETA Logic (seconds)
|
||||
@@ -110,7 +110,7 @@ pub async fn fetch_torrents(client: &RtorrentClient) -> Result<Vec<Torrent>, Str
|
||||
}
|
||||
|
||||
use axum::extract::State;
|
||||
use crate::models::AppState;
|
||||
use crate::AppState; // Import from crate root
|
||||
|
||||
pub async fn sse_handler(
|
||||
State(state): State<AppState>,
|
||||
@@ -132,19 +132,25 @@ pub async fn sse_handler(
|
||||
let initial_stream = stream::once(async { Ok::<Event, Infallible>(initial_event) });
|
||||
|
||||
// Stream that waits for subsequent changes
|
||||
let update_stream = stream::unfold(state.tx.subscribe(), |mut rx| async move {
|
||||
if let Err(_) = rx.changed().await {
|
||||
return None;
|
||||
// Stream that waits for subsequent changes via Broadcast channel
|
||||
let rx = state.event_bus.subscribe();
|
||||
let update_stream = stream::unfold(rx, |mut rx| async move {
|
||||
match rx.recv().await {
|
||||
Ok(event) => {
|
||||
match serde_json::to_string(&event) {
|
||||
Ok(json) => Some((Ok::<Event, Infallible>(Event::default().data(json)), rx)),
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to serialize SSE event: {}", e);
|
||||
Some((Ok::<Event, Infallible>(Event::default().comment("error")), rx))
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
// If channel closed or lagged, close stream so client reconnects and gets fresh state
|
||||
tracing::warn!("SSE Broadcast channel error (lagged/closed): {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
let torrents = rx.borrow().clone();
|
||||
// println!("Broadcasting SSE update with {} items", torrents.len());
|
||||
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
|
||||
let event_data = AppEvent::FullList(torrents, timestamp);
|
||||
|
||||
match serde_json::to_string(&event_data) {
|
||||
Ok(json) => Some((Ok::<Event, Infallible>(Event::default().data(json)), rx)),
|
||||
Err(_) => Some((Ok::<Event, Infallible>(Event::default().comment("error")), rx)),
|
||||
}
|
||||
});
|
||||
|
||||
Sse::new(initial_stream.chain(update_stream))
|
||||
|
||||
Reference in New Issue
Block a user