From 454fb19941bb03cc91dbe805e5289b68285a3ee4 Mon Sep 17 00:00:00 2001 From: DecDuck Date: Thu, 5 Feb 2026 22:36:50 +1100 Subject: [PATCH] feat: rest of droplet calls --- torrential/proto/core.proto | 47 ++++++++----- torrential/proto/droplet.proto | 28 +++++++- torrential/src/droplet/backend.rs | 102 +++++++++++++++++++++++++++++ torrential/src/droplet/cert.rs | 71 ++++++++++++++++++++ torrential/src/droplet/manifest.rs | 40 +++-------- torrential/src/droplet/mod.rs | 32 ++++++++- torrential/src/server/mod.rs | 30 ++++++++- 7 files changed, 299 insertions(+), 51 deletions(-) create mode 100644 torrential/src/droplet/backend.rs diff --git a/torrential/proto/core.proto b/torrential/proto/core.proto index 2277396f..0b1da663 100644 --- a/torrential/proto/core.proto +++ b/torrential/proto/core.proto @@ -1,30 +1,45 @@ syntax = "proto3"; enum TorrentialBoundType { - ERROR = 0; - SERVER_GAMES_RESPONSE = 1; - VERSION_RESPONSE = 2; - GENERATE_MANIFEST = 3; + ERROR = 0; + SERVER_GAMES_RESPONSE = 1; + VERSION_RESPONSE = 2; + + GENERATE_MANIFEST = 3; + GENERATE_ROOT_CA = 4; + GENERATE_CLIENT_CERT = 5; + + HAS_BACKEND_QUERY = 6; + LIST_FILES_QUERY = 7; + PEEK_FILE_QUERY = 8; } message TorrentialBound { - string message_id = 1; - TorrentialBoundType type = 2; - bytes data = 3; + string message_id = 1; + TorrentialBoundType type = 2; + bytes data = 3; } enum DropBoundType { - SERVER_GAMES_QUERY = 0; - VERSION_QUERY = 1; + SERVER_GAMES_QUERY = 0; + VERSION_QUERY = 1; - MANIFEST_PROGRESS = 2; - MANIFEST_LOG = 3; - MANIFEST_COMPLETE = 4; - MANIFEST_ERROR = 5; + RPC_ERROR = 2; + + MANIFEST_PROGRESS = 3; + MANIFEST_LOG = 4; + MANIFEST_COMPLETE = 5; + + ROOT_CA_COMPLETE = 6; + CLIENT_CERT_COMPLETE = 7; + + HAS_BACKEND_COMPLETE = 8; + LIST_FILES_COMPLETE = 9; + PEEK_FILE_COMPLETE = 10; } message DropBound { - string message_id = 1; - DropBoundType type = 2; - bytes data = 3; + string message_id = 1; + DropBoundType type = 2; + bytes data = 3; } diff --git a/torrential/proto/droplet.proto b/torrential/proto/droplet.proto index ddb4da99..357a31f8 100644 --- a/torrential/proto/droplet.proto +++ b/torrential/proto/droplet.proto @@ -1,5 +1,9 @@ syntax = "proto3"; +message RpcError { + string error = 1; +} + /// Certificates message RootCertQuery {} message RootCertResponse { @@ -32,6 +36,26 @@ message ManifestLog { message ManifestComplete { string manifest = 1; } -message ManifestError { - string error = 1; + +/// Backend tools +message HasBackendQuery { + string path = 1; +} +message HasBackendResponse { + bool result = 1; +} + +message ListFilesQuery { + string path = 1; +} +message ListFilesResponse { + repeated string files = 1; +} + +message PeekFileQuery { + string path = 1; + string filename = 2; +} +message PeekFileResponse { + uint64 size = 1; } diff --git a/torrential/src/droplet/backend.rs b/torrential/src/droplet/backend.rs new file mode 100644 index 00000000..d91ba401 --- /dev/null +++ b/torrential/src/droplet/backend.rs @@ -0,0 +1,102 @@ +use std::{ + path::Path, + sync::{Arc, LazyLock}, +}; + +use anyhow::anyhow; +use dashmap::DashMap; +use droplet_rs::versions::types::VersionBackend; +use protobuf::Message; + +use crate::{ + proto::{ + core::{DropBoundType, TorrentialBound}, + droplet::{ + HasBackendQuery, HasBackendResponse, ListFilesQuery, ListFilesResponse, PeekFileQuery, + PeekFileResponse, + }, + }, + server::DropServer, +}; + +pub async fn has_backend_rpc( + server: Arc, + message: TorrentialBound, +) -> Result<(), anyhow::Error> { + let has_backend = HasBackendQuery::parse_from_bytes(&message.data)?; + + let has_backend = { + let path = Path::new(&has_backend.path); + let backend_constructor = droplet_rs::versions::create_backend_constructor(path); + + backend_constructor.is_some() + }; + + let mut response = HasBackendResponse::new(); + response.result = has_backend; + + server + .send_message( + DropBoundType::HAS_BACKEND_COMPLETE, + response, + Some(message.message_id), + ) + .await?; + + Ok(()) +} + +fn create_backend(path: &String) -> Result, anyhow::Error> { + let backend_constructor = droplet_rs::versions::create_backend_constructor(Path::new(path)) + .ok_or(anyhow!("backend doesn't exist at path {}", path))?; + let backend = backend_constructor()?; + + Ok(backend) +} + +pub async fn list_files_rpc( + server: Arc, + message: TorrentialBound, +) -> Result<(), anyhow::Error> { + let query = ListFilesQuery::parse_from_bytes(&message.data)?; + + let mut backend = create_backend(&query.path)?; + + let files = backend.list_files().await?; + + let mut response = ListFilesResponse::new(); + response.files = files.into_iter().map(|v| v.relative_filename).collect(); + + server + .send_message( + DropBoundType::LIST_FILES_COMPLETE, + response, + Some(message.message_id), + ) + .await?; + + Ok(()) +} + +pub async fn peek_file_rpc( + server: Arc, + message: TorrentialBound, +) -> Result<(), anyhow::Error> { + let query = PeekFileQuery::parse_from_bytes(&message.data)?; + + let mut backend = create_backend(&query.path)?; + let file_peek = backend.peek_file(query.filename).await?; + + let mut response = PeekFileResponse::new(); + response.size = file_peek.size; + + server + .send_message( + DropBoundType::PEEK_FILE_COMPLETE, + response, + Some(message.message_id), + ) + .await?; + + Ok(()) +} diff --git a/torrential/src/droplet/cert.rs b/torrential/src/droplet/cert.rs index e69de29b..80db5f4b 100644 --- a/torrential/src/droplet/cert.rs +++ b/torrential/src/droplet/cert.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use protobuf::Message as _; + +use crate::{ + proto::{ + core::{DropBoundType, TorrentialBound}, + droplet::{ClientCertQuery, ClientCertResponse, RootCertResponse}, + }, + server::DropServer, +}; + +pub async fn generate_root_ca_rpc( + server: Arc, + message: TorrentialBound, +) -> Result<(), anyhow::Error> { + let manifest = droplet_rs::ssl::generate_root_ca()?; + let mut manifest = manifest.into_iter(); + + let mut root_ca = RootCertResponse::new(); + root_ca.cert = manifest + .next() + .ok_or(anyhow!("root ca generation missing cert"))?; + root_ca.priv_ = manifest + .next() + .ok_or(anyhow!("root ca generation missing priv"))?; + + server + .send_message( + DropBoundType::ROOT_CA_COMPLETE, + root_ca, + Some(message.message_id), + ) + .await?; + + Ok(()) +} + +pub async fn generate_client_cert_rpc( + server: Arc, + message: TorrentialBound, +) -> Result<(), anyhow::Error> { + let generate_message = ClientCertQuery::parse_from_bytes(&message.data)?; + + let cert = droplet_rs::ssl::generate_client_certificate( + generate_message.client_id, + generate_message.client_name, + generate_message.root_cert, + generate_message.root_priv, + )?; + let mut cert = cert.into_iter(); + + let mut client_cert = ClientCertResponse::new(); + client_cert.cert = cert + .next() + .ok_or(anyhow!("client cert generation missing cert"))?; + client_cert.priv_ = cert + .next() + .ok_or(anyhow!("client cert generation missing priv"))?; + + server + .send_message( + DropBoundType::CLIENT_CERT_COMPLETE, + client_cert, + Some(message.message_id), + ) + .await?; + + Ok(()) +} diff --git a/torrential/src/droplet/manifest.rs b/torrential/src/droplet/manifest.rs index b91fb959..688a9ce4 100644 --- a/torrential/src/droplet/manifest.rs +++ b/torrential/src/droplet/manifest.rs @@ -3,33 +3,35 @@ use std::{ sync::{Arc, LazyLock}, }; -use log::{info, warn}; +use log::info; use protobuf::Message; use serde_json::json; use tokio::{spawn, sync::Semaphore}; use crate::{ - droplet, proto::{ core::{DropBoundType, TorrentialBound}, - droplet::{ - GenerateManifest, ManifestComplete, ManifestError, ManifestLog, ManifestProgress, - }, + droplet::{GenerateManifest, ManifestComplete, ManifestLog, ManifestProgress}, }, server::DropServer, }; static READER_SEMAPHORE: LazyLock = LazyLock::new(|| { - let cores = num_cpus::get(); + let cores = std::env::var("READER_THREADS") + .ok() + .map(|v| str::parse::(&v).ok()) + .flatten() + .unwrap_or(num_cpus::get() / 2); + info!("using {} import threads", cores); Semaphore::new(cores) }); -async fn generate_manifest_raw( +pub async fn generate_manifest_rpc( server: Arc, message: TorrentialBound, ) -> Result<(), anyhow::Error> { let manifest_message = GenerateManifest::parse_from_bytes(&message.data)?; - + let manifest = droplet_rs::manifest::generate_manifest_rusty( &PathBuf::from(manifest_message.version_dir), |progress| { @@ -77,25 +79,3 @@ async fn generate_manifest_raw( Ok(()) } - -pub async fn generate_manifest(server: Arc, message: TorrentialBound) { - let message_id = message.message_id.clone(); - warn!("generating manifest..."); - let result = generate_manifest_raw(server.clone(), message).await; - info!("manifest generation exited"); - if let Err(err) = result { - warn!("manifest generation failed with err: {:?}", err); - let mut manifest_err = ManifestError::new(); - manifest_err.error = err.to_string(); - let _ = server - .send_message( - DropBoundType::MANIFEST_ERROR, - manifest_err, - Some(message_id), - ) - .await - .inspect_err(|err| { - warn!("failed to send manifest err: {err:?}"); - }); - } -} diff --git a/torrential/src/droplet/mod.rs b/torrential/src/droplet/mod.rs index e07a6a54..9b636aac 100644 --- a/torrential/src/droplet/mod.rs +++ b/torrential/src/droplet/mod.rs @@ -1,2 +1,32 @@ -pub mod manifest; +use std::sync::Arc; + +use log::{info, warn}; + +use crate::{proto::{core::{DropBoundType, TorrentialBound}, droplet::RpcError}, server::DropServer}; + pub mod cert; +pub mod manifest; +pub mod backend; + +pub async fn call_rpc(server: Arc, message: TorrentialBound, rpc: T) +where + T: AsyncFn(Arc, TorrentialBound) -> Result<(), anyhow::Error>, +{ + let message_id = message.message_id.clone(); + let result = rpc(server.clone(), message).await; + if let Err(err) = result { + warn!("manifest generation failed with err: {:?}", err); + let mut manifest_err = RpcError::new(); + manifest_err.error = err.to_string(); + let _ = server + .send_message( + DropBoundType::RPC_ERROR, + manifest_err, + Some(message_id), + ) + .await + .inspect_err(|err| { + warn!("failed to send manifest err: {err:?}"); + }); + } +} diff --git a/torrential/src/server/mod.rs b/torrential/src/server/mod.rs index ca583c51..fbe602ae 100644 --- a/torrential/src/server/mod.rs +++ b/torrential/src/server/mod.rs @@ -15,12 +15,23 @@ use tokio::{ use waitmap::WaitMap; use crate::{ - droplet::manifest::generate_manifest, + droplet::{ + backend::{has_backend_rpc, list_files_rpc, peek_file_rpc}, + call_rpc, + cert::generate_client_cert_rpc, + manifest::generate_manifest_rpc, + }, proto::core::{DropBound, DropBoundType, TorrentialBound, TorrentialBoundType}, }; pub mod download; +macro_rules! spawn_rpc { + ($myself:ident, $message:ident, $func_name:ident) => { + spawn(async move { call_rpc($myself.clone(), $message, $func_name).await }); + }; +} + pub struct DropServer { server: TcpListener, write_stream: Mutex, @@ -50,7 +61,22 @@ impl DropServer { match message.type_.unwrap() { TorrentialBoundType::GENERATE_MANIFEST => { - spawn(async move { generate_manifest(myself.clone(), message).await }); + spawn_rpc!(myself, message, generate_manifest_rpc); + } + TorrentialBoundType::GENERATE_ROOT_CA => { + spawn_rpc!(myself, message, generate_manifest_rpc); + } + TorrentialBoundType::GENERATE_CLIENT_CERT => { + spawn_rpc!(myself, message, generate_client_cert_rpc); + } + TorrentialBoundType::LIST_FILES_QUERY => { + spawn_rpc!(myself, message, list_files_rpc); + } + TorrentialBoundType::HAS_BACKEND_QUERY => { + spawn_rpc!(myself, message, has_backend_rpc); + } + TorrentialBoundType::PEEK_FILE_QUERY => { + spawn_rpc!(myself, message, peek_file_rpc); } _ => { myself.waitmap.insert(message.message_id.clone(), message);