feat: manifest generation

This commit is contained in:
DecDuck
2026-02-05 12:00:25 +11:00
parent aa46a88957
commit 55b76a6529
16 changed files with 321 additions and 64 deletions
+19 -4
View File
@@ -155,6 +155,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425"
dependencies = [
"axum-core",
"axum-macros",
"bytes",
"form_urlencoded",
"futures-util",
@@ -200,6 +201,17 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "base64"
version = "0.22.1"
@@ -544,8 +556,9 @@ dependencies = [
[[package]]
name = "droplet-rs"
version = "0.12.2"
source = "git+https://github.com/Drop-OSS/droplet-rs.git#05f7027b362fcc72b7cc621df8b5b0850b6cf082"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef23d187f7153bc3ce1c6a5440be1206e2aaf42b153825ddf2f8151f278539c5"
dependencies = [
"anyhow",
"async-trait",
@@ -2108,6 +2121,7 @@ dependencies = [
"file_open_limit",
"futures-util",
"log",
"num_cpus",
"pin-project-lite",
"protobuf",
"protobuf-codegen",
@@ -2120,6 +2134,7 @@ dependencies = [
"tokio",
"tokio-util",
"url",
"uuid",
"waitmap",
]
@@ -2233,9 +2248,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.19.0"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
dependencies = [
"getrandom 0.3.4",
"js-sys",
+4 -2
View File
@@ -12,7 +12,7 @@ name = "torrential"
path = "src/main.rs"
[dependencies]
axum = "0.8.7"
axum = { version = "0.8.7", features = ["macros"] }
log = "0.4.28"
reqwest = { version = "0.12.24", default-features = false, features = [
"json",
@@ -23,7 +23,7 @@ simple_logger = { version = "5.1.0", default-features = false, features = [
"colors",
] }
tokio = { version = "*", features = ["rt-multi-thread", "sync"] }
droplet-rs = { git="https://github.com/Drop-OSS/droplet-rs.git" }
droplet-rs = "0.15.1"
dashmap = "6.1.0"
anyhow = "1.0.100"
serde_json = "1.0.145"
@@ -38,6 +38,8 @@ file_open_limit = "0.0.5"
pin-project-lite = "0.2.16"
protobuf = "3.7.2"
waitmap = "1.1.0"
uuid = { version = "1.20.0", features = ["v4"] }
num_cpus = "1.17.0"
[lints.clippy]
pedantic = { level = "warn", priority = -1 }
+12 -6
View File
@@ -1,24 +1,30 @@
syntax = "proto3";
enum ResponseType {
enum TorrentialBoundType {
ERROR = 0;
SERVER_GAMES_RESPONSE = 1;
VERSION_RESPONSE = 2;
GENERATE_MANIFEST = 3;
}
message Response {
message TorrentialBound {
string message_id = 1;
ResponseType type = 2;
TorrentialBoundType type = 2;
bytes data = 3;
}
enum QueryType {
enum DropBoundType {
SERVER_GAMES_QUERY = 0;
VERSION_QUERY = 1;
MANIFEST_PROGRESS = 2;
MANIFEST_LOG = 3;
MANIFEST_COMPLETE = 4;
MANIFEST_ERROR = 5;
}
message Query {
message DropBound {
string message_id = 1;
QueryType type = 2;
DropBoundType type = 2;
bytes data = 3;
}
+37
View File
@@ -0,0 +1,37 @@
syntax = "proto3";
/// Certificates
message RootCertQuery {}
message RootCertResponse {
string cert = 1;
string priv = 2;
}
message ClientCertQuery {
string client_id = 1;
string client_name = 2;
string root_cert = 3;
string root_priv = 4;
}
message ClientCertResponse {
string cert = 1;
string priv = 2;
}
/// Manifest generation
message GenerateManifest {
string version_dir = 1;
}
message ManifestProgress {
float progress = 1;
}
message ManifestLog {
string log_line = 1;
}
message ManifestComplete {
string manifest = 1;
}
message ManifestError {
string error = 1;
}
+1 -1
View File
@@ -1,4 +1,4 @@
use crate::proto::{self, version::version_response::Manifest};
use crate::proto::version::version_response::Manifest;
fn fixed_length<T, const N: usize>(v: Vec<T>) -> [T; N] {
v.try_into()
@@ -1,9 +1,6 @@
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
sync::Arc,
task::Poll,
};
@@ -24,7 +21,7 @@ use tokio_util::io::ReaderStream;
use crate::{server::download::fetch_instance_games, state::AppState};
pub async fn healthcheck(State(state): State<Arc<AppState>>) -> StatusCode {
pub async fn healthcheck() -> StatusCode {
StatusCode::OK
}
@@ -60,7 +57,7 @@ const ZERO: [u8; 1024] = [0u8; _];
impl AsyncRead for SpeedtestStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if self.remaining > 0 {
+3
View File
@@ -0,0 +1,3 @@
pub mod handlers;
pub mod serve;
pub mod download;
@@ -1,7 +1,5 @@
use std::{
cell::{LazyCell, OnceCell},
io::Error,
rc::Rc,
sync::{Arc, LazyLock},
};
@@ -10,7 +8,7 @@ use axum::{
body::Body,
extract::{Path, State},
http::HeaderMap,
response::{AppendHeaders, IntoResponse},
response::IntoResponse,
};
use bytes::Bytes;
use dashmap::{DashMap, mapref::one::RefMut};
@@ -26,7 +24,8 @@ use tokio::sync::{Semaphore, SemaphorePermit};
use tokio_util::io::ReaderStream;
use crate::{
DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, download::create_download_context, state::AppState,
DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, downloads::download::create_download_context,
state::AppState,
};
type Aes128Ctr64LE = ctr::Ctr64LE<aes::Aes128>;
View File
+105
View File
@@ -0,0 +1,105 @@
use std::{
path::PathBuf,
sync::{Arc, LazyLock},
};
use log::{info, warn};
use protobuf::Message;
use serde_json::json;
use tokio::{spawn, sync::Semaphore};
use crate::{
droplet,
proto::{
core::{DropBoundType, TorrentialBound},
droplet::{
GenerateManifest, ManifestComplete, ManifestError, ManifestLog, ManifestProgress,
},
},
server::DropServer,
};
static READER_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| {
let cores = num_cpus::get();
Semaphore::new(cores)
});
async fn generate_manifest_raw(
server: Arc<DropServer>,
message: TorrentialBound,
) -> Result<(), anyhow::Error> {
let manifest_message = GenerateManifest::parse_from_bytes(&message.data)?;
info!(
"seven zip install: {}",
*droplet_rs::versions::backends::SEVEN_ZIP_INSTALLED
);
let manifest = droplet_rs::manifest::generate_manifest_rusty(
&PathBuf::from(manifest_message.version_dir),
|progress| {
let mut progress_message = ManifestProgress::new();
progress_message.progress = progress;
let server = server.clone();
let message_id = message.message_id.clone();
spawn(async move {
let _ = server
.send_message(
DropBoundType::MANIFEST_PROGRESS,
progress_message,
Some(message_id),
)
.await;
});
},
|log_line| {
let mut progress_log = ManifestLog::new();
progress_log.log_line = log_line;
let server = server.clone();
let message_id = message.message_id.clone();
spawn(async move {
let _ = server
.send_message(DropBoundType::MANIFEST_LOG, progress_log, Some(message_id))
.await;
});
},
Some(&READER_SEMAPHORE),
)
.await?;
let mut manifest_complete = ManifestComplete::new();
manifest_complete.manifest = json!(manifest).to_string();
server
.send_message(
DropBoundType::MANIFEST_COMPLETE,
manifest_complete,
Some(message.message_id),
)
.await?;
Ok(())
}
pub async fn generate_manifest(server: Arc<DropServer>, message: TorrentialBound) {
let message_id = message.message_id.clone();
warn!("generating manifest...");
let result = generate_manifest_raw(server.clone(), message).await;
info!("manifest generation exited");
if let Err(err) = result {
warn!("manifest generation failed with err: {:?}", err);
let mut manifest_err = ManifestError::new();
manifest_err.error = err.to_string();
let _ = server
.send_message(
DropBoundType::MANIFEST_ERROR,
manifest_err,
Some(message_id),
)
.await
.inspect_err(|err| {
warn!("failed to send manifest err: {err:?}");
});
}
}
+2
View File
@@ -0,0 +1,2 @@
pub mod manifest;
pub mod cert;
+3 -4
View File
@@ -1,13 +1,12 @@
use tokio::sync::Semaphore;
pub mod download;
pub mod serve;
pub mod handlers;
pub mod downloads;
pub mod state;
pub mod util;
pub mod proto;
pub mod conversions;
pub mod server;
pub mod droplet;
pub use download::DownloadContext;
pub use downloads::download::DownloadContext;
static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1);
+8 -2
View File
@@ -12,7 +12,11 @@ use dashmap::DashMap;
use log::info;
use simple_logger::SimpleLogger;
use tokio::{runtime::Handle, spawn, time};
use torrential::{handlers, serve, server::create_drop_server, state::AppState};
use torrential::{
downloads::{handlers, serve},
server::create_drop_server,
state::AppState,
};
const CONTEXT_TTL: u64 = 10 * 60;
@@ -28,7 +32,9 @@ async fn main() {
let metrics = Handle::current().metrics();
info!("using {} threads", metrics.num_workers());
let server = create_drop_server().await.expect("failed to connect to drop server");
let server = create_drop_server()
.await
.expect("failed to connect to drop server");
let shared_state = Arc::new(AppState {
context_cache: DashMap::new(),
+29 -7
View File
@@ -1,15 +1,37 @@
use crate::{proto::{manifest::server_games_response::SkeletonGame, version::VersionResponse}, state::{AppState}, util::ErrorOption};
use crate::{
proto::{
core::DropBoundType,
manifest::{ServerGamesQuery, ServerGamesResponse, server_games_response::SkeletonGame},
version::{VersionQuery, VersionResponse},
},
state::AppState,
util::ErrorOption,
};
pub async fn fetch_version_data(
app_state: &AppState,
game_id: String,
_game_id: String,
version_id: String,
) -> Result<VersionResponse, ErrorOption> {
unreachable!();
let mut query = VersionQuery::new();
query.version_id = version_id;
let message_id = app_state
.server
.send_message(DropBoundType::VERSION_QUERY, query, None)
.await?;
let response: VersionResponse = app_state.server.wait_for_message_id(&message_id).await?;
Ok(response)
}
pub async fn fetch_instance_games(
app_state: &AppState,
) -> Result<Vec<SkeletonGame>, ErrorOption> {
unreachable!()
pub async fn fetch_instance_games(app_state: &AppState) -> Result<Vec<SkeletonGame>, ErrorOption> {
let message_id = app_state
.server
.send_message(DropBoundType::SERVER_GAMES_QUERY, ServerGamesQuery::new(), None)
.await?;
let response: ServerGamesResponse = app_state.server.wait_for_message_id(&message_id).await?;
return Ok(response.games);
}
+92 -28
View File
@@ -1,7 +1,8 @@
use std::sync::{Arc, Mutex};
use std::{mem, sync::Arc};
use anyhow::anyhow;
use protobuf::Message;
use log::{info, warn};
use protobuf::{EnumOrUnknown, Message};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt as _, BufReader},
net::{
@@ -9,57 +10,102 @@ use tokio::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
},
spawn,
sync::Mutex,
};
use waitmap::WaitMap;
use crate::proto::core::Response;
use crate::{
droplet::manifest::generate_manifest,
proto::core::{DropBound, DropBoundType, TorrentialBound, TorrentialBoundType},
};
pub mod download;
pub struct DropServer {
server: TcpListener,
write_stream: Mutex<OwnedWriteHalf>,
waitmap: WaitMap<String, Response>,
waitmap: WaitMap<String, TorrentialBound>,
}
impl DropServer {
/**
Reads from the socket, and tries to parse it into a message,
and then updates the waitmap with the corresponding message ID
and content
*/
async fn recieve_loop(
myself: Arc<DropServer>,
buffered_reader: &mut BufReader<OwnedReadHalf>,
) -> Result<(), anyhow::Error> {
let mut length_buffer: [u8; 8] = [0; 8];
buffered_reader.read_exact(&mut length_buffer).await?;
let length = usize::from_le_bytes(length_buffer);
let mut buffer = vec![0; length];
buffered_reader.read_exact(&mut buffer).await?;
let message = TorrentialBound::parse_from_bytes(&buffer)
.expect("response didn't deserialize correctly");
match message.type_.unwrap() {
TorrentialBoundType::GENERATE_MANIFEST => {
spawn(async move { generate_manifest(myself.clone(), message).await });
}
_ => {
myself.waitmap.insert(message.message_id.clone(), message);
}
}
Ok(())
}
/**
Long-lived subroutine that never returns, runs the recieve_loop and reconnects
as necessary
*/
async fn recieve_subroutine(myself: Arc<DropServer>, read_stream: OwnedReadHalf) -> ! {
let mut buffered_reader = BufReader::new(read_stream);
loop {
let mut length_buffer: [u8; 8] = [0; 8];
buffered_reader
.read_exact(&mut length_buffer)
.await
.expect("failed to read from internal pipe");
if let Err(err) = Self::recieve_loop(myself.clone(), &mut buffered_reader).await {
warn!("server disconnected with error: {:?}", err);
let length = usize::from_le_bytes(length_buffer);
let mut buffer = Vec::with_capacity(length);
let (drop_stream, _) = myself
.server
.accept()
.await
.expect("failed to accept new listener");
let (read, mut write) = drop_stream.into_split();
buffered_reader
.read_exact(&mut buffer)
.await
.expect("failed to read from internal pipe");
info!("reconnected to drop server");
let message =
Response::parse_from_bytes(&buffer).expect("response didn't deserialize correctly");
myself.waitmap.insert(message.message_id.clone(), message);
let mut lock = myself.write_stream.lock().await;
mem::swap(&mut *lock, &mut write);
let mut new_reader = BufReader::new(read);
mem::swap(&mut buffered_reader, &mut new_reader);
}
}
}
async fn wait_for_message_id<T>(&self, message_id: &str) -> Result<T, anyhow::Error>
/**
Uses the waitmap to wait for a response from a query
*/
pub async fn wait_for_message_id<T>(&self, message_id: &str) -> Result<T, anyhow::Error>
where
T: protobuf::Message,
{
let message = self
.waitmap
.wait(message_id.clone())
.wait(message_id)
.await
.ok_or(anyhow!("no response returned for value"))?;
let message = message.value();
match message.type_.unwrap() {
crate::proto::core::ResponseType::ERROR => {
crate::proto::core::TorrentialBoundType::ERROR => {
return Err(anyhow!(String::from_utf8(message.data.clone()).unwrap()));
}
_ => {
@@ -69,26 +115,41 @@ impl DropServer {
}
}
async fn send_message<T>(&self, message: T) -> Result<(), anyhow::Error>
/**
Sends a message, returning the message ID
*/
pub async fn send_message<T>(
&self,
message_type: DropBoundType,
message: T,
message_id: Option<String>,
) -> Result<String, anyhow::Error>
where
T: protobuf::Message,
{
let mut query = DropBound::new();
query.message_id = message_id.unwrap_or(uuid::Uuid::new_v4().to_string());
query.type_ = EnumOrUnknown::new(message_type);
query.data = Vec::new();
message.write_to_vec(&mut query.data)?;
let mut buf = Vec::new();
message.write_to_vec(&mut buf)?;
query.write_to_vec(&mut buf)?;
{
let mut mutex_lock = self
.write_stream
.lock()
.expect("failed to lock send stream");
let mut mutex_lock = self.write_stream.lock().await;
mutex_lock.write(&buf.len().to_le_bytes()).await?;
mutex_lock.write_all(&buf).await?;
};
Ok(())
Ok(query.message_id)
}
}
/**
Spins up the TCP listener, and waits for the first client to connect
Also starts the recieve subroutine
*/
pub async fn create_drop_server() -> Result<Arc<DropServer>, anyhow::Error> {
let server = TcpListener::bind("127.0.0.1:33148").await?;
@@ -97,11 +158,14 @@ pub async fn create_drop_server() -> Result<Arc<DropServer>, anyhow::Error> {
let (read, write) = drop_stream.into_split();
let client = Arc::new(DropServer {
server,
write_stream: Mutex::new(write),
waitmap: WaitMap::new(),
});
spawn(DropServer::recieve_subroutine(client.clone(), read));
info!("created client subroutine");
Ok(client)
}