From bb3280cedf108d1b76151073914069939bdbd95c Mon Sep 17 00:00:00 2001 From: quexeky Date: Sun, 25 Jan 2026 22:07:50 +1100 Subject: [PATCH] fix: Speedtest not registering number of bytes read --- cli/src/commands/connect/config.rs | 63 +++++++++++++------ cli/src/commands/connect/interactive.rs | 6 +- cli/src/commands/connect/mod.rs | 1 + cli/src/commands/connect/s3.rs | 2 +- .../commands/{upload => connect}/speedtest.rs | 22 ++++--- cli/src/commands/upload/chunk_reader.rs | 3 +- cli/src/commands/upload/interface.rs | 27 +++++--- cli/src/commands/upload/mod.rs | 5 +- cli/src/commands/upload/uploadable.rs | 2 - cli/src/manifest.rs | 10 +-- 10 files changed, 90 insertions(+), 51 deletions(-) rename cli/src/commands/{upload => connect}/speedtest.rs (55%) diff --git a/cli/src/commands/connect/config.rs b/cli/src/commands/connect/config.rs index 8ac23e81..1f42f7db 100644 --- a/cli/src/commands/connect/config.rs +++ b/cli/src/commands/connect/config.rs @@ -1,15 +1,22 @@ -use crate::{commands::{connect::{ - config_option::{ConfigOption, ConfigOptionCli}, - configurable::Configure, - s3::S3Config, -}, upload::speedtest::Speedtest}, manifest::DepotManifest}; +use crate::{ + commands::{ + connect::{ + config_option::{ConfigOption, ConfigOptionCli}, + configurable::Configure, + s3::S3Config, + speedtest::{SPEEDTEST_PATH, Speedtest} + }, + }, + manifest::DepotManifest, +}; use dialoguer::{Confirm, theme::ColorfulTheme}; use futures::AsyncWriteExt; +use indicatif::{ProgressBar, ProgressStyle}; use log::{debug, info, warn}; use opendal::Operator; use serde::{Deserialize, Serialize}; -use tokio_util::compat::FuturesAsyncWriteCompatExt; use std::{collections::HashMap, fs, ops::Not}; +use tokio_util::compat::FuturesAsyncWriteCompatExt; const CONFIG_DIR: &str = "downpour/config.json"; @@ -41,7 +48,7 @@ impl Config { let save_path = dirs::config_dir() .expect("Apparently your home directory doesn't exist") // Should probably formalise that error .join(CONFIG_DIR); - if fs::exists(&save_path).expect(&format!("Could not read save path {:#?}", &save_path)) { + if fs::exists(&save_path).unwrap_or_else(|_| panic!("Could not read save path {:#?}", &save_path)) { serde_json::from_str(&fs::read_to_string(save_path).unwrap()).unwrap() } else { Config::new() @@ -74,7 +81,6 @@ impl Config { }) .next() .cloned() - .map(|c| c.into()) } else { None } @@ -89,7 +95,7 @@ pub async fn manage_configuration( name: &String, option: &ConfigOptionCli, ) -> anyhow::Result<()> { - if config.exists(&name) { + if config.exists(name) { let confirm = Confirm::with_theme(&ColorfulTheme::default()) .with_prompt(format!( "An entry already exists with the name \"{}\". Would you like to overwrite it?", @@ -106,31 +112,50 @@ pub async fn manage_configuration( config.add_item(name.clone(), config_option.clone()); let operator = config_option.build()?; - generate_speedtest(&operator).await?; generate_manifest(&operator).await?; + generate_speedtest(&operator).await?; Ok(()) } async fn generate_speedtest(operator: &Operator) -> anyhow::Result<()> { - if operator.exists("speedtest").await?.not() { + // Workaround to operator.exists("...") also logging a 404 warning + let lister = operator.list_with(SPEEDTEST_PATH).limit(1).await?; + if lister.is_empty().not() { info!("Speedtest already exists on Depot. Skipping speedtest upload..."); - return Ok(()) + return Ok(()); } - let mut writer = operator.writer("speedtest").await?.into_futures_async_write().compat_write(); - let mut reader = Speedtest::new(); + let mut writer = operator + .writer(SPEEDTEST_PATH) + .await? + .into_futures_async_write() + .compat_write(); + + let progress_bar = ProgressBar::new(10_000).with_style( + ProgressStyle::default_bar() + .template("[{elapsed_precise}] [ETA {eta}] {bar} {percent_precise}%") + .unwrap(), + ); + + let mut reader = Speedtest::new(|progress| { + let progress_int = (progress * 100f32).round() as u64; + progress_bar.set_position(progress_int); + }); let written = tokio::io::copy(&mut reader, &mut writer).await?; debug!("Wrote {} bytes to {:?}", written, operator.info()); writer.into_inner().close().await?; Ok(()) } async fn generate_manifest(operator: &Operator) -> anyhow::Result<()> { - info!("Manifest already exists on Depot. Skipping manifest upload..."); - if operator.exists("manifest.json").await?.not() { - return Ok(()) + let lister = operator.list_with("manifest.json").limit(1).await?; + if lister.is_empty().not() { + info!("Manifest already exists on Depot. Skipping manifest upload..."); + return Ok(()); } let data = DepotManifest::new(); - operator.write("manifest.json", serde_json::to_string(&data)?).await?; + operator + .write("manifest.json", serde_json::to_string(&data)?) + .await?; Ok(()) -} \ No newline at end of file +} diff --git a/cli/src/commands/connect/interactive.rs b/cli/src/commands/connect/interactive.rs index 8c53012d..8e823198 100644 --- a/cli/src/commands/connect/interactive.rs +++ b/cli/src/commands/connect/interactive.rs @@ -8,7 +8,7 @@ macro_rules! interactive_variable { let $var = if let Some($var) = $value.$var { $var } else { - crate::commands::connect::interactive::query_variable($prompt).unwrap() + $crate::commands::connect::interactive::query_variable($prompt).unwrap() }; }; } @@ -18,7 +18,7 @@ macro_rules! interactive_optional_variable { let $var = if let Some($var) = $value.$var { Some($var) } else { - crate::commands::connect::interactive::query_optional_variable($prompt).unwrap() + $crate::commands::connect::interactive::query_optional_variable($prompt).unwrap() }; }; } @@ -40,7 +40,7 @@ where .with_prompt(prompt.to_string()) .allow_empty(true) .interact_text()?; - if input.to_string().len() == 0 { + if input.to_string().is_empty() { return Ok(None); } Ok(Some(input)) diff --git a/cli/src/commands/connect/mod.rs b/cli/src/commands/connect/mod.rs index dcb64e3e..397bc110 100644 --- a/cli/src/commands/connect/mod.rs +++ b/cli/src/commands/connect/mod.rs @@ -4,3 +4,4 @@ pub mod s3; #[macro_use] pub mod interactive; pub mod config_option; +pub mod speedtest; \ No newline at end of file diff --git a/cli/src/commands/connect/s3.rs b/cli/src/commands/connect/s3.rs index 6a7608d4..1b95c9e3 100644 --- a/cli/src/commands/connect/s3.rs +++ b/cli/src/commands/connect/s3.rs @@ -55,7 +55,7 @@ impl OperatorBuilder for S3Config { .secret_access_key(&self.secret_key) .region(&self.region) .endpoint(&self.endpoint) - .root(self.root.as_ref().map(|s| s.as_str()).unwrap_or("/")) + .root(self.root.as_deref().unwrap_or("/")) .bucket(&self.bucket_name) .disable_config_load(); diff --git a/cli/src/commands/upload/speedtest.rs b/cli/src/commands/connect/speedtest.rs similarity index 55% rename from cli/src/commands/upload/speedtest.rs rename to cli/src/commands/connect/speedtest.rs index 15a16bb9..a039469f 100644 --- a/cli/src/commands/upload/speedtest.rs +++ b/cli/src/commands/connect/speedtest.rs @@ -2,14 +2,15 @@ use rand::{RngCore, SeedableRng, rng, rngs::StdRng}; use tokio::io::AsyncRead; #[derive(Clone, Debug)] -pub struct Speedtest { +pub struct Speedtest { core: rand::rngs::StdRng, to_write: usize, + callback: Box, } pub const SPEEDTEST_BYTES: usize = 64 * 1024 * 1024; pub const SPEEDTEST_PATH: &str = "speedtest"; -impl AsyncRead for Speedtest { +impl AsyncRead for Speedtest { fn poll_read( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, @@ -17,17 +18,24 @@ impl AsyncRead for Speedtest { ) -> std::task::Poll> { let mut s = self; let to_write = buf.remaining().min(s.to_write); - s.to_write = s.to_write.saturating_sub(to_write); - let fill_slice = buf.initialize_unfilled_to(to_write); - s.core.fill_bytes(fill_slice); + + let filled = { + let fill_slice = buf.initialize_unfilled_to(to_write); + s.core.fill_bytes(fill_slice); + fill_slice.len() + }; + s.to_write = s.to_write.saturating_sub(filled); + (s.callback)((1f32 - (s.to_write as f32 / SPEEDTEST_BYTES as f32)) * 100f32); + buf.advance(filled); std::task::Poll::Ready(Ok(())) } } -impl Speedtest { - pub fn new() -> Self { +impl Speedtest { + pub fn new(callback: F) -> Self { Self { core: StdRng::from_rng(&mut rng()), to_write: SPEEDTEST_BYTES, + callback: Box::new(callback), } } } diff --git a/cli/src/commands/upload/chunk_reader.rs b/cli/src/commands/upload/chunk_reader.rs index 507b3d07..e51dc566 100644 --- a/cli/src/commands/upload/chunk_reader.rs +++ b/cli/src/commands/upload/chunk_reader.rs @@ -45,7 +45,7 @@ impl ChunkReader { .collect::>() .into_iter(); Self { - files: files, + files, active: None, } } @@ -65,7 +65,6 @@ impl AsyncRead for ChunkReader { continue; } Ok(n) => { - buf.advance(n); return Poll::Ready(Ok(())); diff --git a/cli/src/commands/upload/interface.rs b/cli/src/commands/upload/interface.rs index b2244142..36e8fdf3 100644 --- a/cli/src/commands/upload/interface.rs +++ b/cli/src/commands/upload/interface.rs @@ -1,10 +1,14 @@ -use std::path::{Path, PathBuf}; +use std::path::Path; use crate::{ cli::UploadInfo, - commands::{connect::config::Config, upload::{chunk_reader::ChunkReader, uploadable::OperatorBuilder}}, + commands::{ + connect::config::Config, + upload::{chunk_reader::ChunkReader, uploadable::OperatorBuilder}, + }, manifest::generate_manifest, }; +use futures::AsyncWriteExt; use log::info; use tokio_util::compat::FuturesAsyncWriteCompatExt; @@ -13,20 +17,25 @@ pub async fn upload(info: &UploadInfo, config: Config) -> anyhow::Result<()> { let path = &info.path; let version_id = &info.version_id; - let manifest = generate_manifest(&Path::new(path)).await?; + let manifest = generate_manifest(Path::new(path)).await?; let operator = match info.upload_style { crate::cli::UploadStyle::S3 => config - .get_active_s3() - .ok_or(anyhow::Error::msg("Could not get active S3 value"))?.build()?, + .get_active_s3() + .ok_or(anyhow::Error::msg("Could not get active S3 value"))? + .build()?, }; info!("Uploading chunks"); for (id, data) in &manifest.chunks { info!("Uploading chunk id {id}"); - let mut reader = ChunkReader::new(&path, data); - let mut writer = operator.writer(&format!("{game_id}/{version_id}/{id}")).await?.into_futures_async_write().compat_write(); - tokio::io::copy(&mut reader, &mut writer); + let mut reader = ChunkReader::new(path, data); + let mut writer = operator + .writer(&format!("{game_id}/{version_id}/{id}")) + .await? + .into_futures_async_write() + .compat_write(); + tokio::io::copy(&mut reader, &mut writer).await?; + writer.into_inner().close().await?; } info!("Finished uploading chunks"); Ok(()) } - diff --git a/cli/src/commands/upload/mod.rs b/cli/src/commands/upload/mod.rs index 76fe819d..03bf694e 100644 --- a/cli/src/commands/upload/mod.rs +++ b/cli/src/commands/upload/mod.rs @@ -1,4 +1,3 @@ -pub mod interface; -pub mod speedtest; -pub mod uploadable; pub mod chunk_reader; +pub mod interface; +pub mod uploadable; diff --git a/cli/src/commands/upload/uploadable.rs b/cli/src/commands/upload/uploadable.rs index 268a5d0b..c623c9eb 100644 --- a/cli/src/commands/upload/uploadable.rs +++ b/cli/src/commands/upload/uploadable.rs @@ -1,7 +1,5 @@ -use std::path::PathBuf; use async_trait::async_trait; -use droplet_rs::manifest::{ChunkData, Manifest}; use opendal::Operator; #[async_trait] diff --git a/cli/src/manifest.rs b/cli/src/manifest.rs index 0ef40c0f..e5e736f7 100644 --- a/cli/src/manifest.rs +++ b/cli/src/manifest.rs @@ -25,7 +25,7 @@ impl DepotManifest { content: HashMap::new(), } } - pub fn add(&mut self, game_id: String, version_id: String, compression: CompressionOption) { + pub fn append(&mut self, game_id: String, version_id: String, compression: CompressionOption) { self.content.insert( game_id, DepotManifestGameData { @@ -37,12 +37,13 @@ impl DepotManifest { } pub async fn generate_manifest(dir: &Path) -> anyhow::Result { - let progress_bar = ProgressBar::new(100_00).with_style( + let progress_bar = ProgressBar::new(10_000).with_style( ProgressStyle::default_bar() .template("[{elapsed_precise}] [ETA {eta}] {bar} {percent_precise}%") .unwrap(), ); - let res = generate_manifest_rusty( + + generate_manifest_rusty( dir, |progress| { let progress_int = (progress * 100f32).round() as u64; @@ -50,6 +51,5 @@ pub async fn generate_manifest(dir: &Path) -> anyhow::Result { }, |log| progress_bar.println(log), ) - .await; - res + .await }