From 62ff8be3c0fa696b71a3fced0887412a5f5cadbc Mon Sep 17 00:00:00 2001 From: DecDuck Date: Sat, 13 Dec 2025 21:14:59 +1100 Subject: [PATCH] feat: multithreaded manifest generation --- libraries/droplet/Cargo.lock | 116 ++++++++++++++- libraries/droplet/Cargo.toml | 5 +- libraries/droplet/src/lib.rs | 1 + libraries/droplet/src/main.rs | 19 +++ libraries/droplet/src/manifest.rs | 159 +++++++++++++-------- libraries/droplet/src/versions/backends.rs | 4 +- 6 files changed, 242 insertions(+), 62 deletions(-) create mode 100644 libraries/droplet/src/main.rs diff --git a/libraries/droplet/Cargo.lock b/libraries/droplet/Cargo.lock index fbd6236a..3740dca1 100644 --- a/libraries/droplet/Cargo.lock +++ b/libraries/droplet/Cargo.lock @@ -219,11 +219,12 @@ dependencies = [ [[package]] name = "droplet-rs" -version = "0.10.0" +version = "0.11.0" dependencies = [ "anyhow", "async-trait", "dyn-clone", + "futures", "hex", "humansize", "rcgen", @@ -245,6 +246,95 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -434,6 +524,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "powerfmt" version = "0.2.0" @@ -588,6 +684,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "syn" version = "2.0.100" @@ -692,9 +794,21 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", + "tokio-macros", "windows-sys 0.61.2", ] +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "typenum" version = "1.19.0" diff --git a/libraries/droplet/Cargo.toml b/libraries/droplet/Cargo.toml index 1c70994f..5447a91d 100644 --- a/libraries/droplet/Cargo.toml +++ b/libraries/droplet/Cargo.toml @@ -2,7 +2,7 @@ edition = "2021" authors = ["Drop-OSS"] name = "droplet-rs" -version = "0.10.0" +version = "0.11.0" license = "AGPL-3.0-only" description = "Droplet is a `napi.rs` Rust/Node.js package full of high-performance and low-level utils for Drop" @@ -13,7 +13,7 @@ time = "0.3.41" webpki = "0.22.4" ring = "0.17.14" dyn-clone = "1.0.20" -tokio = { version = "^1.48.0", features = ["process", "fs", "io-util"] } +tokio = { version = "^1.48.0", features = ["process", "fs", "io-util", "sync", "macros", "rt-multi-thread"] } anyhow = "1.0.100" async-trait = "0.1.89" serde = { version = "1.0.228", features = ["derive"] } @@ -21,6 +21,7 @@ serde_json = "1.0.145" humansize = "2.1.3" uuid = { version = "1.19.0", features = ["v4"] } sha2 = "0.10.9" +futures = "0.3.31" [dependencies.x509-parser] version = "0.17.0" diff --git a/libraries/droplet/src/lib.rs b/libraries/droplet/src/lib.rs index 99364c0d..4a873503 100644 --- a/libraries/droplet/src/lib.rs +++ b/libraries/droplet/src/lib.rs @@ -1,4 +1,5 @@ #![deny(clippy::all)] +#![feature(impl_trait_in_bindings)] pub mod file_utils; pub mod ssl; diff --git a/libraries/droplet/src/main.rs b/libraries/droplet/src/main.rs new file mode 100644 index 00000000..724618b9 --- /dev/null +++ b/libraries/droplet/src/main.rs @@ -0,0 +1,19 @@ +use std::path::PathBuf; + +use droplet_rs::manifest::generate_manifest_rusty; +use tokio::runtime::{Handle, Runtime}; + +#[tokio::main] +pub async fn main() { + let metrics = Handle::current().metrics(); + println!("using {} workers", metrics.num_workers()); + generate_manifest_rusty( + &PathBuf::from("/home/decduck/.local/share/Steam/steamapps/common/Savage Resurrection"), + |_| {}, + |message| { + println!("{}", message); + }, + ) + .await + .unwrap(); +} diff --git a/libraries/droplet/src/manifest.rs b/libraries/droplet/src/manifest.rs index 57d952c1..ba759d25 100644 --- a/libraries/droplet/src/manifest.rs +++ b/libraries/droplet/src/manifest.rs @@ -1,14 +1,23 @@ -use std::{collections::HashMap, path::Path}; +use std::{ + collections::HashMap, + future::Future, + path::Path, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; -use anyhow::anyhow; +use anyhow::{anyhow, Error}; +use futures::{stream::FuturesUnordered, StreamExt}; use hex::ToHex as _; -use humansize::{BINARY, format_size}; +use humansize::{format_size, BINARY}; use serde::Serialize; use serde_json::json; use sha2::{Digest as _, Sha256}; -use tokio::io::AsyncReadExt as _; +use tokio::{io::AsyncReadExt as _, join, sync::Mutex}; -#[derive(Serialize)] +#[derive(Serialize, Clone)] struct FileEntry { filename: String, start: usize, @@ -16,7 +25,7 @@ struct FileEntry { permissions: u32, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] struct ChunkData { files: Vec, checksum: String, @@ -24,9 +33,9 @@ struct ChunkData { #[derive(Serialize)] struct Manifest { - version: String, - chunks: HashMap, - size: u64, + version: String, + chunks: HashMap, + size: u64, } const CHUNK_SIZE: u64 = 1024 * 1024 * 64; @@ -42,7 +51,7 @@ pub async fn generate_manifest_rusty (), V: Fn(f32) -> ()>( let mut backend = create_backend_constructor(dir).ok_or(anyhow!("Could not create backend for path."))?()?; - let required_single_file = true; //backend.require_whole_files(); + let required_single_file = backend.require_whole_files(); let files = backend.list_files().await?; // Filepath to chunk data @@ -123,71 +132,107 @@ pub async fn generate_manifest_rusty (), V: Fn(f32) -> ()>( chunks.len() )); - let mut manifest: HashMap = HashMap::new(); - let mut total_manifest_length = 0; + let manifest: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let total_manifest_length = Arc::new(AtomicU64::new(0)); - let mut read_buf = vec![0; 1024 * 1024 * 64]; + let backend = Arc::new(Mutex::new(backend)); - let chunk_len = chunks.len(); + let futures: FuturesUnordered>> = + FuturesUnordered::new(); + let (send_log, mut recieve_log) = tokio::sync::mpsc::channel(16); for (index, chunk) in chunks.into_iter().enumerate() { - let uuid = uuid::Uuid::new_v4().to_string(); - let mut hasher = Sha256::new(); + let send_log = send_log.clone(); + let backend = backend.clone(); + let total_manifest_length = total_manifest_length.clone(); + let manifest = manifest.clone(); + futures.push((async move || -> Result<(), Error> { + let mut read_buf = vec![0; 1024 * 1024 * 64]; - let mut chunk_data = ChunkData { - files: Vec::new(), - checksum: String::new(), - }; + let uuid = uuid::Uuid::new_v4().to_string(); + let mut hasher = Sha256::new(); - let mut chunk_length = 0; + let mut chunk_data = ChunkData { + files: Vec::new(), + checksum: String::new(), + }; - for (file, start, length) in chunk { - log_sfn(format!( - "reading {} from {} to {}, {}", - file.relative_filename, - start, - start + length, - format_size(length, BINARY) - )); - let mut reader = backend.reader(&file, start, start + length).await?; + let mut chunk_length = 0; - loop { - let amount = reader.read(&mut read_buf).await?; - if amount == 0 { - break; + for (file, start, length) in chunk { + /* + send_log + .send(format!( + "reading {} from {} to {}, {}", + file.relative_filename, + start, + start + length, + format_size(length, BINARY) + )) + .await?; + */ + let mut reader = { + let mut backend_lock = backend.lock().await; + let reader = backend_lock.reader(&file, start, start + length).await?; + reader + }; + + loop { + let amount = reader.read(&mut read_buf).await?; + if amount == 0 { + break; + } + hasher.update(&read_buf[0..amount]); } - hasher.update(&read_buf[0..amount]); + + chunk_length += length; + + chunk_data.files.push(FileEntry { + filename: file.relative_filename, + start: start.try_into().unwrap(), + length: length.try_into().unwrap(), + permissions: file.permission, + }); } - chunk_length += length; + send_log + .send(format!( + "created chunk of size {} (index {})", + format_size(chunk_length, BINARY), + index + )) + .await?; - chunk_data.files.push(FileEntry { - filename: file.relative_filename, - start: start.try_into().unwrap(), - length: length.try_into().unwrap(), - permissions: file.permission, - }); - } + total_manifest_length.fetch_add(chunk_length, Ordering::Relaxed); - log_sfn(format!( - "created chunk of size {} ({}/{})", - format_size(chunk_length, BINARY), - index, - chunk_len - )); - total_manifest_length += chunk_length; + let hash: String = hasher.finalize().encode_hex(); + chunk_data.checksum = hash; + { + let mut manifest_lock = manifest.lock().await; + manifest_lock.insert(uuid, chunk_data); + }; - let hash: String = hasher.finalize().encode_hex(); - chunk_data.checksum = hash; - manifest.insert(uuid, chunk_data); - - let progress: f32 = (index as f32 / chunk_len as f32) * 100.0f32; - progress_sfn(progress); + Ok(()) + })()); } + drop(send_log); + join!( + async move { + while let Some(message) = recieve_log.recv().await { + log_sfn(message); + } + }, + futures.collect::>>() + ); + + let manifest = manifest.lock().await; + let manifest = manifest.clone(); + let manifest_size = size_of_val(&manifest); + println!("manifest uses {} bytes", manifest_size); Ok(json!(Manifest { version: "2".to_string(), chunks: manifest, - size: total_manifest_length + size: total_manifest_length.fetch_add(0, Ordering::Relaxed) }) .to_string()) } diff --git a/libraries/droplet/src/versions/backends.rs b/libraries/droplet/src/versions/backends.rs index cf1e408d..fa469db8 100644 --- a/libraries/droplet/src/versions/backends.rs +++ b/libraries/droplet/src/versions/backends.rs @@ -195,8 +195,8 @@ impl VersionBackend for ZipVersionBackend { async fn reader( &mut self, file: &VersionFile, - start: u64, - end: u64, + _start: u64, + _end: u64, ) -> anyhow::Result> { let mut read_command = Command::new("7z"); read_command.args(vec!["e", "-so", &self.path, &file.relative_filename]);