diff --git a/torrential/Cargo.lock b/torrential/Cargo.lock index fa905fd5..c9de6188 100644 --- a/torrential/Cargo.lock +++ b/torrential/Cargo.lock @@ -155,6 +155,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", + "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -200,6 +201,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "base64" version = "0.22.1" @@ -544,8 +556,9 @@ dependencies = [ [[package]] name = "droplet-rs" -version = "0.12.2" -source = "git+https://github.com/Drop-OSS/droplet-rs.git#05f7027b362fcc72b7cc621df8b5b0850b6cf082" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef23d187f7153bc3ce1c6a5440be1206e2aaf42b153825ddf2f8151f278539c5" dependencies = [ "anyhow", "async-trait", @@ -2108,6 +2121,7 @@ dependencies = [ "file_open_limit", "futures-util", "log", + "num_cpus", "pin-project-lite", "protobuf", "protobuf-codegen", @@ -2120,6 +2134,7 @@ dependencies = [ "tokio", "tokio-util", "url", + "uuid", "waitmap", ] @@ -2233,9 +2248,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" dependencies = [ "getrandom 0.3.4", "js-sys", diff --git a/torrential/Cargo.toml b/torrential/Cargo.toml index 51b90a5f..d2603d75 100644 --- a/torrential/Cargo.toml +++ b/torrential/Cargo.toml @@ -12,7 +12,7 @@ name = "torrential" path = "src/main.rs" [dependencies] -axum = "0.8.7" +axum = { version = "0.8.7", features = ["macros"] } log = "0.4.28" reqwest = { version = "0.12.24", default-features = false, features = [ "json", @@ -23,7 +23,7 @@ simple_logger = { version = "5.1.0", default-features = false, features = [ "colors", ] } tokio = { version = "*", features = ["rt-multi-thread", "sync"] } -droplet-rs = { git="https://github.com/Drop-OSS/droplet-rs.git" } +droplet-rs = "0.15.1" dashmap = "6.1.0" anyhow = "1.0.100" serde_json = "1.0.145" @@ -38,6 +38,8 @@ file_open_limit = "0.0.5" pin-project-lite = "0.2.16" protobuf = "3.7.2" waitmap = "1.1.0" +uuid = { version = "1.20.0", features = ["v4"] } +num_cpus = "1.17.0" [lints.clippy] pedantic = { level = "warn", priority = -1 } diff --git a/torrential/proto/core.proto b/torrential/proto/core.proto index b2348a2c..2277396f 100644 --- a/torrential/proto/core.proto +++ b/torrential/proto/core.proto @@ -1,24 +1,30 @@ syntax = "proto3"; -enum ResponseType { +enum TorrentialBoundType { ERROR = 0; SERVER_GAMES_RESPONSE = 1; VERSION_RESPONSE = 2; + GENERATE_MANIFEST = 3; } -message Response { +message TorrentialBound { string message_id = 1; - ResponseType type = 2; + TorrentialBoundType type = 2; bytes data = 3; } -enum QueryType { +enum DropBoundType { SERVER_GAMES_QUERY = 0; VERSION_QUERY = 1; + + MANIFEST_PROGRESS = 2; + MANIFEST_LOG = 3; + MANIFEST_COMPLETE = 4; + MANIFEST_ERROR = 5; } -message Query { +message DropBound { string message_id = 1; - QueryType type = 2; + DropBoundType type = 2; bytes data = 3; } diff --git a/torrential/proto/droplet.proto b/torrential/proto/droplet.proto new file mode 100644 index 00000000..ddb4da99 --- /dev/null +++ b/torrential/proto/droplet.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +/// Certificates +message RootCertQuery {} +message RootCertResponse { + string cert = 1; + string priv = 2; +} + +message ClientCertQuery { + string client_id = 1; + string client_name = 2; + string root_cert = 3; + string root_priv = 4; +} +message ClientCertResponse { + string cert = 1; + string priv = 2; +} + +/// Manifest generation +message GenerateManifest { + string version_dir = 1; +} + +message ManifestProgress { + float progress = 1; +} +message ManifestLog { + string log_line = 1; +} +message ManifestComplete { + string manifest = 1; +} +message ManifestError { + string error = 1; +} diff --git a/torrential/src/conversions.rs b/torrential/src/conversions.rs index b606b18e..622168da 100644 --- a/torrential/src/conversions.rs +++ b/torrential/src/conversions.rs @@ -1,4 +1,4 @@ -use crate::proto::{self, version::version_response::Manifest}; +use crate::proto::version::version_response::Manifest; fn fixed_length(v: Vec) -> [T; N] { v.try_into() diff --git a/torrential/src/download.rs b/torrential/src/downloads/download.rs similarity index 100% rename from torrential/src/download.rs rename to torrential/src/downloads/download.rs diff --git a/torrential/src/handlers.rs b/torrential/src/downloads/handlers.rs similarity index 93% rename from torrential/src/handlers.rs rename to torrential/src/downloads/handlers.rs index c342f416..ad597bf1 100644 --- a/torrential/src/handlers.rs +++ b/torrential/src/downloads/handlers.rs @@ -1,9 +1,6 @@ use std::{ collections::HashMap, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, + sync::Arc, task::Poll, }; @@ -24,7 +21,7 @@ use tokio_util::io::ReaderStream; use crate::{server::download::fetch_instance_games, state::AppState}; -pub async fn healthcheck(State(state): State>) -> StatusCode { +pub async fn healthcheck() -> StatusCode { StatusCode::OK } @@ -60,7 +57,7 @@ const ZERO: [u8; 1024] = [0u8; _]; impl AsyncRead for SpeedtestStream { fn poll_read( mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { if self.remaining > 0 { diff --git a/torrential/src/downloads/mod.rs b/torrential/src/downloads/mod.rs new file mode 100644 index 00000000..b9959744 --- /dev/null +++ b/torrential/src/downloads/mod.rs @@ -0,0 +1,3 @@ +pub mod handlers; +pub mod serve; +pub mod download; diff --git a/torrential/src/serve.rs b/torrential/src/downloads/serve.rs similarity index 96% rename from torrential/src/serve.rs rename to torrential/src/downloads/serve.rs index 3a22b370..d700a75f 100644 --- a/torrential/src/serve.rs +++ b/torrential/src/downloads/serve.rs @@ -1,7 +1,5 @@ use std::{ - cell::{LazyCell, OnceCell}, io::Error, - rc::Rc, sync::{Arc, LazyLock}, }; @@ -10,7 +8,7 @@ use axum::{ body::Body, extract::{Path, State}, http::HeaderMap, - response::{AppendHeaders, IntoResponse}, + response::IntoResponse, }; use bytes::Bytes; use dashmap::{DashMap, mapref::one::RefMut}; @@ -26,7 +24,8 @@ use tokio::sync::{Semaphore, SemaphorePermit}; use tokio_util::io::ReaderStream; use crate::{ - DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, download::create_download_context, state::AppState, + DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, downloads::download::create_download_context, + state::AppState, }; type Aes128Ctr64LE = ctr::Ctr64LE; diff --git a/torrential/src/droplet/cert.rs b/torrential/src/droplet/cert.rs new file mode 100644 index 00000000..e69de29b diff --git a/torrential/src/droplet/manifest.rs b/torrential/src/droplet/manifest.rs new file mode 100644 index 00000000..6af4f0a4 --- /dev/null +++ b/torrential/src/droplet/manifest.rs @@ -0,0 +1,105 @@ +use std::{ + path::PathBuf, + sync::{Arc, LazyLock}, +}; + +use log::{info, warn}; +use protobuf::Message; +use serde_json::json; +use tokio::{spawn, sync::Semaphore}; + +use crate::{ + droplet, + proto::{ + core::{DropBoundType, TorrentialBound}, + droplet::{ + GenerateManifest, ManifestComplete, ManifestError, ManifestLog, ManifestProgress, + }, + }, + server::DropServer, +}; + +static READER_SEMAPHORE: LazyLock = LazyLock::new(|| { + let cores = num_cpus::get(); + Semaphore::new(cores) +}); + +async fn generate_manifest_raw( + server: Arc, + message: TorrentialBound, +) -> Result<(), anyhow::Error> { + let manifest_message = GenerateManifest::parse_from_bytes(&message.data)?; + info!( + "seven zip install: {}", + *droplet_rs::versions::backends::SEVEN_ZIP_INSTALLED + ); + + let manifest = droplet_rs::manifest::generate_manifest_rusty( + &PathBuf::from(manifest_message.version_dir), + |progress| { + let mut progress_message = ManifestProgress::new(); + progress_message.progress = progress; + + let server = server.clone(); + let message_id = message.message_id.clone(); + spawn(async move { + let _ = server + .send_message( + DropBoundType::MANIFEST_PROGRESS, + progress_message, + Some(message_id), + ) + .await; + }); + }, + |log_line| { + let mut progress_log = ManifestLog::new(); + progress_log.log_line = log_line; + + let server = server.clone(); + let message_id = message.message_id.clone(); + spawn(async move { + let _ = server + .send_message(DropBoundType::MANIFEST_LOG, progress_log, Some(message_id)) + .await; + }); + }, + Some(&READER_SEMAPHORE), + ) + .await?; + + let mut manifest_complete = ManifestComplete::new(); + manifest_complete.manifest = json!(manifest).to_string(); + + server + .send_message( + DropBoundType::MANIFEST_COMPLETE, + manifest_complete, + Some(message.message_id), + ) + .await?; + + Ok(()) +} + +pub async fn generate_manifest(server: Arc, message: TorrentialBound) { + let message_id = message.message_id.clone(); + warn!("generating manifest..."); + let result = generate_manifest_raw(server.clone(), message).await; + info!("manifest generation exited"); + if let Err(err) = result { + warn!("manifest generation failed with err: {:?}", err); + let mut manifest_err = ManifestError::new(); + manifest_err.error = err.to_string(); + let _ = server + .send_message( + DropBoundType::MANIFEST_ERROR, + manifest_err, + Some(message_id), + ) + .await + .inspect_err(|err| { + warn!("failed to send manifest err: {err:?}"); + }); + } +} diff --git a/torrential/src/droplet/mod.rs b/torrential/src/droplet/mod.rs new file mode 100644 index 00000000..e07a6a54 --- /dev/null +++ b/torrential/src/droplet/mod.rs @@ -0,0 +1,2 @@ +pub mod manifest; +pub mod cert; diff --git a/torrential/src/lib.rs b/torrential/src/lib.rs index 8cc3309f..3fa089f6 100644 --- a/torrential/src/lib.rs +++ b/torrential/src/lib.rs @@ -1,13 +1,12 @@ use tokio::sync::Semaphore; -pub mod download; -pub mod serve; -pub mod handlers; +pub mod downloads; pub mod state; pub mod util; pub mod proto; pub mod conversions; pub mod server; +pub mod droplet; -pub use download::DownloadContext; +pub use downloads::download::DownloadContext; static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1); diff --git a/torrential/src/main.rs b/torrential/src/main.rs index 025d6ee4..bfdf92e4 100644 --- a/torrential/src/main.rs +++ b/torrential/src/main.rs @@ -12,7 +12,11 @@ use dashmap::DashMap; use log::info; use simple_logger::SimpleLogger; use tokio::{runtime::Handle, spawn, time}; -use torrential::{handlers, serve, server::create_drop_server, state::AppState}; +use torrential::{ + downloads::{handlers, serve}, + server::create_drop_server, + state::AppState, +}; const CONTEXT_TTL: u64 = 10 * 60; @@ -28,7 +32,9 @@ async fn main() { let metrics = Handle::current().metrics(); info!("using {} threads", metrics.num_workers()); - let server = create_drop_server().await.expect("failed to connect to drop server"); + let server = create_drop_server() + .await + .expect("failed to connect to drop server"); let shared_state = Arc::new(AppState { context_cache: DashMap::new(), diff --git a/torrential/src/server/download.rs b/torrential/src/server/download.rs index 5a366455..6c71aaea 100644 --- a/torrential/src/server/download.rs +++ b/torrential/src/server/download.rs @@ -1,15 +1,37 @@ -use crate::{proto::{manifest::server_games_response::SkeletonGame, version::VersionResponse}, state::{AppState}, util::ErrorOption}; +use crate::{ + proto::{ + core::DropBoundType, + manifest::{ServerGamesQuery, ServerGamesResponse, server_games_response::SkeletonGame}, + version::{VersionQuery, VersionResponse}, + }, + state::AppState, + util::ErrorOption, +}; pub async fn fetch_version_data( app_state: &AppState, - game_id: String, + _game_id: String, version_id: String, ) -> Result { - unreachable!(); + let mut query = VersionQuery::new(); + query.version_id = version_id; + let message_id = app_state + .server + .send_message(DropBoundType::VERSION_QUERY, query, None) + .await?; + + let response: VersionResponse = app_state.server.wait_for_message_id(&message_id).await?; + + Ok(response) } -pub async fn fetch_instance_games( - app_state: &AppState, -) -> Result, ErrorOption> { - unreachable!() +pub async fn fetch_instance_games(app_state: &AppState) -> Result, ErrorOption> { + let message_id = app_state + .server + .send_message(DropBoundType::SERVER_GAMES_QUERY, ServerGamesQuery::new(), None) + .await?; + + let response: ServerGamesResponse = app_state.server.wait_for_message_id(&message_id).await?; + + return Ok(response.games); } diff --git a/torrential/src/server/mod.rs b/torrential/src/server/mod.rs index d59bc8c0..ca583c51 100644 --- a/torrential/src/server/mod.rs +++ b/torrential/src/server/mod.rs @@ -1,7 +1,8 @@ -use std::sync::{Arc, Mutex}; +use std::{mem, sync::Arc}; use anyhow::anyhow; -use protobuf::Message; +use log::{info, warn}; +use protobuf::{EnumOrUnknown, Message}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt as _, BufReader}, net::{ @@ -9,57 +10,102 @@ use tokio::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, }, spawn, + sync::Mutex, }; use waitmap::WaitMap; -use crate::proto::core::Response; +use crate::{ + droplet::manifest::generate_manifest, + proto::core::{DropBound, DropBoundType, TorrentialBound, TorrentialBoundType}, +}; pub mod download; pub struct DropServer { + server: TcpListener, write_stream: Mutex, - waitmap: WaitMap, + waitmap: WaitMap, } impl DropServer { + /** + Reads from the socket, and tries to parse it into a message, + and then updates the waitmap with the corresponding message ID + and content + */ + async fn recieve_loop( + myself: Arc, + buffered_reader: &mut BufReader, + ) -> Result<(), anyhow::Error> { + let mut length_buffer: [u8; 8] = [0; 8]; + buffered_reader.read_exact(&mut length_buffer).await?; + + let length = usize::from_le_bytes(length_buffer); + let mut buffer = vec![0; length]; + + buffered_reader.read_exact(&mut buffer).await?; + + let message = TorrentialBound::parse_from_bytes(&buffer) + .expect("response didn't deserialize correctly"); + + match message.type_.unwrap() { + TorrentialBoundType::GENERATE_MANIFEST => { + spawn(async move { generate_manifest(myself.clone(), message).await }); + } + _ => { + myself.waitmap.insert(message.message_id.clone(), message); + } + } + + Ok(()) + } + + /** + Long-lived subroutine that never returns, runs the recieve_loop and reconnects + as necessary + */ async fn recieve_subroutine(myself: Arc, read_stream: OwnedReadHalf) -> ! { let mut buffered_reader = BufReader::new(read_stream); loop { - let mut length_buffer: [u8; 8] = [0; 8]; - buffered_reader - .read_exact(&mut length_buffer) - .await - .expect("failed to read from internal pipe"); + if let Err(err) = Self::recieve_loop(myself.clone(), &mut buffered_reader).await { + warn!("server disconnected with error: {:?}", err); - let length = usize::from_le_bytes(length_buffer); - let mut buffer = Vec::with_capacity(length); + let (drop_stream, _) = myself + .server + .accept() + .await + .expect("failed to accept new listener"); + let (read, mut write) = drop_stream.into_split(); - buffered_reader - .read_exact(&mut buffer) - .await - .expect("failed to read from internal pipe"); + info!("reconnected to drop server"); - let message = - Response::parse_from_bytes(&buffer).expect("response didn't deserialize correctly"); - myself.waitmap.insert(message.message_id.clone(), message); + let mut lock = myself.write_stream.lock().await; + mem::swap(&mut *lock, &mut write); + + let mut new_reader = BufReader::new(read); + mem::swap(&mut buffered_reader, &mut new_reader); + } } } - async fn wait_for_message_id(&self, message_id: &str) -> Result + /** + Uses the waitmap to wait for a response from a query + */ + pub async fn wait_for_message_id(&self, message_id: &str) -> Result where T: protobuf::Message, { let message = self .waitmap - .wait(message_id.clone()) + .wait(message_id) .await .ok_or(anyhow!("no response returned for value"))?; let message = message.value(); match message.type_.unwrap() { - crate::proto::core::ResponseType::ERROR => { + crate::proto::core::TorrentialBoundType::ERROR => { return Err(anyhow!(String::from_utf8(message.data.clone()).unwrap())); } _ => { @@ -69,26 +115,41 @@ impl DropServer { } } - async fn send_message(&self, message: T) -> Result<(), anyhow::Error> + /** + Sends a message, returning the message ID + */ + pub async fn send_message( + &self, + message_type: DropBoundType, + message: T, + message_id: Option, + ) -> Result where T: protobuf::Message, { + let mut query = DropBound::new(); + query.message_id = message_id.unwrap_or(uuid::Uuid::new_v4().to_string()); + query.type_ = EnumOrUnknown::new(message_type); + query.data = Vec::new(); + message.write_to_vec(&mut query.data)?; + let mut buf = Vec::new(); - message.write_to_vec(&mut buf)?; + query.write_to_vec(&mut buf)?; { - let mut mutex_lock = self - .write_stream - .lock() - .expect("failed to lock send stream"); + let mut mutex_lock = self.write_stream.lock().await; mutex_lock.write(&buf.len().to_le_bytes()).await?; mutex_lock.write_all(&buf).await?; }; - Ok(()) + Ok(query.message_id) } } +/** +Spins up the TCP listener, and waits for the first client to connect +Also starts the recieve subroutine +*/ pub async fn create_drop_server() -> Result, anyhow::Error> { let server = TcpListener::bind("127.0.0.1:33148").await?; @@ -97,11 +158,14 @@ pub async fn create_drop_server() -> Result, anyhow::Error> { let (read, write) = drop_stream.into_split(); let client = Arc::new(DropServer { + server, write_stream: Mutex::new(write), waitmap: WaitMap::new(), }); spawn(DropServer::recieve_subroutine(client.clone(), read)); + info!("created client subroutine"); + Ok(client) }