perf: push bildirimleri paralel gönderim ve env var önbelleğe alma ile optimize edildi
Some checks failed
Build MIPS Binary / build (push) Has been cancelled
Some checks failed
Build MIPS Binary / build (push) Has been cancelled
This commit is contained in:
@@ -690,8 +690,10 @@ pub async fn handle_timeout_error(err: BoxError) -> (StatusCode, &'static str) {
|
|||||||
(status = 200, description = "VAPID public key", body = String)
|
(status = 200, description = "VAPID public key", body = String)
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub async fn get_push_public_key_handler() -> impl IntoResponse {
|
pub async fn get_push_public_key_handler(
|
||||||
let public_key = push::get_vapid_public_key();
|
State(state): State<AppState>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let public_key = state.push_store.get_public_key();
|
||||||
(StatusCode::OK, Json(serde_json::json!({ "publicKey": public_key }))).into_response()
|
(StatusCode::OK, Json(serde_json::json!({ "publicKey": public_key }))).into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use utoipa::ToSchema;
|
|||||||
use web_push::{
|
use web_push::{
|
||||||
HyperWebPushClient, SubscriptionInfo, VapidSignatureBuilder, WebPushClient, WebPushMessageBuilder,
|
HyperWebPushClient, SubscriptionInfo, VapidSignatureBuilder, WebPushClient, WebPushMessageBuilder,
|
||||||
};
|
};
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
use crate::db::Db;
|
use crate::db::Db;
|
||||||
|
|
||||||
@@ -20,17 +21,34 @@ pub struct PushKeys {
|
|||||||
pub auth: String,
|
pub auth: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct VapidConfig {
|
||||||
|
pub private_key: String,
|
||||||
|
pub public_key: String,
|
||||||
|
pub email: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct PushSubscriptionStore {
|
pub struct PushSubscriptionStore {
|
||||||
db: Option<Db>,
|
db: Option<Db>,
|
||||||
subscriptions: Arc<RwLock<Vec<PushSubscription>>>,
|
subscriptions: Arc<RwLock<Vec<PushSubscription>>>,
|
||||||
|
vapid_config: VapidConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PushSubscriptionStore {
|
impl PushSubscriptionStore {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
let private_key = std::env::var("VAPID_PRIVATE_KEY").expect("VAPID_PRIVATE_KEY must be set in .env");
|
||||||
|
let public_key = std::env::var("VAPID_PUBLIC_KEY").expect("VAPID_PUBLIC_KEY must be set in .env");
|
||||||
|
let email = std::env::var("VAPID_EMAIL").expect("VAPID_EMAIL must be set in .env");
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
db: None,
|
db: None,
|
||||||
subscriptions: Arc::new(RwLock::new(Vec::new())),
|
subscriptions: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
vapid_config: VapidConfig {
|
||||||
|
private_key,
|
||||||
|
public_key,
|
||||||
|
email,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,9 +65,18 @@ impl PushSubscriptionStore {
|
|||||||
}
|
}
|
||||||
tracing::info!("Loaded {} push subscriptions from database", subscriptions_vec.len());
|
tracing::info!("Loaded {} push subscriptions from database", subscriptions_vec.len());
|
||||||
|
|
||||||
|
let private_key = std::env::var("VAPID_PRIVATE_KEY").expect("VAPID_PRIVATE_KEY must be set in .env");
|
||||||
|
let public_key = std::env::var("VAPID_PUBLIC_KEY").expect("VAPID_PUBLIC_KEY must be set in .env");
|
||||||
|
let email = std::env::var("VAPID_EMAIL").expect("VAPID_EMAIL must be set in .env");
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
db: Some(db.clone()),
|
db: Some(db.clone()),
|
||||||
subscriptions: Arc::new(RwLock::new(subscriptions_vec)),
|
subscriptions: Arc::new(RwLock::new(subscriptions_vec)),
|
||||||
|
vapid_config: VapidConfig {
|
||||||
|
private_key,
|
||||||
|
public_key,
|
||||||
|
email,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,6 +118,10 @@ impl PushSubscriptionStore {
|
|||||||
pub async fn get_all_subscriptions(&self) -> Vec<PushSubscription> {
|
pub async fn get_all_subscriptions(&self) -> Vec<PushSubscription> {
|
||||||
self.subscriptions.read().await.clone()
|
self.subscriptions.read().await.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_public_key(&self) -> &str {
|
||||||
|
&self.vapid_config.public_key
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send push notification to all subscribed clients
|
/// Send push notification to all subscribed clients
|
||||||
@@ -116,12 +147,18 @@ pub async fn send_push_notification(
|
|||||||
"tag": "vibetorrent"
|
"tag": "vibetorrent"
|
||||||
});
|
});
|
||||||
|
|
||||||
let client = HyperWebPushClient::new();
|
let client = Arc::new(HyperWebPushClient::new());
|
||||||
|
let vapid_config = store.vapid_config.clone();
|
||||||
|
let payload_str = payload.to_string();
|
||||||
|
|
||||||
let vapid_private_key = std::env::var("VAPID_PRIVATE_KEY").expect("VAPID_PRIVATE_KEY must be set in .env");
|
// Send notifications concurrently
|
||||||
let vapid_email = std::env::var("VAPID_EMAIL").expect("VAPID_EMAIL must be set in .env");
|
futures::stream::iter(subscriptions)
|
||||||
|
.for_each_concurrent(10, |subscription| {
|
||||||
|
let client = client.clone();
|
||||||
|
let vapid_config = vapid_config.clone();
|
||||||
|
let payload_str = payload_str.clone();
|
||||||
|
|
||||||
for subscription in subscriptions {
|
async move {
|
||||||
let subscription_info = SubscriptionInfo {
|
let subscription_info = SubscriptionInfo {
|
||||||
endpoint: subscription.endpoint.clone(),
|
endpoint: subscription.endpoint.clone(),
|
||||||
keys: web_push::SubscriptionKeys {
|
keys: web_push::SubscriptionKeys {
|
||||||
@@ -130,32 +167,45 @@ pub async fn send_push_notification(
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut sig_builder = VapidSignatureBuilder::from_base64(
|
let sig_res = VapidSignatureBuilder::from_base64(
|
||||||
&vapid_private_key,
|
&vapid_config.private_key,
|
||||||
web_push::URL_SAFE_NO_PAD,
|
web_push::URL_SAFE_NO_PAD,
|
||||||
&subscription_info,
|
&subscription_info,
|
||||||
)?;
|
);
|
||||||
|
|
||||||
sig_builder.add_claim("sub", vapid_email.as_str());
|
match sig_res {
|
||||||
|
Ok(mut sig_builder) => {
|
||||||
|
sig_builder.add_claim("sub", vapid_config.email.as_str());
|
||||||
sig_builder.add_claim("aud", subscription.endpoint.as_str());
|
sig_builder.add_claim("aud", subscription.endpoint.as_str());
|
||||||
let signature = sig_builder.build()?;
|
|
||||||
|
|
||||||
|
match sig_builder.build() {
|
||||||
|
Ok(signature) => {
|
||||||
let mut builder = WebPushMessageBuilder::new(&subscription_info);
|
let mut builder = WebPushMessageBuilder::new(&subscription_info);
|
||||||
builder.set_vapid_signature(signature);
|
builder.set_vapid_signature(signature);
|
||||||
|
|
||||||
let payload_str = payload.to_string();
|
|
||||||
builder.set_payload(web_push::ContentEncoding::Aes128Gcm, payload_str.as_bytes());
|
builder.set_payload(web_push::ContentEncoding::Aes128Gcm, payload_str.as_bytes());
|
||||||
|
|
||||||
match client.send(builder.build()?).await {
|
match builder.build() {
|
||||||
|
Ok(msg) => {
|
||||||
|
match client.send(msg).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
tracing::debug!("Push notification sent to: {}", subscription.endpoint);
|
tracing::debug!("Push notification sent to: {}", subscription.endpoint);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("Failed to send push notification: {}", e);
|
tracing::error!("Failed to send push notification to {}: {}", subscription.endpoint, e);
|
||||||
// TODO: Remove invalid subscriptions
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(e) => tracing::error!("Failed to build push message: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::error!("Failed to build VAPID signature: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::error!("Failed to create VAPID signature builder: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user