From b812543a4c3d2305aab85dffd34d1c7b6803df1c Mon Sep 17 00:00:00 2001 From: DecDuck Date: Sat, 20 Dec 2025 20:09:53 +1100 Subject: [PATCH] feat: open file semaphore --- torrential/Cargo.lock | 20 +++++++++++++ torrential/Cargo.toml | 2 ++ torrential/src/main.rs | 32 ++++++++++++++++++++- torrential/src/serve.rs | 64 +++++++++++++++++++++++++++++++++++++---- torrential/src/state.rs | 4 +-- 5 files changed, 113 insertions(+), 9 deletions(-) diff --git a/torrential/Cargo.lock b/torrential/Cargo.lock index dab0e33b..b51b484a 100644 --- a/torrential/Cargo.lock +++ b/torrential/Cargo.lock @@ -547,6 +547,15 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "file_open_limit" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2a1c2ca7813e68cf8e8aa5fbfe64016332355b70495d57e69df4058fd08cdbb" +dependencies = [ + "rlimit", +] + [[package]] name = "find-msvc-tools" version = "0.1.5" @@ -1475,6 +1484,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rlimit" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7278a1ec8bfd4a4e07515c589f5ff7b309a373f987393aef44813d9dcf87aa3" +dependencies = [ + "libc", +] + [[package]] name = "rustc-hash" version = "2.1.1" @@ -1915,8 +1933,10 @@ dependencies = [ "ctr", "dashmap", "droplet-rs", + "file_open_limit", "futures-util", "log", + "pin-project-lite", "rand", "reqwest", "serde", diff --git a/torrential/Cargo.toml b/torrential/Cargo.toml index 8ae8aba4..ad4ea957 100644 --- a/torrential/Cargo.toml +++ b/torrential/Cargo.toml @@ -34,6 +34,8 @@ futures-util = "0.3.31" ctr = "0.9.2" aes = "0.8.4" bytes = "*" +file_open_limit = "0.0.5" +pin-project-lite = "0.2.16" [lints.clippy] pedantic = { level = "warn", priority = -1 } diff --git a/torrential/src/main.rs b/torrential/src/main.rs index 14fd39c3..4fd95e86 100644 --- a/torrential/src/main.rs +++ b/torrential/src/main.rs @@ -1,6 +1,7 @@ use std::{ env::{self, set_current_dir}, sync::Arc, + time::{Duration, Instant}, }; use axum::{ @@ -10,10 +11,12 @@ use axum::{ use dashmap::DashMap; use log::info; use simple_logger::SimpleLogger; -use tokio::{runtime::Handle, sync::OnceCell}; +use tokio::{runtime::Handle, spawn, sync::OnceCell, time}; use torrential::{handlers, serve, set_token, state::AppState}; use url::Url; +const CONTEXT_TTL: u64 = 10 * 60; + #[tokio::main] async fn main() { initialise_logger(); @@ -31,6 +34,33 @@ async fn main() { context_cache: DashMap::new(), }); + let interval_shared_state = shared_state.clone(); + + spawn(async move { + let shared_state = interval_shared_state; + let mut interval = time::interval(Duration::from_mins(1)); + + loop { + interval.tick().await; + let keys = shared_state + .context_cache + .iter() + .map(|v| v.key().clone()) + .collect::>(); + for key in keys { + let last_access = if let Some(context) = shared_state.context_cache.get(&key) { + context.last_access() + } else { + Instant::now() + }; + if last_access.elapsed().as_secs() >= CONTEXT_TTL { + shared_state.context_cache.remove(&key); + info!("cleaned context: {:?}", key); + } + } + } + }); + let app = setup_app(shared_state); serve(app).await.unwrap(); diff --git a/torrential/src/serve.rs b/torrential/src/serve.rs index 20bf3b5a..348038e5 100644 --- a/torrential/src/serve.rs +++ b/torrential/src/serve.rs @@ -1,4 +1,9 @@ -use std::{io::Error, rc::Rc, sync::Arc}; +use std::{ + cell::{LazyCell, OnceCell}, + io::Error, + rc::Rc, + sync::{Arc, LazyLock}, +}; use aes::cipher::{KeyIvInit, StreamCipher}; use axum::{ @@ -13,10 +18,11 @@ use droplet_rs::{ manifest::ChunkData, versions::types::{MinimumFileObject, VersionFile}, }; -use futures_util::{StreamExt, stream}; +use futures_util::{Stream, StreamExt, stream}; use log::{error, info}; -use reqwest::{StatusCode, header}; -use tokio::sync::SemaphorePermit; +use pin_project_lite::pin_project; +use reqwest::StatusCode; +use tokio::sync::{Semaphore, SemaphorePermit}; use tokio_util::io::ReaderStream; use crate::{ @@ -25,6 +31,44 @@ use crate::{ type Aes128Ctr64LE = ctr::Ctr64LE; +pin_project! { + struct SemaphoreStream<'a, T> + where T: Stream + { + #[pin] + stream: T, + semaphore: SemaphorePermit<'a>, + } +} + +impl<'a, T: Stream> SemaphoreStream<'a, T> { + fn new(stream: T, permit: SemaphorePermit<'a>) -> Self { + Self { + stream, + semaphore: permit, + } + } +} + +impl<'a, T: Stream> Stream for SemaphoreStream<'a, T> +where + T: Stream, +{ + type Item = T::Item; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.stream.poll_next(cx) + } +} + +static SEMPAHORE_COUNT: LazyLock = + LazyLock::new(|| file_open_limit::get().expect("failed to count max open files")); +static FILE_SEMAPHORE: LazyLock = LazyLock::new(|| Semaphore::new(*SEMPAHORE_COUNT)); + pub async fn serve_file( State(state): State>, Path((game_id, version_name, chunk_id)): Path<(String, String, String)>, @@ -35,6 +79,13 @@ pub async fn serve_file( context.reset_last_access(); let chunk_data = lookup_chunk(&chunk_id, &context)?; + if chunk_data.files.len() >= *SEMPAHORE_COUNT { + return Err(StatusCode::INSUFFICIENT_STORAGE); + } + let permit = FILE_SEMAPHORE + .acquire_many(chunk_data.files.len().try_into().unwrap()) + .await + .map_err(|_| StatusCode::INSUFFICIENT_STORAGE)?; let mut streams = Vec::with_capacity(chunk_data.files.len()); let mut content_length = 0; @@ -54,7 +105,7 @@ pub async fn serve_file( let stream = stream::iter(streams).flatten(); let mut cipher = Aes128Ctr64LE::new(&context.manifest.key.into(), &chunk_data.iv.into()); - let encrypted_stream = stream.chunks(3).map(move |raw| -> Result { + let encrypted_stream = stream.chunks(16).map(move |raw| -> Result { let data: Result, Error> = raw.into_iter().collect(); let mut data = data?.concat(); @@ -62,7 +113,8 @@ pub async fn serve_file( Ok(data.into()) }); - let body: Body = Body::from_stream(encrypted_stream); + let permit_stream = SemaphoreStream::new(encrypted_stream, permit); + let body: Body = Body::from_stream(permit_stream); let mut headers = HeaderMap::new(); headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); diff --git a/torrential/src/state.rs b/torrential/src/state.rs index 3cfd35c7..131239f5 100644 --- a/torrential/src/state.rs +++ b/torrential/src/state.rs @@ -1,7 +1,7 @@ -use std::{collections::HashMap, path::PathBuf}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use dashmap::DashMap; -use tokio::sync::OnceCell; +use tokio::sync::{OnceCell, Semaphore}; use crate:: DownloadContext