feat: open file semaphore
This commit is contained in:
Generated
+20
@@ -547,6 +547,15 @@ version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
||||
|
||||
[[package]]
|
||||
name = "file_open_limit"
|
||||
version = "0.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2a1c2ca7813e68cf8e8aa5fbfe64016332355b70495d57e69df4058fd08cdbb"
|
||||
dependencies = [
|
||||
"rlimit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "find-msvc-tools"
|
||||
version = "0.1.5"
|
||||
@@ -1475,6 +1484,15 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rlimit"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7278a1ec8bfd4a4e07515c589f5ff7b309a373f987393aef44813d9dcf87aa3"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.1"
|
||||
@@ -1915,8 +1933,10 @@ dependencies = [
|
||||
"ctr",
|
||||
"dashmap",
|
||||
"droplet-rs",
|
||||
"file_open_limit",
|
||||
"futures-util",
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"reqwest",
|
||||
"serde",
|
||||
|
||||
@@ -34,6 +34,8 @@ futures-util = "0.3.31"
|
||||
ctr = "0.9.2"
|
||||
aes = "0.8.4"
|
||||
bytes = "*"
|
||||
file_open_limit = "0.0.5"
|
||||
pin-project-lite = "0.2.16"
|
||||
|
||||
[lints.clippy]
|
||||
pedantic = { level = "warn", priority = -1 }
|
||||
|
||||
+31
-1
@@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
env::{self, set_current_dir},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use axum::{
|
||||
@@ -10,10 +11,12 @@ use axum::{
|
||||
use dashmap::DashMap;
|
||||
use log::info;
|
||||
use simple_logger::SimpleLogger;
|
||||
use tokio::{runtime::Handle, sync::OnceCell};
|
||||
use tokio::{runtime::Handle, spawn, sync::OnceCell, time};
|
||||
use torrential::{handlers, serve, set_token, state::AppState};
|
||||
use url::Url;
|
||||
|
||||
const CONTEXT_TTL: u64 = 10 * 60;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
initialise_logger();
|
||||
@@ -31,6 +34,33 @@ async fn main() {
|
||||
context_cache: DashMap::new(),
|
||||
});
|
||||
|
||||
let interval_shared_state = shared_state.clone();
|
||||
|
||||
spawn(async move {
|
||||
let shared_state = interval_shared_state;
|
||||
let mut interval = time::interval(Duration::from_mins(1));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let keys = shared_state
|
||||
.context_cache
|
||||
.iter()
|
||||
.map(|v| v.key().clone())
|
||||
.collect::<Vec<(String, String)>>();
|
||||
for key in keys {
|
||||
let last_access = if let Some(context) = shared_state.context_cache.get(&key) {
|
||||
context.last_access()
|
||||
} else {
|
||||
Instant::now()
|
||||
};
|
||||
if last_access.elapsed().as_secs() >= CONTEXT_TTL {
|
||||
shared_state.context_cache.remove(&key);
|
||||
info!("cleaned context: {:?}", key);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let app = setup_app(shared_state);
|
||||
|
||||
serve(app).await.unwrap();
|
||||
|
||||
+58
-6
@@ -1,4 +1,9 @@
|
||||
use std::{io::Error, rc::Rc, sync::Arc};
|
||||
use std::{
|
||||
cell::{LazyCell, OnceCell},
|
||||
io::Error,
|
||||
rc::Rc,
|
||||
sync::{Arc, LazyLock},
|
||||
};
|
||||
|
||||
use aes::cipher::{KeyIvInit, StreamCipher};
|
||||
use axum::{
|
||||
@@ -13,10 +18,11 @@ use droplet_rs::{
|
||||
manifest::ChunkData,
|
||||
versions::types::{MinimumFileObject, VersionFile},
|
||||
};
|
||||
use futures_util::{StreamExt, stream};
|
||||
use futures_util::{Stream, StreamExt, stream};
|
||||
use log::{error, info};
|
||||
use reqwest::{StatusCode, header};
|
||||
use tokio::sync::SemaphorePermit;
|
||||
use pin_project_lite::pin_project;
|
||||
use reqwest::StatusCode;
|
||||
use tokio::sync::{Semaphore, SemaphorePermit};
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use crate::{
|
||||
@@ -25,6 +31,44 @@ use crate::{
|
||||
|
||||
type Aes128Ctr64LE = ctr::Ctr64LE<aes::Aes128>;
|
||||
|
||||
pin_project! {
|
||||
struct SemaphoreStream<'a, T>
|
||||
where T: Stream
|
||||
{
|
||||
#[pin]
|
||||
stream: T,
|
||||
semaphore: SemaphorePermit<'a>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Stream> SemaphoreStream<'a, T> {
|
||||
fn new(stream: T, permit: SemaphorePermit<'a>) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
semaphore: permit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Stream> Stream for SemaphoreStream<'a, T>
|
||||
where
|
||||
T: Stream,
|
||||
{
|
||||
type Item = T::Item;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
this.stream.poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
static SEMPAHORE_COUNT: LazyLock<usize> =
|
||||
LazyLock::new(|| file_open_limit::get().expect("failed to count max open files"));
|
||||
static FILE_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(*SEMPAHORE_COUNT));
|
||||
|
||||
pub async fn serve_file(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path((game_id, version_name, chunk_id)): Path<(String, String, String)>,
|
||||
@@ -35,6 +79,13 @@ pub async fn serve_file(
|
||||
context.reset_last_access();
|
||||
|
||||
let chunk_data = lookup_chunk(&chunk_id, &context)?;
|
||||
if chunk_data.files.len() >= *SEMPAHORE_COUNT {
|
||||
return Err(StatusCode::INSUFFICIENT_STORAGE);
|
||||
}
|
||||
let permit = FILE_SEMAPHORE
|
||||
.acquire_many(chunk_data.files.len().try_into().unwrap())
|
||||
.await
|
||||
.map_err(|_| StatusCode::INSUFFICIENT_STORAGE)?;
|
||||
let mut streams = Vec::with_capacity(chunk_data.files.len());
|
||||
let mut content_length = 0;
|
||||
|
||||
@@ -54,7 +105,7 @@ pub async fn serve_file(
|
||||
|
||||
let stream = stream::iter(streams).flatten();
|
||||
let mut cipher = Aes128Ctr64LE::new(&context.manifest.key.into(), &chunk_data.iv.into());
|
||||
let encrypted_stream = stream.chunks(3).map(move |raw| -> Result<Bytes, Error> {
|
||||
let encrypted_stream = stream.chunks(16).map(move |raw| -> Result<Bytes, Error> {
|
||||
let data: Result<Vec<Bytes>, Error> = raw.into_iter().collect();
|
||||
let mut data = data?.concat();
|
||||
|
||||
@@ -62,7 +113,8 @@ pub async fn serve_file(
|
||||
|
||||
Ok(data.into())
|
||||
});
|
||||
let body: Body = Body::from_stream(encrypted_stream);
|
||||
let permit_stream = SemaphoreStream::new(encrypted_stream, permit);
|
||||
let body: Body = Body::from_stream(permit_stream);
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{collections::HashMap, path::PathBuf};
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Arc};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::{OnceCell, Semaphore};
|
||||
|
||||
use crate::
|
||||
DownloadContext
|
||||
|
||||
Reference in New Issue
Block a user