diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 50755aba..c7e11ca1 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -442,9 +442,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" [[package]] name = "der-parser" @@ -578,7 +578,6 @@ dependencies = [ [[package]] name = "droplet-rs" version = "0.14.1" -source = "git+https://github.com/Drop-OSS/droplet-rs.git#f17a585b563d874ef9a09c27be5169b7e728d148" dependencies = [ "anyhow", "async-trait", @@ -1270,9 +1269,9 @@ checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libm" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" @@ -1375,9 +1374,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" [[package]] name = "num-integer" @@ -2151,10 +2150,11 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.7" +version = "1.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" dependencies = [ + "errno", "libc", ] @@ -2305,30 +2305,30 @@ dependencies = [ [[package]] name = "time" -version = "0.3.44" +version = "0.3.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" dependencies = [ "deranged", "itoa", "num-conv", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.24" +version = "0.2.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" dependencies = [ "num-conv", "time-core", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index cdfbd0fa..e506a0d7 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -11,7 +11,7 @@ clap = { version = "4.5.54", features = ["derive"] } console = "0.16.2" dialoguer = "0.12.0" dirs = "6.0.0" -droplet-rs = { git = "https://github.com/Drop-OSS/droplet-rs.git", version = "0.14" } +droplet-rs = { path = "../droplet-rs", version = "0.14" } fern = { version = "0.7.1", features = ["colored"] } futures = "0.3.31" indicatif = "0.18.3" diff --git a/cli/src/commands/connect/config.rs b/cli/src/commands/connect/config.rs index 3a0f5ce8..549109d9 100644 --- a/cli/src/commands/connect/config.rs +++ b/cli/src/commands/connect/config.rs @@ -100,8 +100,9 @@ pub async fn manage_configuration( let operator = config_option.build()?; generate_manifest(&operator).await?; + info!("Finished uploading manifest"); generate_speedtest(&operator).await?; - + info!("Finished uploading speedtest"); Ok(()) } @@ -129,8 +130,10 @@ async fn generate_speedtest(operator: &Operator) -> anyhow::Result<()> { progress_bar.set_position(progress_int); }); let written = tokio::io::copy(&mut reader, &mut writer).await?; + progress_bar.finish(); debug!("Wrote {} bytes to {:?}", written, operator.info()); writer.into_inner().close().await?; + debug!("Closed writer"); Ok(()) } diff --git a/cli/src/commands/upload/interface.rs b/cli/src/commands/upload/interface.rs index bb74fee9..2a16e552 100644 --- a/cli/src/commands/upload/interface.rs +++ b/cli/src/commands/upload/interface.rs @@ -1,4 +1,4 @@ -use std::{io::SeekFrom, path::Path, pin::Pin}; +use std::path::Path; use crate::{ cli::UploadInfo, @@ -6,19 +6,10 @@ use crate::{ manifest::{CompressionOption, DepotManifest, generate_v2_manifest}, operator_builder::OperatorBuilder, }; -use droplet_rs::manifest::ChunkData; -use futures::{AsyncWriteExt, StreamExt, TryStreamExt, future::join_all, stream}; +use futures::AsyncWriteExt; use log::info; -use opendal::Operator; -use tokio::{ - fs::File, - io::{AsyncRead, AsyncReadExt, AsyncSeekExt, Take}, -}; -use tokio_util::{ - bytes::Bytes, - compat::FuturesAsyncWriteCompatExt, - io::{ReaderStream, StreamReader}, -}; +use opendal::{FuturesAsyncWriter, Operator}; +use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; pub async fn upload( info: &UploadInfo, @@ -33,21 +24,25 @@ pub async fn upload( let mut existing_depot_manifest = get_depot_manifest(&operator).await?; - let v2_manifest = generate_v2_manifest(Path::new(path)).await?; - info!("Uploading chunks"); - for (id, data) in &v2_manifest.chunks { - info!("Uploading chunk id {id}"); - let mut reader = generate_chunk_readstream(path, data).await; - let mut writer = operator - .writer(&format!("{game_id}/{version_id}/{id}")) - .await? - .into_futures_async_write() - .compat_write(); - tokio::io::copy(&mut reader, &mut writer).await?; - writer.into_inner().close().await?; - } + let v2_manifest = generate_v2_manifest( + Path::new(path), + async |id: String| { + info!("Uploading chunk id {id}"); + let writer = operator + .writer(&format!("{game_id}/{version_id}/{id}")) + .await + .unwrap() + .into_futures_async_write() + .compat_write(); + writer + }, + |writer: Compat| async { + writer.into_inner().close().await.unwrap(); + }, + ) + .await?; info!("Finished uploading chunks"); @@ -59,32 +54,6 @@ pub async fn upload( Ok(()) } -// Black magic don't touch -/// Connects all of the files at the correct start and end points into a single, continuous AsyncRead object -pub async fn generate_chunk_readstream<'a, P: AsRef + 'a>( - path: P, - data: &'a ChunkData, -) -> Pin> { - let path = path.as_ref().to_path_buf(); - let files = data.files.clone(); - - let stream = stream::iter(files) - .map(move |f| { - let path = path.clone(); - // Lazy block to ensure that not too many files get opened at once - async move { - let mut file = File::open(path.join(f.filename)).await?; - file.seek(SeekFrom::Start(f.start as u64)).await?; - tokio::io::Result::Ok(file.take(f.length as u64)) - } - }) - .buffered(2) // Could also be 1. Just removes a bit of latency from opening files buy preparing the next one immediately - .map_ok(|file| ReaderStream::new(file)) - .try_flatten(); - let reader = StreamReader::new(stream); - Box::pin(reader) -} - async fn get_depot_manifest(operator: &Operator) -> Result { let existing_depot_manifest = operator.read("manifest.json").await?.to_bytes(); let existing_depot_manifest: DepotManifest = diff --git a/cli/src/manifest.rs b/cli/src/manifest.rs index fe766f15..b84884c2 100644 --- a/cli/src/manifest.rs +++ b/cli/src/manifest.rs @@ -1,9 +1,12 @@ use std::{collections::HashMap, path::Path}; -use droplet_rs::manifest::{Manifest, generate_manifest_rusty}; +use droplet_rs::manifest::{ + Manifest, generate_manifest_rusty, generate_manifest_rusty_v2, +}; use indicatif::{ProgressBar, ProgressStyle}; use log::info; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWrite; #[derive(Serialize, Deserialize)] pub struct DepotManifest { @@ -37,20 +40,27 @@ impl DepotManifest { } } -pub async fn generate_v2_manifest(dir: &Path) -> anyhow::Result { +pub async fn generate_v2_manifest(dir: &Path, factory: F, closer: CloseF) -> anyhow::Result +where + W: AsyncWrite + Unpin, + F: AsyncFn(String) -> W, + CloseF: AsyncFn(W) +{ let progress_bar = ProgressBar::new(10_000).with_style( ProgressStyle::default_bar() .template("[{elapsed_precise}] [ETA {eta}] {bar} {percent_precise}%") .unwrap(), ); - generate_manifest_rusty( + generate_manifest_rusty_v2( dir, |progress| { let progress_int = (progress * 100f32).round() as u64; progress_bar.set_position(progress_int); }, |log| progress_bar.suspend(|| info!("{}", log)), + factory, + closer ) .await }