feat: initial commit

This commit is contained in:
DecDuck
2025-12-01 16:25:29 +11:00
commit bd2e44d4f0
9 changed files with 2588 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
/target
+2121
View File
File diff suppressed because it is too large Load Diff
+20
View File
@@ -0,0 +1,20 @@
[package]
name = "torrential"
version = "0.1.0"
edition = "2024"
[dependencies]
axum = "0.8.7"
log = "0.4.28"
reqwest = { version = "0.12.24", features = ["json"] }
serde = { version = "1.0.228", features = ["derive"] }
simple_logger = { version = "5.1.0", default-features = false, features = [
"colors",
] }
tokio = { version = "*", features = ["rt-multi-thread", "sync"] }
droplet-rs = { git = "https://github.com/Drop-OSS/droplet-rs.git" }
dashmap = "6.1.0"
anyhow = "1.0.100"
serde_json = "1.0.145"
url = { version = "2.5.7", default-features = false }
tokio-util = { version = "0.7.17", features = ["io"] }
+4
View File
@@ -0,0 +1,4 @@
# Torrential
A Rust webserver designed to hook into the Drop server and serve the content files, but at a lightning fast speed.
+52
View File
@@ -0,0 +1,52 @@
use std::{collections::HashMap, time::Instant};
use droplet_rs::versions::create_backend_constructor;
use log::info;
use reqwest::StatusCode;
use crate::{AppInitData, DownloadContext, remote::{LibraryBackend, fetch_download_context}, util::ErrorOption};
pub async fn create_download_context<'a>(
init_data: &AppInitData,
game_id: String,
version_name: String,
) -> Result<DownloadContext<'a>, ErrorOption> {
let context =
fetch_download_context(init_data.token.clone(), game_id, version_name.clone()).await?;
let (version_path, backend) = init_data
.libraries
.get(&context.library_id)
.ok_or(StatusCode::NOT_FOUND)?;
let version_path = version_path.join(context.library_path.clone());
let version_path = match backend {
LibraryBackend::Filesystem => version_path.join(version_name),
LibraryBackend::FlatFilesystem => version_path,
};
let backend =
create_backend_constructor(&version_path).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
let backend = backend()?;
let mut chunk_lookup_table =
HashMap::with_capacity(context.manifest.values().map(|v| v.ids.len()).sum());
for (path, file_chunks) in context.manifest {
let mut start = 0;
for (chunk, length) in file_chunks.ids.into_iter().zip(file_chunks.lengths) {
chunk_lookup_table.insert(chunk, (path.clone(), start, start + length));
start += length;
}
}
let download_context = DownloadContext {
library_id: context.library_id,
library_path: context.library_path,
chunk_lookup_table,
backend,
last_access: Instant::now(),
};
Ok(download_context)
}
+224
View File
@@ -0,0 +1,224 @@
use anyhow::{Result, anyhow};
use dashmap::DashMap;
use droplet_rs::versions::types::{VersionBackend, VersionFile};
use reqwest::header;
use simple_logger::SimpleLogger;
use std::{
collections::HashMap,
env::set_current_dir,
path::PathBuf,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use tokio_util::io::ReaderStream;
use axum::{
Json, Router,
body::Body,
extract::{Path, State},
http::StatusCode,
response::{AppendHeaders, IntoResponse},
routing::{get, post},
};
use log::{error, info, warn};
use serde::Deserialize;
use tokio::{
sync::{OnceCell, Semaphore},
time::sleep,
};
use crate::{
download::create_download_context,
remote::{LibraryBackend, fetch_library_sources},
};
mod download;
mod manifest;
mod remote;
mod util;
static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1);
struct DownloadContext<'a> {
library_id: String,
library_path: String,
chunk_lookup_table: HashMap<String, (String, usize, usize)>,
backend: Box<dyn VersionBackend + Send + Sync + 'a>,
last_access: Instant,
}
#[derive(Debug)]
struct AppInitData {
token: String,
libraries: HashMap<String, (PathBuf, LibraryBackend)>,
}
struct AppState<'a> {
token: OnceCell<AppInitData>,
context_cache: DashMap<(String, String), DownloadContext<'a>>,
working_context_cache: DashMap<(String, String), Semaphore>,
}
async fn serve_file(
State(state): State<Arc<AppState<'_>>>,
Path((game_id, version_name, chunk_id)): Path<(String, String, String)>,
) -> Result<impl IntoResponse, StatusCode> {
let init_data = state.token.get().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let key = (game_id.clone(), version_name.clone());
let mut context = if let Some(context) = state.context_cache.get_mut(&key) {
context
} else {
let permit = GLOBAL_CONTEXT_SEMAPHORE
.acquire()
.await
.expect("failed to acquire semaphore");
// Check if it's been done while we've been sitting here
if let Some(already_done) = state.context_cache.get_mut(&key) {
already_done
} else {
info!("generating context...");
let context_result =
create_download_context(init_data, game_id.clone(), version_name.clone()).await;
info!("cleaned up semaphore");
let new_context = context_result.inspect_err(|v| warn!("{:?}", v))?;
state.context_cache.insert(key.clone(), new_context);
info!("continuing download");
drop(permit);
state.context_cache.get_mut(&key).unwrap()
}
};
context.last_access = Instant::now();
let (relative_filename, start, end) = context
.chunk_lookup_table
.get(&chunk_id)
.cloned()
.ok_or(StatusCode::NOT_FOUND)?;
let reader = context
.backend
.reader(
&VersionFile {
relative_filename: relative_filename.to_string(),
permission: 0,
size: 0,
},
start.try_into().unwrap(),
end.try_into().unwrap(),
)
.await
.map_err(|v| {
error!("reader error: {v:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let stream = ReaderStream::new(reader);
let body = Body::from_stream(stream);
let headers: AppendHeaders<[(header::HeaderName, String); 2]> = AppendHeaders([
(header::CONTENT_TYPE, "application/octet-stream".to_owned()),
(header::CONTENT_LENGTH, (end - start).to_string()),
]);
Ok((headers, body))
}
#[derive(Deserialize)]
struct TokenPayload {
token: String,
}
async fn set_token(
State(state): State<Arc<AppState<'_>>>,
Json(payload): Json<TokenPayload>,
) -> Result<StatusCode, StatusCode> {
if let Some(existing_data) = state.token.get() {
if existing_data.token != payload.token {
panic!("already set up but provided with a different token");
}
return Ok(StatusCode::OK);
}
let token = payload.token;
let library_sources = fetch_library_sources(token.clone()).await.map_err(|v| {
error!("{v:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let valid_library_sources = library_sources
.into_iter()
.filter(|v| match v.backend {
remote::LibraryBackend::Filesystem | remote::LibraryBackend::FlatFilesystem => true,
#[allow(unreachable_patterns)]
_ => false,
})
.map(|v| {
let path = PathBuf::from_str(
v.options
.as_object()
.unwrap()
.get("baseDir")
.unwrap()
.as_str()
.unwrap(),
)
.unwrap();
(v.id, (path, v.backend))
})
.collect::<HashMap<String, (PathBuf, LibraryBackend)>>();
state
.token
.set(AppInitData {
token,
libraries: valid_library_sources,
})
.map_err(|err| {
error!("failed to set token: {err:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
info!("connected to drop server successfully");
Ok(StatusCode::OK)
}
#[tokio::main]
async fn main() {
SimpleLogger::new()
.with_level(log::LevelFilter::Info)
.init()
.unwrap();
if let Ok(working_directory) = std::env::var("WORKING_DIRECTORY") {
set_current_dir(working_directory).expect("failed to change working directory");
}
let shared_state = Arc::new(AppState {
token: OnceCell::new(),
context_cache: DashMap::new(),
working_context_cache: DashMap::new(),
});
let app = Router::new()
.route(
"/api/v1/depot/{game_id}/{version_name}/{chunk_id}",
get(serve_file),
)
.route("/token", post(set_token))
.with_state(shared_state);
// run our app with hyper, listening globally on port 3000
let listener = tokio::net::TcpListener::bind("0.0.0.0:5000").await.unwrap();
info!("started depot server");
axum::serve(listener, app).await.unwrap();
}
+13
View File
@@ -0,0 +1,13 @@
use std::collections::HashMap;
use serde::Deserialize;
#[derive(Deserialize)]
pub struct DropChunk {
pub permissions: usize,
pub ids: Vec<String>,
pub checksums: Vec<String>,
pub lengths: Vec<usize>,
}
pub type DropletManifest = HashMap<String, DropChunk>;
+116
View File
@@ -0,0 +1,116 @@
use std::{env, sync::LazyLock};
use anyhow::{Result, anyhow};
use log::info;
use reqwest::{Client, ClientBuilder, StatusCode, Url};
use serde::{Deserialize, Serialize};
use crate::{manifest::DropletManifest, util::ErrorOption};
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContextResponseBody {
timeout: String,
pub manifest: DropletManifest,
version_name: String,
pub library_id: String,
pub library_path: String,
}
#[derive(Serialize)]
pub struct ContextQuery {
game: String,
version: String,
}
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
ClientBuilder::new()
.build()
.expect("failed to build client")
});
static REMOTE_URL: LazyLock<Url> = LazyLock::new(|| {
let user_provided = env::var("DROP_SERVER_URL");
let url = Url::parse(
user_provided
.as_ref()
.map_or("http://localhost:3000", |v| v),
)
.expect("failed to parse URL");
info!("using Drop server url {}", url);
url
});
pub async fn fetch_download_context(
token: String,
game_id: String,
version_name: String,
) -> Result<ContextResponseBody, ErrorOption> {
let context_response = CLIENT
.get(REMOTE_URL.join("/api/v1/admin/depot/context")?)
.query(&ContextQuery {
game: game_id,
version: version_name,
})
.header("Authorization", format!("Bearer {}", token))
.send()
.await?;
if !context_response.status().is_success() {
if context_response.status() == StatusCode::BAD_REQUEST {
return Err(StatusCode::NOT_FOUND.into());
}
return Err(anyhow!(
"Fetching context failed with non-success code: {}, {}",
context_response.status(),
context_response
.text()
.await
.unwrap_or("(failed to read body)".to_string())
).into());
}
let context: ContextResponseBody = context_response.json().await?;
Ok(context)
}
#[derive(Deserialize, Debug)]
#[non_exhaustive]
pub enum LibraryBackend {
Filesystem,
FlatFilesystem
}
#[derive(Deserialize)]
pub struct LibrarySource {
pub options: serde_json::Value,
pub id: String,
name: String,
pub backend: LibraryBackend
}
pub async fn fetch_library_sources(token: String) -> Result<Vec<LibrarySource>> {
let source_response = CLIENT
.get(REMOTE_URL.join("/api/v1/admin/library/sources")?)
.header("Authorization", format!("Bearer {}", token))
.send()
.await?;
if !source_response.status().is_success() {
return Err(anyhow!(
"Fetching library sources failed with non-success code: {}, {}",
source_response.status(),
source_response
.text()
.await
.unwrap_or("(failed to read body)".to_string())
));
}
let library_sources: Vec<LibrarySource> = source_response.json().await?;
Ok(library_sources)
}
+37
View File
@@ -0,0 +1,37 @@
use log::error;
use reqwest::{StatusCode, Url};
#[derive(Debug)]
pub struct ErrorOption(Result<StatusCode, anyhow::Error>);
impl From<anyhow::Error> for ErrorOption {
fn from(value: anyhow::Error) -> Self {
Self(Err(value))
}
}
impl From<reqwest::Error> for ErrorOption {
fn from(value: reqwest::Error) -> Self {
Self(Err(value.into()))
}
}
impl From<url::ParseError> for ErrorOption {
fn from(value: url::ParseError) -> Self {
Self(Err(value.into()))
}
}
impl From<StatusCode> for ErrorOption {
fn from(value: StatusCode) -> Self {
Self(Ok(value))
}
}
impl From<ErrorOption> for StatusCode {
fn from(value: ErrorOption) -> Self {
match value.0 {
Ok(status) => status,
Err(err) => {
error!("{err:?}");
StatusCode::INTERNAL_SERVER_ERROR
}
}
}
}