feat: Initial release with MIPS support
This commit is contained in:
182
backend/src/main.rs
Normal file
182
backend/src/main.rs
Normal file
@@ -0,0 +1,182 @@
|
||||
|
||||
mod models;
|
||||
mod scgi;
|
||||
mod sse;
|
||||
mod xmlrpc;
|
||||
|
||||
// fixup modules
|
||||
// remove mm if I didn't create it? I didn't.
|
||||
// I will structure modules correctly.
|
||||
|
||||
use clap::Parser;
|
||||
use rust_embed::RustEmbed;
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::{Path, State},
|
||||
http::{header, StatusCode, Uri},
|
||||
response::{IntoResponse, Response},
|
||||
routing::{get, post},
|
||||
Router, Json,
|
||||
};
|
||||
use tower_http::cors::CorsLayer;
|
||||
use serde::Deserialize;
|
||||
use std::net::SocketAddr;
|
||||
use crate::models::AppState;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Path to rTorrent SCGI socket
|
||||
#[arg(short, long, default_value = "/tmp/rtorrent.sock")]
|
||||
socket: String,
|
||||
|
||||
/// Port to listen on
|
||||
#[arg(short, long, default_value_t = 3000)]
|
||||
port: u16,
|
||||
}
|
||||
|
||||
#[derive(RustEmbed)]
|
||||
#[folder = "../frontend/dist"]
|
||||
struct Asset;
|
||||
|
||||
async fn static_handler(uri: Uri) -> impl IntoResponse {
|
||||
let mut path = uri.path().trim_start_matches('/').to_string();
|
||||
|
||||
if path.is_empty() {
|
||||
path = "index.html".to_string();
|
||||
}
|
||||
|
||||
match Asset::get(&path) {
|
||||
Some(content) => {
|
||||
let mime = mime_guess::from_path(&path).first_or_octet_stream();
|
||||
([(header::CONTENT_TYPE, mime.as_ref())], content.data).into_response()
|
||||
}
|
||||
None => {
|
||||
if path.contains('.') {
|
||||
return StatusCode::NOT_FOUND.into_response();
|
||||
}
|
||||
// Fallback to index.html for SPA routing
|
||||
match Asset::get("index.html") {
|
||||
Some(content) => {
|
||||
let mime = mime_guess::from_path("index.html").first_or_octet_stream();
|
||||
([(header::CONTENT_TYPE, mime.as_ref())], content.data).into_response()
|
||||
}
|
||||
None => StatusCode::NOT_FOUND.into_response(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use tokio::sync::watch;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
/* ... add_torrent_handler ... */
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// initialize tracing
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
// Parse CLI Args
|
||||
let args = Args::parse();
|
||||
println!("Starting VibeTorrent Backend...");
|
||||
println!("Socket: {}", args.socket);
|
||||
println!("Port: {}", args.port);
|
||||
|
||||
// Channel for torrent list updates
|
||||
let (tx, _rx) = watch::channel(vec![]);
|
||||
let tx = Arc::new(tx);
|
||||
|
||||
let app_state = AppState {
|
||||
tx: tx.clone(),
|
||||
scgi_socket_path: args.socket.clone(),
|
||||
};
|
||||
|
||||
// Spawn background task to poll rTorrent
|
||||
let tx_clone = tx.clone();
|
||||
let socket_path = args.socket.clone(); // Clone for background task
|
||||
tokio::spawn(async move {
|
||||
let client = xmlrpc::RtorrentClient::new(&socket_path);
|
||||
loop {
|
||||
match sse::fetch_torrents(&client).await {
|
||||
Ok(torrents) => {
|
||||
let _ = tx_clone.send(torrents);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error fetching torrents in background: {}", e);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
});
|
||||
|
||||
let app = Router::new()
|
||||
.route("/api/events", get(sse::sse_handler))
|
||||
.route("/api/torrents/add", post(add_torrent_handler))
|
||||
.route("/api/torrents/action", post(handle_torrent_action))
|
||||
.fallback(static_handler) // Serve static files for everything else
|
||||
.layer(CorsLayer::permissive())
|
||||
.with_state(app_state);
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], args.port));
|
||||
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
|
||||
println!("Backend listening on {}", addr);
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
|
||||
async fn handle_torrent_action(
|
||||
State(state): State<AppState>,
|
||||
Json(payload): Json<models::TorrentActionRequest>,
|
||||
) -> impl IntoResponse {
|
||||
println!("Received action: {} for hash: {}", payload.action, payload.hash);
|
||||
|
||||
// Special handling for delete_with_data
|
||||
if payload.action == "delete_with_data" {
|
||||
let client = xmlrpc::RtorrentClient::new(&state.scgi_socket_path);
|
||||
|
||||
// 1. Get Base Path
|
||||
let path_xml = match client.call("d.base_path", &[&payload.hash]).await {
|
||||
Ok(xml) => xml,
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to call rTorrent: {}", e)).into_response(),
|
||||
};
|
||||
|
||||
let path = match xmlrpc::parse_string_response(&path_xml) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to parse path: {}", e)).into_response(),
|
||||
};
|
||||
|
||||
println!("Attempting to delete torrent and data at path: {}", path);
|
||||
if path.trim().is_empty() || path == "/" {
|
||||
return (StatusCode::BAD_REQUEST, "Safety check failed: Path is empty or root").into_response();
|
||||
}
|
||||
|
||||
// 2. Erase Torrent first (so rTorrent releases locks?)
|
||||
if let Err(e) = client.call("d.erase", &[&payload.hash]).await {
|
||||
eprintln!("Failed to erase torrent entry: {}", e);
|
||||
// Proceed anyway to delete files? Maybe not.
|
||||
}
|
||||
|
||||
// 3. Delete Files via rTorrent (execute.throw.bg)
|
||||
// Command: rm -rf <path>
|
||||
match client.call("execute.throw.bg", &["", "rm", "-rf", &path]).await {
|
||||
Ok(_) => return (StatusCode::OK, "Torrent and data deleted").into_response(),
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to delete data: {}", e)).into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
let method = match payload.action.as_str() {
|
||||
"start" => "d.start",
|
||||
"stop" => "d.stop",
|
||||
"delete" => "d.erase",
|
||||
_ => return (StatusCode::BAD_REQUEST, "Invalid action").into_response(),
|
||||
};
|
||||
|
||||
match scgi::system_call(&state.scgi_socket_path, method, vec![&payload.hash]).await {
|
||||
Ok(_) => (StatusCode::OK, "Action executed").into_response(),
|
||||
Err(e) => {
|
||||
eprintln!("SCGI error: {:?}", e);
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to execute action").into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
57
backend/src/models.rs
Normal file
57
backend/src/models.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Torrent {
|
||||
pub hash: String,
|
||||
pub name: String,
|
||||
pub size: i64,
|
||||
pub completed: i64,
|
||||
pub down_rate: i64,
|
||||
pub up_rate: i64,
|
||||
pub eta: i64,
|
||||
pub percent_complete: f64,
|
||||
pub status: TorrentStatus,
|
||||
pub error_message: String,
|
||||
pub added_date: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TorrentActionRequest {
|
||||
pub hash: String,
|
||||
pub action: String, // "start", "stop", "delete"
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub enum TorrentStatus {
|
||||
Downloading,
|
||||
Seeding,
|
||||
Paused,
|
||||
Error,
|
||||
Checking,
|
||||
Queued,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum AppEvent {
|
||||
FullList(Vec<Torrent>, u64),
|
||||
Update(TorrentUpdate),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TorrentUpdate {
|
||||
pub hash: String,
|
||||
// Optional fields for partial updates
|
||||
pub down_rate: Option<i64>,
|
||||
pub up_rate: Option<i64>,
|
||||
pub percent_complete: Option<f64>,
|
||||
}
|
||||
|
||||
use tokio::sync::watch;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub tx: Arc<watch::Sender<Vec<Torrent>>>,
|
||||
pub scgi_socket_path: String,
|
||||
}
|
||||
144
backend/src/scgi.rs
Normal file
144
backend/src/scgi.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use std::collections::HashMap;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ScgiError {
|
||||
Io(std::io::Error),
|
||||
Protocol(String),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for ScgiError {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
ScgiError::Io(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScgiRequest {
|
||||
headers: HashMap<String, String>,
|
||||
body: Vec<u8>,
|
||||
}
|
||||
|
||||
impl ScgiRequest {
|
||||
pub fn new() -> Self {
|
||||
let mut headers = HashMap::new();
|
||||
headers.insert("SCGI".to_string(), "1".to_string());
|
||||
Self {
|
||||
headers,
|
||||
body: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn header(mut self, key: &str, value: &str) -> Self {
|
||||
self.headers.insert(key.to_string(), value.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn body(mut self, body: Vec<u8>) -> Self {
|
||||
self.body = body;
|
||||
self.headers
|
||||
.insert("CONTENT_LENGTH".to_string(), self.body.len().to_string());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn encode(&self) -> Vec<u8> {
|
||||
let mut headers_data = Vec::new();
|
||||
|
||||
// SCGI Spec: The first header must be "CONTENT_LENGTH"
|
||||
// The second header must be "SCGI" with value "1"
|
||||
|
||||
// We handle CONTENT_LENGTH and SCGI explicitly first
|
||||
let content_len = self.body.len().to_string();
|
||||
headers_data.extend_from_slice(b"CONTENT_LENGTH");
|
||||
headers_data.push(0);
|
||||
headers_data.extend_from_slice(content_len.as_bytes());
|
||||
headers_data.push(0);
|
||||
|
||||
headers_data.extend_from_slice(b"SCGI");
|
||||
headers_data.push(0);
|
||||
headers_data.extend_from_slice(b"1");
|
||||
headers_data.push(0);
|
||||
|
||||
// Add remaining headers (excluding the ones we just added if they exist in the map)
|
||||
for (k, v) in &self.headers {
|
||||
if k == "CONTENT_LENGTH" || k == "SCGI" {
|
||||
continue;
|
||||
}
|
||||
headers_data.extend_from_slice(k.as_bytes());
|
||||
headers_data.push(0);
|
||||
headers_data.extend_from_slice(v.as_bytes());
|
||||
headers_data.push(0);
|
||||
}
|
||||
|
||||
let headers_len = headers_data.len();
|
||||
let mut packet = Vec::new();
|
||||
let len_str = headers_len.to_string();
|
||||
packet.extend_from_slice(len_str.as_bytes());
|
||||
packet.push(b':');
|
||||
packet.extend(headers_data);
|
||||
packet.push(b',');
|
||||
packet.extend(&self.body);
|
||||
|
||||
packet
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_request(
|
||||
socket_path: &str,
|
||||
request: ScgiRequest,
|
||||
) -> Result<Bytes, ScgiError> {
|
||||
let mut stream = UnixStream::connect(socket_path).await?;
|
||||
let data = request.encode();
|
||||
stream.write_all(&data).await?;
|
||||
|
||||
let mut response = Vec::new();
|
||||
stream.read_to_end(&mut response).await?;
|
||||
|
||||
// The response is usually HTTP-like: headers\r\n\r\nbody
|
||||
// We strictly want the body for XML-RPC
|
||||
// Find double newline
|
||||
let double_newline = b"\r\n\r\n";
|
||||
if let Some(pos) = response
|
||||
.windows(double_newline.len())
|
||||
.position(|window| window == double_newline)
|
||||
{
|
||||
Ok(Bytes::from(response.split_off(pos + double_newline.len())))
|
||||
} else {
|
||||
// Fallback: rTorrent sometimes sends raw XML without headers if configured poorly,
|
||||
// but SCGI usually implies headers.
|
||||
// If we don't find headers, maybe it's all body?
|
||||
// But usually there's at least "Status: 200 OK"
|
||||
// Let's return everything if we can't find the split, or error.
|
||||
// For now, assume everything is body if no headers found might be unsafe,
|
||||
// but valid for simple XML-RPC dumping.
|
||||
Ok(Bytes::from(response))
|
||||
}
|
||||
}
|
||||
pub async fn system_call(
|
||||
socket_path: &str,
|
||||
method: &str,
|
||||
params: Vec<&str>,
|
||||
) -> Result<String, ScgiError> {
|
||||
// Construct XML-RPC payload manually for simplicity
|
||||
// <methodCall><methodName>method</methodName><params><param><value><string>val</string></value></param>...</params></methodCall>
|
||||
let mut xml = String::from("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
|
||||
xml.push_str(&format!("<methodCall><methodName>{}</methodName><params>", method));
|
||||
for param in params {
|
||||
// Use CDATA for safety with special chars in magnet links
|
||||
xml.push_str(&format!("<param><value><string><![CDATA[{}]]></string></value></param>", param));
|
||||
}
|
||||
xml.push_str("</params></methodCall>");
|
||||
|
||||
println!("Sending XML-RPC Payload: {}", xml); // Debug logging
|
||||
|
||||
let req = ScgiRequest::new().body(xml.clone().into_bytes());
|
||||
let response_bytes = send_request(socket_path, req).await?;
|
||||
let response_str = String::from_utf8_lossy(&response_bytes).to_string();
|
||||
|
||||
// Ideally parse the response, but for actions we just check if it executed without SCGI error
|
||||
// rTorrent usually returns <value><i8>0</i8></value> for success or fault.
|
||||
// For now, returning the raw string is fine for debugging/logging in main.
|
||||
|
||||
Ok(response_str)
|
||||
}
|
||||
157
backend/src/sse.rs
Normal file
157
backend/src/sse.rs
Normal file
@@ -0,0 +1,157 @@
|
||||
use axum::response::sse::{Event, Sse};
|
||||
use futures::stream::{self, Stream};
|
||||
use std::{convert::Infallible, time::Duration};
|
||||
use tokio_stream::StreamExt;
|
||||
use crate::models::{AppEvent, Torrent};
|
||||
use crate::xmlrpc::{RtorrentClient, parse_multicall_response};
|
||||
|
||||
// Helper (should be moved to utils)
|
||||
fn parse_size(s: &str) -> i64 {
|
||||
s.parse().unwrap_or(0)
|
||||
}
|
||||
|
||||
fn parse_float(s: &str) -> f64 {
|
||||
// rTorrent usually returns integers for bytes done etc.
|
||||
// We might need to handle empty strings.
|
||||
s.parse().unwrap_or(0.0)
|
||||
}
|
||||
|
||||
pub async fn fetch_torrents(client: &RtorrentClient) -> Result<Vec<Torrent>, String> {
|
||||
// d.multicall2("", "main", ...)
|
||||
let params = vec![
|
||||
"",
|
||||
"main",
|
||||
"d.hash=",
|
||||
"d.name=",
|
||||
"d.size_bytes=",
|
||||
"d.bytes_done=",
|
||||
"d.down.rate=",
|
||||
"d.up.rate=",
|
||||
"d.state=", // 6
|
||||
"d.complete=", // 7
|
||||
"d.message=", // 8
|
||||
"d.left_bytes=", // 9
|
||||
"d.creation_date=", // 10
|
||||
"d.hashing=", // 11
|
||||
];
|
||||
|
||||
match client.call("d.multicall2", ¶ms).await {
|
||||
Ok(xml) => {
|
||||
if xml.trim().is_empty() {
|
||||
return Err("Empty response from SCGI".to_string());
|
||||
}
|
||||
match parse_multicall_response(&xml) {
|
||||
Ok(rows) => {
|
||||
let torrents = rows.into_iter().map(|row| {
|
||||
// row map indexes:
|
||||
// 0: hash, 1: name, 2: size, 3: completed, 4: down_rate, 5: up_rate
|
||||
// 6: state, 7: complete, 8: message, 9: left_bytes, 10: added, 11: hashing
|
||||
|
||||
let hash = row.get(0).cloned().unwrap_or_default();
|
||||
let name = row.get(1).cloned().unwrap_or_default();
|
||||
let size = parse_size(row.get(2).unwrap_or(&"0".to_string()));
|
||||
let completed = parse_size(row.get(3).unwrap_or(&"0".to_string()));
|
||||
let down_rate = parse_size(row.get(4).unwrap_or(&"0".to_string()));
|
||||
let up_rate = parse_size(row.get(5).unwrap_or(&"0".to_string()));
|
||||
|
||||
let state = parse_size(row.get(6).unwrap_or(&"0".to_string()));
|
||||
let is_complete = parse_size(row.get(7).unwrap_or(&"0".to_string()));
|
||||
let message = row.get(8).cloned().unwrap_or_default();
|
||||
let left_bytes = parse_size(row.get(9).unwrap_or(&"0".to_string()));
|
||||
let added_date = parse_size(row.get(10).unwrap_or(&"0".to_string()));
|
||||
let is_hashing = parse_size(row.get(11).unwrap_or(&"0".to_string()));
|
||||
|
||||
let percent_complete = if size > 0 {
|
||||
(completed as f64 / size as f64) * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
// Status Logic
|
||||
let status = if !message.is_empty() {
|
||||
crate::models::TorrentStatus::Error
|
||||
} else if is_hashing != 0 {
|
||||
crate::models::TorrentStatus::Checking
|
||||
} else if state == 0 {
|
||||
crate::models::TorrentStatus::Paused
|
||||
} else if is_complete != 0 {
|
||||
crate::models::TorrentStatus::Seeding
|
||||
} else {
|
||||
crate::models::TorrentStatus::Downloading
|
||||
};
|
||||
|
||||
// ETA Logic (seconds)
|
||||
let eta = if down_rate > 0 && left_bytes > 0 {
|
||||
left_bytes / down_rate
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
Torrent {
|
||||
hash,
|
||||
name,
|
||||
size,
|
||||
completed,
|
||||
down_rate,
|
||||
up_rate,
|
||||
eta,
|
||||
percent_complete,
|
||||
status,
|
||||
error_message: message,
|
||||
added_date,
|
||||
}
|
||||
}).collect();
|
||||
Ok(torrents)
|
||||
},
|
||||
Err(e) => {
|
||||
Err(format!("XML Parse Error: {}", e))
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
Err(format!("RPC Error: {}", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use axum::extract::State;
|
||||
use crate::models::AppState;
|
||||
|
||||
pub async fn sse_handler(
|
||||
State(state): State<AppState>,
|
||||
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
||||
// Get initial value synchronously (from the watch channel's current state)
|
||||
let initial_rx = state.tx.subscribe();
|
||||
let initial_torrents = initial_rx.borrow().clone();
|
||||
|
||||
let initial_event = {
|
||||
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
|
||||
let event_data = AppEvent::FullList(initial_torrents, timestamp);
|
||||
match serde_json::to_string(&event_data) {
|
||||
Ok(json) => Event::default().data(json),
|
||||
Err(_) => Event::default().comment("init_error"),
|
||||
}
|
||||
};
|
||||
|
||||
// Stream that yields the initial event once
|
||||
let initial_stream = stream::once(async { Ok::<Event, Infallible>(initial_event) });
|
||||
|
||||
// Stream that waits for subsequent changes
|
||||
let update_stream = stream::unfold(state.tx.subscribe(), |mut rx| async move {
|
||||
if let Err(_) = rx.changed().await {
|
||||
return None;
|
||||
}
|
||||
let torrents = rx.borrow().clone();
|
||||
// println!("Broadcasting SSE update with {} items", torrents.len());
|
||||
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
|
||||
let event_data = AppEvent::FullList(torrents, timestamp);
|
||||
|
||||
match serde_json::to_string(&event_data) {
|
||||
Ok(json) => Some((Ok::<Event, Infallible>(Event::default().data(json)), rx)),
|
||||
Err(_) => Some((Ok::<Event, Infallible>(Event::default().comment("error")), rx)),
|
||||
}
|
||||
});
|
||||
|
||||
Sse::new(initial_stream.chain(update_stream))
|
||||
.keep_alive(axum::response::sse::KeepAlive::default())
|
||||
}
|
||||
181
backend/src/xmlrpc.rs
Normal file
181
backend/src/xmlrpc.rs
Normal file
@@ -0,0 +1,181 @@
|
||||
use crate::scgi::{send_request, ScgiRequest};
|
||||
use quick_xml::events::Event;
|
||||
use quick_xml::reader::Reader;
|
||||
use serde::Deserialize;
|
||||
|
||||
// Simple helper to build an XML-RPC method call
|
||||
pub fn build_method_call(method: &str, params: &[&str]) -> String {
|
||||
let mut xml = String::from("<?xml version=\"1.0\"?>\n<methodCall>\n");
|
||||
xml.push_str(&format!("<methodName>{}</methodName>\n<params>\n", method));
|
||||
for param in params {
|
||||
xml.push_str("<param><value><string><![CDATA[");
|
||||
xml.push_str(param);
|
||||
xml.push_str("]]></string></value></param>\n");
|
||||
}
|
||||
xml.push_str("</params>\n</methodCall>");
|
||||
xml
|
||||
}
|
||||
|
||||
pub struct RtorrentClient {
|
||||
socket_path: String,
|
||||
}
|
||||
|
||||
impl RtorrentClient {
|
||||
pub fn new(socket_path: &str) -> Self {
|
||||
Self {
|
||||
socket_path: socket_path.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn call(&self, method: &str, params: &[&str]) -> Result<String, String> {
|
||||
let xml = build_method_call(method, params);
|
||||
let req = ScgiRequest::new().body(xml.into_bytes());
|
||||
|
||||
match send_request(&self.socket_path, req).await {
|
||||
Ok(bytes) => {
|
||||
let s = String::from_utf8_lossy(&bytes).to_string();
|
||||
Ok(s)
|
||||
}
|
||||
Err(e) => Err(format!("{:?}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Specialized parser for d.multicall2 response
|
||||
// Expected structure:
|
||||
// <methodResponse><params><param><value><array><data>
|
||||
// <value><array><data>
|
||||
// <value><string>HASH</string></value>
|
||||
// <value><string>NAME</string></value>
|
||||
// ...
|
||||
// </data></array></value>
|
||||
// ...
|
||||
// </data></array></value></param></params></methodResponse>
|
||||
|
||||
pub fn parse_multicall_response(xml: &str) -> Result<Vec<Vec<String>>, String> {
|
||||
let mut reader = Reader::from_str(xml);
|
||||
reader.trim_text(true);
|
||||
|
||||
let mut buf = Vec::new();
|
||||
let mut results = Vec::new();
|
||||
let mut current_row = Vec::new();
|
||||
let mut inside_value = false;
|
||||
let mut current_text = String::new();
|
||||
|
||||
// Loop through events
|
||||
// Strategy: We look for <data> inside the outer array.
|
||||
// The outer array contains values which are arrays (rows).
|
||||
// Each row array contains values (columns).
|
||||
|
||||
// Simplified logic: flatten all <value>... content, but respect structure?
|
||||
// Actually, handling nested arrays properly with a streaming parser is tricky.
|
||||
// Let's rely on the fact that d.multicall2 returns a 2D array.
|
||||
// Depth 0: methodResponse/params/param/value/array/data
|
||||
// Depth 1: value (row) / array / data
|
||||
// Depth 2: value (col) / type (string/i8/i4)
|
||||
|
||||
// We can count <array> depth.
|
||||
|
||||
let mut array_depth = 0;
|
||||
|
||||
loop {
|
||||
match reader.read_event_into(&mut buf) {
|
||||
Ok(Event::Start(ref e)) => {
|
||||
match e.name().as_ref() {
|
||||
b"array" => array_depth += 1,
|
||||
b"value" => inside_value = true,
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
Ok(Event::End(ref e)) => {
|
||||
match e.name().as_ref() {
|
||||
b"array" => {
|
||||
array_depth -= 1;
|
||||
// If we just finished a row (depth 1 which means the inner array of the main list)
|
||||
if array_depth == 1 {
|
||||
if !current_row.is_empty() {
|
||||
results.push(current_row.clone());
|
||||
current_row.clear();
|
||||
}
|
||||
}
|
||||
},
|
||||
b"value" => {
|
||||
inside_value = false;
|
||||
// If we are at depth 2 (inside a column value)
|
||||
if array_depth == 2 && !current_text.is_empty() {
|
||||
current_row.push(current_text.clone());
|
||||
current_text.clear();
|
||||
} else if array_depth == 2 {
|
||||
// Empty value or non-text?
|
||||
// Sometimes values are empty, e.g. empty string
|
||||
// We should push it if we just closed a value at depth 2
|
||||
// But wait, the text event handles the content.
|
||||
// Logic: If we closed value at depth 2, we push the collected text (which might be empty).
|
||||
// To handle empty text correctly, we should clear text at Start(value) or use a flag.
|
||||
if inside_value == false { // we just closed it
|
||||
current_row.push(current_text.clone());
|
||||
current_text.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
Ok(Event::Text(e)) => {
|
||||
if inside_value && array_depth == 2 {
|
||||
current_text = e.unescape().unwrap().into_owned();
|
||||
}
|
||||
}
|
||||
Ok(Event::Eof) => break,
|
||||
Err(e) => return Err(format!("Parse error: {:?}", e)),
|
||||
_ => (),
|
||||
}
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
// Parse a simple string response from a method call
|
||||
// Expected: <methodResponse><params><param><value><string>RESULT</string></value></param></params></methodResponse>
|
||||
pub fn parse_string_response(xml: &str) -> Result<String, String> {
|
||||
let mut reader = Reader::from_str(xml);
|
||||
reader.trim_text(true);
|
||||
let mut buf = Vec::new();
|
||||
let mut result = String::new();
|
||||
let mut inside_string = false;
|
||||
|
||||
loop {
|
||||
match reader.read_event_into(&mut buf) {
|
||||
Ok(Event::Start(ref e)) => {
|
||||
if e.name().as_ref() == b"string" {
|
||||
inside_string = true;
|
||||
}
|
||||
}
|
||||
Ok(Event::Text(e)) => {
|
||||
if inside_string {
|
||||
result = e.unescape().unwrap().into_owned();
|
||||
}
|
||||
}
|
||||
Ok(Event::End(ref e)) => {
|
||||
if e.name().as_ref() == b"string" {
|
||||
inside_string = false;
|
||||
// Assuming only one string in the response which matters
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(Event::Eof) => break,
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
if result.is_empty() {
|
||||
// It might be empty string or we didn't find it.
|
||||
// If xml contains "fault", we should verify.
|
||||
if xml.contains("fault") {
|
||||
return Err("RPC Fault detected".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
Reference in New Issue
Block a user