feat: multithreaded manifest generation
This commit is contained in:
Generated
+115
-1
@@ -219,11 +219,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "droplet-rs"
|
||||
version = "0.10.0"
|
||||
version = "0.11.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"dyn-clone",
|
||||
"futures",
|
||||
"hex",
|
||||
"humansize",
|
||||
"rcgen",
|
||||
@@ -245,6 +246,95 @@ version = "1.0.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-core"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
|
||||
|
||||
[[package]]
|
||||
name = "futures-task"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.14.7"
|
||||
@@ -434,6 +524,12 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
|
||||
|
||||
[[package]]
|
||||
name = "pin-utils"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "powerfmt"
|
||||
version = "0.2.0"
|
||||
@@ -588,6 +684,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589"
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.100"
|
||||
@@ -692,9 +794,21 @@ dependencies = [
|
||||
"mio",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "typenum"
|
||||
version = "1.19.0"
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
edition = "2021"
|
||||
authors = ["Drop-OSS"]
|
||||
name = "droplet-rs"
|
||||
version = "0.10.0"
|
||||
version = "0.11.0"
|
||||
license = "AGPL-3.0-only"
|
||||
description = "Droplet is a `napi.rs` Rust/Node.js package full of high-performance and low-level utils for Drop"
|
||||
|
||||
@@ -13,7 +13,7 @@ time = "0.3.41"
|
||||
webpki = "0.22.4"
|
||||
ring = "0.17.14"
|
||||
dyn-clone = "1.0.20"
|
||||
tokio = { version = "^1.48.0", features = ["process", "fs", "io-util"] }
|
||||
tokio = { version = "^1.48.0", features = ["process", "fs", "io-util", "sync", "macros", "rt-multi-thread"] }
|
||||
anyhow = "1.0.100"
|
||||
async-trait = "0.1.89"
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
@@ -21,6 +21,7 @@ serde_json = "1.0.145"
|
||||
humansize = "2.1.3"
|
||||
uuid = { version = "1.19.0", features = ["v4"] }
|
||||
sha2 = "0.10.9"
|
||||
futures = "0.3.31"
|
||||
|
||||
[dependencies.x509-parser]
|
||||
version = "0.17.0"
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#![deny(clippy::all)]
|
||||
#![feature(impl_trait_in_bindings)]
|
||||
|
||||
pub mod file_utils;
|
||||
pub mod ssl;
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use droplet_rs::manifest::generate_manifest_rusty;
|
||||
use tokio::runtime::{Handle, Runtime};
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() {
|
||||
let metrics = Handle::current().metrics();
|
||||
println!("using {} workers", metrics.num_workers());
|
||||
generate_manifest_rusty(
|
||||
&PathBuf::from("/home/decduck/.local/share/Steam/steamapps/common/Savage Resurrection"),
|
||||
|_| {},
|
||||
|message| {
|
||||
println!("{}", message);
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -1,14 +1,23 @@
|
||||
use std::{collections::HashMap, path::Path};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
future::Future,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use anyhow::{anyhow, Error};
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use hex::ToHex as _;
|
||||
use humansize::{BINARY, format_size};
|
||||
use humansize::{format_size, BINARY};
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use sha2::{Digest as _, Sha256};
|
||||
use tokio::io::AsyncReadExt as _;
|
||||
use tokio::{io::AsyncReadExt as _, join, sync::Mutex};
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[derive(Serialize, Clone)]
|
||||
struct FileEntry {
|
||||
filename: String,
|
||||
start: usize,
|
||||
@@ -16,7 +25,7 @@ struct FileEntry {
|
||||
permissions: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[derive(Serialize, Clone)]
|
||||
struct ChunkData {
|
||||
files: Vec<FileEntry>,
|
||||
checksum: String,
|
||||
@@ -24,9 +33,9 @@ struct ChunkData {
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Manifest {
|
||||
version: String,
|
||||
chunks: HashMap<String, ChunkData>,
|
||||
size: u64,
|
||||
version: String,
|
||||
chunks: HashMap<String, ChunkData>,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
const CHUNK_SIZE: u64 = 1024 * 1024 * 64;
|
||||
@@ -42,7 +51,7 @@ pub async fn generate_manifest_rusty<T: Fn(String) -> (), V: Fn(f32) -> ()>(
|
||||
let mut backend =
|
||||
create_backend_constructor(dir).ok_or(anyhow!("Could not create backend for path."))?()?;
|
||||
|
||||
let required_single_file = true; //backend.require_whole_files();
|
||||
let required_single_file = backend.require_whole_files();
|
||||
|
||||
let files = backend.list_files().await?;
|
||||
// Filepath to chunk data
|
||||
@@ -123,71 +132,107 @@ pub async fn generate_manifest_rusty<T: Fn(String) -> (), V: Fn(f32) -> ()>(
|
||||
chunks.len()
|
||||
));
|
||||
|
||||
let mut manifest: HashMap<String, ChunkData> = HashMap::new();
|
||||
let mut total_manifest_length = 0;
|
||||
let manifest: Arc<Mutex<HashMap<String, ChunkData>>> = Arc::new(Mutex::new(HashMap::new()));
|
||||
let total_manifest_length = Arc::new(AtomicU64::new(0));
|
||||
|
||||
let mut read_buf = vec![0; 1024 * 1024 * 64];
|
||||
let backend = Arc::new(Mutex::new(backend));
|
||||
|
||||
let chunk_len = chunks.len();
|
||||
let futures: FuturesUnordered<impl Future<Output = Result<(), Error>>> =
|
||||
FuturesUnordered::new();
|
||||
let (send_log, mut recieve_log) = tokio::sync::mpsc::channel(16);
|
||||
for (index, chunk) in chunks.into_iter().enumerate() {
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let mut hasher = Sha256::new();
|
||||
let send_log = send_log.clone();
|
||||
let backend = backend.clone();
|
||||
let total_manifest_length = total_manifest_length.clone();
|
||||
let manifest = manifest.clone();
|
||||
futures.push((async move || -> Result<(), Error> {
|
||||
let mut read_buf = vec![0; 1024 * 1024 * 64];
|
||||
|
||||
let mut chunk_data = ChunkData {
|
||||
files: Vec::new(),
|
||||
checksum: String::new(),
|
||||
};
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let mut hasher = Sha256::new();
|
||||
|
||||
let mut chunk_length = 0;
|
||||
let mut chunk_data = ChunkData {
|
||||
files: Vec::new(),
|
||||
checksum: String::new(),
|
||||
};
|
||||
|
||||
for (file, start, length) in chunk {
|
||||
log_sfn(format!(
|
||||
"reading {} from {} to {}, {}",
|
||||
file.relative_filename,
|
||||
start,
|
||||
start + length,
|
||||
format_size(length, BINARY)
|
||||
));
|
||||
let mut reader = backend.reader(&file, start, start + length).await?;
|
||||
let mut chunk_length = 0;
|
||||
|
||||
loop {
|
||||
let amount = reader.read(&mut read_buf).await?;
|
||||
if amount == 0 {
|
||||
break;
|
||||
for (file, start, length) in chunk {
|
||||
/*
|
||||
send_log
|
||||
.send(format!(
|
||||
"reading {} from {} to {}, {}",
|
||||
file.relative_filename,
|
||||
start,
|
||||
start + length,
|
||||
format_size(length, BINARY)
|
||||
))
|
||||
.await?;
|
||||
*/
|
||||
let mut reader = {
|
||||
let mut backend_lock = backend.lock().await;
|
||||
let reader = backend_lock.reader(&file, start, start + length).await?;
|
||||
reader
|
||||
};
|
||||
|
||||
loop {
|
||||
let amount = reader.read(&mut read_buf).await?;
|
||||
if amount == 0 {
|
||||
break;
|
||||
}
|
||||
hasher.update(&read_buf[0..amount]);
|
||||
}
|
||||
hasher.update(&read_buf[0..amount]);
|
||||
|
||||
chunk_length += length;
|
||||
|
||||
chunk_data.files.push(FileEntry {
|
||||
filename: file.relative_filename,
|
||||
start: start.try_into().unwrap(),
|
||||
length: length.try_into().unwrap(),
|
||||
permissions: file.permission,
|
||||
});
|
||||
}
|
||||
|
||||
chunk_length += length;
|
||||
send_log
|
||||
.send(format!(
|
||||
"created chunk of size {} (index {})",
|
||||
format_size(chunk_length, BINARY),
|
||||
index
|
||||
))
|
||||
.await?;
|
||||
|
||||
chunk_data.files.push(FileEntry {
|
||||
filename: file.relative_filename,
|
||||
start: start.try_into().unwrap(),
|
||||
length: length.try_into().unwrap(),
|
||||
permissions: file.permission,
|
||||
});
|
||||
}
|
||||
total_manifest_length.fetch_add(chunk_length, Ordering::Relaxed);
|
||||
|
||||
log_sfn(format!(
|
||||
"created chunk of size {} ({}/{})",
|
||||
format_size(chunk_length, BINARY),
|
||||
index,
|
||||
chunk_len
|
||||
));
|
||||
total_manifest_length += chunk_length;
|
||||
let hash: String = hasher.finalize().encode_hex();
|
||||
chunk_data.checksum = hash;
|
||||
{
|
||||
let mut manifest_lock = manifest.lock().await;
|
||||
manifest_lock.insert(uuid, chunk_data);
|
||||
};
|
||||
|
||||
let hash: String = hasher.finalize().encode_hex();
|
||||
chunk_data.checksum = hash;
|
||||
manifest.insert(uuid, chunk_data);
|
||||
|
||||
let progress: f32 = (index as f32 / chunk_len as f32) * 100.0f32;
|
||||
progress_sfn(progress);
|
||||
Ok(())
|
||||
})());
|
||||
}
|
||||
drop(send_log);
|
||||
join!(
|
||||
async move {
|
||||
while let Some(message) = recieve_log.recv().await {
|
||||
log_sfn(message);
|
||||
}
|
||||
},
|
||||
futures.collect::<Vec<Result<(), Error>>>()
|
||||
);
|
||||
|
||||
let manifest = manifest.lock().await;
|
||||
let manifest = manifest.clone();
|
||||
let manifest_size = size_of_val(&manifest);
|
||||
println!("manifest uses {} bytes", manifest_size);
|
||||
|
||||
Ok(json!(Manifest {
|
||||
version: "2".to_string(),
|
||||
chunks: manifest,
|
||||
size: total_manifest_length
|
||||
size: total_manifest_length.fetch_add(0, Ordering::Relaxed)
|
||||
})
|
||||
.to_string())
|
||||
}
|
||||
|
||||
@@ -195,8 +195,8 @@ impl VersionBackend for ZipVersionBackend {
|
||||
async fn reader(
|
||||
&mut self,
|
||||
file: &VersionFile,
|
||||
start: u64,
|
||||
end: u64,
|
||||
_start: u64,
|
||||
_end: u64,
|
||||
) -> anyhow::Result<Box<dyn MinimumFileObject>> {
|
||||
let mut read_command = Command::new("7z");
|
||||
read_command.args(vec!["e", "-so", &self.path, &file.relative_filename]);
|
||||
|
||||
Reference in New Issue
Block a user